1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 return Err(crate::RedDBError::InvalidOperation(format!(
94 "collection '{}' is declared as '{}' and does not allow '{}' writes",
95 collection,
96 collection_model_name(contract.declared_model),
97 collection_model_name(requested_model)
98 )));
99 }
100
101 let now = implicit_contract_unix_ms();
102 db.save_collection_contract(crate::physical::CollectionContract {
103 name: collection.to_string(),
104 declared_model: requested_model,
105 schema_mode: crate::catalog::SchemaMode::Dynamic,
106 origin: crate::physical::ContractOrigin::Implicit,
107 version: 1,
108 created_at_unix_ms: now,
109 updated_at_unix_ms: now,
110 default_ttl_ms: db.collection_default_ttl_ms(collection),
111 vector_dimension: None,
112 vector_metric: None,
113 context_index_fields: Vec::new(),
114 declared_columns: Vec::new(),
115 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117 timestamps_enabled: false,
118 context_index_enabled: false,
119 metrics_raw_retention_ms: None,
120 metrics_rollup_policies: Vec::new(),
121 metrics_tenant_identity: None,
122 metrics_namespace: None,
123 append_only: false,
126 subscriptions: Vec::new(),
127 analytics_config: Vec::new(),
128 session_key: None,
129 session_gap_ms: None,
130 retention_duration_ms: None,
131 analytical_storage: None,
132 })
133 .map(|_| ())
134 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
135}
136
137fn implicit_contract_unix_ms() -> u128 {
138 std::time::SystemTime::now()
139 .duration_since(std::time::UNIX_EPOCH)
140 .unwrap_or_default()
141 .as_millis()
142}
143
144fn collection_model_allows(
145 declared_model: crate::catalog::CollectionModel,
146 requested_model: crate::catalog::CollectionModel,
147) -> bool {
148 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
149}
150
151fn ensure_vector_dimension_contract(
152 db: &crate::storage::unified::devx::RedDB,
153 collection: &str,
154 actual_dimension: usize,
155) -> RedDBResult<()> {
156 let Some(expected_dimension) = db
157 .collection_contract(collection)
158 .and_then(|contract| contract.vector_dimension)
159 else {
160 return Ok(());
161 };
162 if expected_dimension == actual_dimension {
163 return Ok(());
164 }
165 Err(crate::RedDBError::Query(format!(
166 "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
167 )))
168}
169
170fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
171 match model {
172 crate::catalog::CollectionModel::Table => "table",
173 crate::catalog::CollectionModel::Document => "document",
174 crate::catalog::CollectionModel::Graph => "graph",
175 crate::catalog::CollectionModel::Vector => "vector",
176 crate::catalog::CollectionModel::Hll => "hll",
177 crate::catalog::CollectionModel::Sketch => "sketch",
178 crate::catalog::CollectionModel::Filter => "filter",
179 crate::catalog::CollectionModel::Kv => "kv",
180 crate::catalog::CollectionModel::Config => "config",
181 crate::catalog::CollectionModel::Vault => "vault",
182 crate::catalog::CollectionModel::Mixed => "mixed",
183 crate::catalog::CollectionModel::TimeSeries => "timeseries",
184 crate::catalog::CollectionModel::Queue => "queue",
185 crate::catalog::CollectionModel::Metrics => "metrics",
186 }
187}
188
189#[derive(Clone)]
190struct UniquenessRule {
191 name: String,
192 columns: Vec<String>,
193 primary_key: bool,
194}
195
196#[derive(Copy, Clone, PartialEq, Eq)]
197enum NormalizeMode {
198 Insert,
202 Update,
206}
207
208mod collection_contract_enforcement {
209 use super::*;
210
211 pub(super) struct CollectionContractWriteEnforcer<'a> {
212 db: &'a crate::storage::unified::devx::RedDB,
213 collection: &'a str,
214 }
215
216 impl<'a> CollectionContractWriteEnforcer<'a> {
217 pub(super) fn new(
218 db: &'a crate::storage::unified::devx::RedDB,
219 collection: &'a str,
220 ) -> Self {
221 Self { db, collection }
222 }
223
224 pub(super) fn ensure_model(
225 &self,
226 requested_model: crate::catalog::CollectionModel,
227 ) -> RedDBResult<()> {
228 ensure_collection_model_contract(self.db, self.collection, requested_model)
229 }
230
231 pub(super) fn normalize_insert_fields(
232 &self,
233 fields: Vec<(String, Value)>,
234 ) -> RedDBResult<Vec<(String, Value)>> {
235 normalize_row_fields_for_contract_with_mode(
236 self.db,
237 self.collection,
238 fields,
239 NormalizeMode::Insert,
240 )
241 }
242
243 pub(super) fn normalize_update_fields(
244 &self,
245 fields: Vec<(String, Value)>,
246 ) -> RedDBResult<Vec<(String, Value)>> {
247 normalize_row_fields_for_contract_with_mode(
248 self.db,
249 self.collection,
250 fields,
251 NormalizeMode::Update,
252 )
253 }
254
255 pub(super) fn enforce_row_uniqueness(
256 &self,
257 fields: &[(String, Value)],
258 exclude_id: Option<crate::storage::EntityId>,
259 ) -> RedDBResult<()> {
260 enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
261 }
262
263 pub(super) fn enforce_batch_uniqueness(
264 &self,
265 rows: &[Vec<(String, Value)>],
266 ) -> RedDBResult<()> {
267 enforce_row_batch_uniqueness(self.db, self.collection, rows)
268 }
269
270 pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
271 row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
272 }
273 }
274}
275
276use collection_contract_enforcement::CollectionContractWriteEnforcer;
277
278fn normalize_row_fields_for_contract_with_mode(
279 db: &crate::storage::unified::devx::RedDB,
280 collection: &str,
281 fields: Vec<(String, Value)>,
282 mode: NormalizeMode,
283) -> RedDBResult<Vec<(String, Value)>> {
284 let Some(contract) = db.collection_contract(collection) else {
285 return Ok(fields);
286 };
287
288 if contract.declared_model != crate::catalog::CollectionModel::Table
289 || (contract.declared_columns.is_empty()
290 && contract
291 .table_def
292 .as_ref()
293 .map(|table| table.columns.is_empty())
294 .unwrap_or(true))
295 {
296 return Ok(fields);
297 }
298
299 let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
309 fields
310 .iter()
311 .find(|(n, _)| n == "created_at")
312 .map(|(_, v)| v.clone())
313 } else {
314 None
315 };
316
317 if contract.timestamps_enabled && mode == NormalizeMode::Insert {
322 for (name, _) in &fields {
323 if name == "created_at" || name == "updated_at" {
324 return Err(crate::RedDBError::Query(format!(
325 "collection '{}' manages '{}' automatically — do not set it in INSERT",
326 collection, name
327 )));
328 }
329 }
330 }
331
332 let mut provided = std::collections::BTreeMap::new();
333 for (name, value) in &fields {
334 provided.insert(name.clone(), value.clone());
335 }
336
337 let resolved_columns = resolved_contract_columns(&contract)?;
338 let declared_names: std::collections::BTreeSet<String> = resolved_columns
339 .iter()
340 .map(|column| column.name.clone())
341 .collect();
342 let unknown_fields: Vec<String> = fields
343 .iter()
344 .filter(|(name, _)| !declared_names.contains(name))
345 .map(|(name, _)| name.clone())
346 .collect();
347 if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
348 && !unknown_fields.is_empty()
349 {
350 return Err(crate::RedDBError::Query(format!(
351 "collection '{}' is strict and does not allow undeclared fields: {}",
352 collection,
353 unknown_fields.join(", ")
354 )));
355 }
356 let mut normalized = Vec::new();
357 let now_ms = current_unix_ms_u64();
358
359 for column in &resolved_columns {
360 match provided.remove(&column.name) {
361 Some(value) => {
362 if contract.timestamps_enabled && mode == NormalizeMode::Update {
367 match column.name.as_str() {
368 "created_at" => {
369 normalized.push((
370 column.name.clone(),
371 existing_created_at
372 .clone()
373 .unwrap_or(Value::UnsignedInteger(now_ms)),
374 ));
375 continue;
376 }
377 "updated_at" => {
378 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
379 continue;
380 }
381 _ => {}
382 }
383 }
384 normalized.push((
385 column.name.clone(),
386 normalize_contract_value(collection, column, value)?,
387 ));
388 }
389 None => {
390 if contract.timestamps_enabled
394 && (column.name == "created_at" || column.name == "updated_at")
395 {
396 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
397 continue;
398 }
399 if let Some(default) = &column.default {
400 normalized.push((
401 column.name.clone(),
402 coerce_contract_literal(collection, &column.name, column, default)?,
403 ));
404 } else if column.not_null {
405 return Err(crate::RedDBError::Query(format!(
406 "missing required column '{}' for collection '{}'",
407 column.name, collection
408 )));
409 }
410 }
411 }
412 }
413
414 for (name, value) in fields {
415 if !declared_names.contains(&name) {
416 normalized.push((name, value));
417 }
418 }
419
420 Ok(normalized)
421}
422
423fn current_unix_ms_u64() -> u64 {
424 std::time::SystemTime::now()
425 .duration_since(std::time::UNIX_EPOCH)
426 .map(|d| d.as_millis() as u64)
427 .unwrap_or(0)
428}
429
430fn enforce_row_uniqueness(
431 db: &crate::storage::unified::devx::RedDB,
432 collection: &str,
433 fields: &[(String, Value)],
434 exclude_id: Option<crate::storage::EntityId>,
435) -> RedDBResult<()> {
436 let Some(contract) = db.collection_contract(collection) else {
437 return Ok(());
438 };
439 if contract.declared_model != crate::catalog::CollectionModel::Table
440 && contract.declared_model != crate::catalog::CollectionModel::Mixed
441 {
442 return Ok(());
443 }
444
445 let rules = resolved_uniqueness_rules(&contract);
446 if rules.is_empty() {
447 return Ok(());
448 }
449
450 let store = db.store();
451 let Some(manager) = store.get_collection(collection) else {
452 return Ok(());
453 };
454
455 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
456
457 for rule in &rules {
458 let mut expected_signatures = Vec::new();
459 let mut skip_rule = false;
460
461 for column in &rule.columns {
462 match input_fields.get(column) {
463 Some(Value::Null) | None if rule.primary_key => {
464 return Err(crate::RedDBError::Query(format!(
465 "primary key '{}' in collection '{}' requires non-null column '{}'",
466 rule.name, collection, column
467 )))
468 }
469 Some(Value::Null) | None => {
470 skip_rule = true;
471 break;
472 }
473 Some(value) => {
474 expected_signatures.push((column.clone(), value_signature(value)));
475 }
476 }
477 }
478
479 if skip_rule {
480 continue;
481 }
482
483 for entity in manager.query_all(|_| true) {
484 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
485 continue;
486 }
487 let Some(existing_fields) = row_fields_from_entity(&entity) else {
488 continue;
489 };
490
491 let duplicate = expected_signatures.iter().all(|(column, expected)| {
492 existing_fields
493 .get(column)
494 .map(|value| value_signature(value) == *expected)
495 .unwrap_or(false)
496 });
497
498 if duplicate {
499 let qualifier = if rule.primary_key {
500 "primary key"
501 } else {
502 "unique constraint"
503 };
504 return Err(crate::RedDBError::Query(format!(
505 "{} '{}' violated on collection '{}' for columns [{}]",
506 qualifier,
507 rule.name,
508 collection,
509 rule.columns.join(", ")
510 )));
511 }
512 }
513 }
514
515 Ok(())
516}
517
518fn enforce_row_batch_uniqueness(
519 db: &crate::storage::unified::devx::RedDB,
520 collection: &str,
521 rows: &[Vec<(String, Value)>],
522) -> RedDBResult<()> {
523 let Some(contract) = db.collection_contract(collection) else {
524 return Ok(());
525 };
526 if contract.declared_model != crate::catalog::CollectionModel::Table
527 && contract.declared_model != crate::catalog::CollectionModel::Mixed
528 {
529 return Ok(());
530 }
531
532 let rules = resolved_uniqueness_rules(&contract);
533 if rules.is_empty() {
534 return Ok(());
535 }
536
537 for rule in &rules {
538 let mut seen = std::collections::HashMap::<String, usize>::new();
539 for (row_index, fields) in rows.iter().enumerate() {
540 let input_fields: std::collections::BTreeMap<String, Value> =
541 fields.iter().cloned().collect();
542 let mut signatures = Vec::new();
543 let mut skip_rule = false;
544
545 for column in &rule.columns {
546 match input_fields.get(column) {
547 Some(Value::Null) | None if rule.primary_key => {
548 return Err(crate::RedDBError::Query(format!(
549 "primary key '{}' in collection '{}' requires non-null column '{}'",
550 rule.name, collection, column
551 )))
552 }
553 Some(Value::Null) | None => {
554 skip_rule = true;
555 break;
556 }
557 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
558 }
559 }
560
561 if skip_rule {
562 continue;
563 }
564
565 let signature = signatures.join("|");
566 if let Some(previous_index) = seen.insert(signature, row_index) {
567 return Err(crate::RedDBError::Query(format!(
568 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
569 rule.name,
570 collection,
571 previous_index + 1,
572 row_index + 1
573 )));
574 }
575 }
576 }
577
578 Ok(())
579}
580
581fn row_update_requires_uniqueness_check(
582 db: &crate::storage::unified::devx::RedDB,
583 collection: &str,
584 modified_columns: &[String],
585) -> bool {
586 if modified_columns.is_empty() {
587 return false;
588 }
589
590 let Some(contract) = db.collection_contract(collection) else {
591 return false;
592 };
593 if contract.declared_model != crate::catalog::CollectionModel::Table
594 && contract.declared_model != crate::catalog::CollectionModel::Mixed
595 {
596 return false;
597 }
598
599 let rules = resolved_uniqueness_rules(&contract);
600 if rules.is_empty() {
601 return false;
602 }
603
604 rules.iter().any(|rule| {
605 rule.columns.iter().any(|column| {
606 modified_columns
607 .iter()
608 .any(|modified| modified.eq_ignore_ascii_case(column))
609 })
610 })
611}
612
613pub(crate) fn build_row_update_contract_plan(
614 db: &crate::storage::unified::devx::RedDB,
615 collection: &str,
616) -> RedDBResult<Option<RowUpdateContractPlan>> {
617 let Some(contract) = db.collection_contract(collection) else {
618 return Ok(None);
619 };
620
621 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
622 && !(contract.declared_columns.is_empty()
623 && contract
624 .table_def
625 .as_ref()
626 .map(|table| table.columns.is_empty())
627 .unwrap_or(true))
628 {
629 resolved_contract_columns(&contract)?
630 .into_iter()
631 .map(|rule| {
632 (
633 rule.name.clone(),
634 RowUpdateColumnRule {
635 name: rule.name,
636 data_type: rule.data_type,
637 data_type_name: rule.data_type_name,
638 not_null: rule.not_null,
639 enum_variants: rule.enum_variants,
640 },
641 )
642 })
643 .collect()
644 } else {
645 HashMap::new()
646 };
647
648 let unique_columns = if matches!(
649 contract.declared_model,
650 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
651 ) {
652 resolved_uniqueness_rules(&contract)
653 .into_iter()
654 .flat_map(|rule| rule.columns.into_iter())
655 .map(|column| (column, ()))
656 .collect()
657 } else {
658 HashMap::new()
659 };
660
661 Ok(Some(RowUpdateContractPlan {
662 timestamps_enabled: contract.timestamps_enabled,
663 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
664 declared_rules,
665 unique_columns,
666 }))
667}
668
669pub(crate) fn normalize_row_update_assignment_with_plan(
670 collection: &str,
671 column: &str,
672 value: Value,
673 row_contract_plan: Option<&RowUpdateContractPlan>,
674) -> RedDBResult<Value> {
675 let Some(plan) = row_contract_plan else {
676 return Ok(value);
677 };
678
679 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
680 return Err(crate::RedDBError::Query(format!(
681 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
682 collection, column
683 )));
684 }
685
686 if let Some(rule) = plan.declared_rules.get(column) {
687 let rule = ResolvedColumnRule {
688 name: rule.name.clone(),
689 data_type: rule.data_type,
690 data_type_name: rule.data_type_name.clone(),
691 not_null: rule.not_null,
692 default: None,
693 enum_variants: rule.enum_variants.clone(),
694 };
695 normalize_contract_value(collection, &rule, value)
696 } else if plan.strict_schema {
697 Err(crate::RedDBError::Query(format!(
698 "collection '{}' is strict and does not allow undeclared fields: {}",
699 collection, column
700 )))
701 } else {
702 Ok(value)
703 }
704}
705
706pub(crate) fn normalize_row_update_value_for_rule(
707 collection: &str,
708 value: Value,
709 row_rule: Option<&RowUpdateColumnRule>,
710) -> RedDBResult<Value> {
711 let Some(rule) = row_rule else {
712 return Ok(value);
713 };
714
715 let rule = ResolvedColumnRule {
716 name: rule.name.clone(),
717 data_type: rule.data_type,
718 data_type_name: rule.data_type_name.clone(),
719 not_null: rule.not_null,
720 default: None,
721 enum_variants: rule.enum_variants.clone(),
722 };
723 normalize_contract_value(collection, &rule, value)
724}
725
726fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
727 if let Some(named) = row.named.as_mut() {
728 named.insert(name.to_string(), value);
729 return;
730 }
731
732 if let Some(schema) = row.schema.as_ref() {
733 if let Some(index) = schema.iter().position(|column| column == name) {
734 if let Some(slot) = row.columns.get_mut(index) {
735 *slot = value;
736 return;
737 }
738 }
739
740 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
741 for (column, current) in schema.iter().zip(row.columns.iter()) {
742 named.insert(column.clone(), current.clone());
743 }
744 named.insert(name.to_string(), value);
745 row.named = Some(named);
746 return;
747 }
748
749 let mut named = HashMap::with_capacity(1);
750 named.insert(name.to_string(), value);
751 row.named = Some(named);
752}
753
754fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
755 if let Some(named) = row.named.as_ref() {
756 named
757 .iter()
758 .map(|(key, value)| (key.clone(), value.clone()))
759 .collect()
760 } else if let Some(schema) = row.schema.as_ref() {
761 schema
762 .iter()
763 .cloned()
764 .zip(row.columns.iter().cloned())
765 .collect()
766 } else {
767 Vec::new()
768 }
769}
770
771fn apply_row_field_assignments_raw<I>(
772 row: &mut crate::storage::unified::entity::RowData,
773 field_assignments: I,
774) where
775 I: IntoIterator<Item = (String, Value)>,
776{
777 for (column, value) in field_assignments {
778 set_row_field(row, &column, value);
779 }
780}
781
782fn apply_row_field_assignments_incremental<I>(
783 collection: &str,
784 row: &mut crate::storage::unified::entity::RowData,
785 field_assignments: I,
786 row_contract_plan: Option<&RowUpdateContractPlan>,
787) -> RedDBResult<()>
788where
789 I: IntoIterator<Item = (String, Value)>,
790{
791 for (column, value) in field_assignments {
792 let value = normalize_row_update_assignment_with_plan(
793 collection,
794 &column,
795 value,
796 row_contract_plan,
797 )?;
798
799 set_row_field(row, &column, value);
800 }
801
802 Ok(())
803}
804
805fn resolved_uniqueness_rules(
806 contract: &crate::physical::CollectionContract,
807) -> Vec<UniquenessRule> {
808 let mut rules = Vec::new();
809
810 if let Some(table_def) = &contract.table_def {
811 if !table_def.primary_key.is_empty() {
812 rules.push(UniquenessRule {
813 name: "primary_key".to_string(),
814 columns: table_def.primary_key.clone(),
815 primary_key: true,
816 });
817 }
818
819 for constraint in &table_def.constraints {
820 if matches!(
821 constraint.constraint_type,
822 crate::storage::schema::ConstraintType::PrimaryKey
823 ) && !constraint.columns.is_empty()
824 {
825 rules.push(UniquenessRule {
826 name: constraint.name.clone(),
827 columns: constraint.columns.clone(),
828 primary_key: true,
829 });
830 } else if matches!(
831 constraint.constraint_type,
832 crate::storage::schema::ConstraintType::Unique
833 ) && !constraint.columns.is_empty()
834 {
835 rules.push(UniquenessRule {
836 name: constraint.name.clone(),
837 columns: constraint.columns.clone(),
838 primary_key: false,
839 });
840 }
841 }
842 } else {
843 for column in &contract.declared_columns {
844 if column.primary_key {
845 rules.push(UniquenessRule {
846 name: format!("pk_{}", column.name),
847 columns: vec![column.name.clone()],
848 primary_key: true,
849 });
850 } else if column.unique {
851 rules.push(UniquenessRule {
852 name: format!("uniq_{}", column.name),
853 columns: vec![column.name.clone()],
854 primary_key: false,
855 });
856 }
857 }
858 }
859
860 let mut dedup = std::collections::BTreeSet::new();
861 rules
862 .into_iter()
863 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
864 .collect()
865}
866
867fn row_fields_from_entity(
868 entity: &crate::storage::UnifiedEntity,
869) -> Option<std::collections::BTreeMap<String, Value>> {
870 match &entity.data {
871 crate::storage::EntityData::Row(row) => {
872 if let Some(named) = &row.named {
873 Some(
874 named
875 .iter()
876 .map(|(key, value)| (key.clone(), value.clone()))
877 .collect(),
878 )
879 } else {
880 row.schema.as_ref().map(|schema| {
881 schema
882 .iter()
883 .cloned()
884 .zip(row.columns.iter().cloned())
885 .collect()
886 })
887 }
888 }
889 _ => None,
890 }
891}
892
893fn value_signature(value: &Value) -> String {
894 format!("{value:?}")
895}
896
897fn normalize_contract_value(
898 collection: &str,
899 column: &ResolvedColumnRule,
900 value: Value,
901) -> RedDBResult<Value> {
902 if matches!(value, Value::Null) {
903 if column.not_null {
904 return Err(crate::RedDBError::Query(format!(
905 "column '{}' in collection '{}' cannot be null",
906 column.name, collection
907 )));
908 }
909 return Ok(Value::Null);
910 }
911
912 let target = column.data_type;
913 if value_matches_declared_type(&value, target) {
914 return Ok(value);
915 }
916
917 let Some(raw) = value_to_coercion_input(&value) else {
918 return Err(crate::RedDBError::Query(format!(
919 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
920 column.name, collection, column.data_type_name
921 )));
922 };
923
924 coerce_contract_literal(collection, &column.name, column, &raw)
925}
926
927fn coerce_contract_literal(
928 collection: &str,
929 column_name: &str,
930 column: &ResolvedColumnRule,
931 raw: &str,
932) -> RedDBResult<Value> {
933 let target = column.data_type;
934 match target {
935 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
936 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
937 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
938 crate::RedDBError::Query(format!(
939 "failed to coerce column '{}' in collection '{}' to '{}': {}",
940 column_name, collection, column.data_type_name, err
941 ))
942 }),
943 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
944 crate::RedDBError::Query(format!(
945 "failed to coerce column '{}' in collection '{}' to '{}': {}",
946 column_name, collection, column.data_type_name, err
947 ))
948 }),
949 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
950 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
951 column_name, collection, column.data_type_name
952 ))),
953 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
954 |err| {
955 crate::RedDBError::Query(format!(
956 "failed to coerce column '{}' in collection '{}' to '{}': {}",
957 column_name, collection, column.data_type_name, err
958 ))
959 },
960 ),
961 }
962}
963
964struct ResolvedColumnRule {
965 name: String,
966 data_type: DataType,
967 data_type_name: String,
968 not_null: bool,
969 default: Option<String>,
970 enum_variants: Vec<String>,
971}
972
973fn resolved_contract_columns(
974 contract: &crate::physical::CollectionContract,
975) -> RedDBResult<Vec<ResolvedColumnRule>> {
976 if let Some(table_def) = &contract.table_def {
977 return Ok(table_def
978 .columns
979 .iter()
980 .map(|column| ResolvedColumnRule {
981 name: column.name.clone(),
982 data_type: column.data_type,
983 data_type_name: data_type_name(column.data_type).to_string(),
984 not_null: !column.nullable,
985 default: column
986 .default
987 .as_ref()
988 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
989 enum_variants: column.enum_variants.clone(),
990 })
991 .collect());
992 }
993
994 contract
995 .declared_columns
996 .iter()
997 .map(|column| {
998 let data_type = column
999 .sql_type
1000 .as_ref()
1001 .map(crate::storage::query::resolve_sql_type_name)
1002 .transpose()
1003 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1004 .unwrap_or(parse_declared_data_type(&column.data_type)?);
1005 Ok(ResolvedColumnRule {
1006 name: column.name.clone(),
1007 data_type,
1008 data_type_name: column.data_type.clone(),
1009 not_null: column.not_null,
1010 default: column.default.clone(),
1011 enum_variants: column.enum_variants.clone(),
1012 })
1013 })
1014 .collect()
1015}
1016
1017fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1018 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1019}
1020
1021fn data_type_name(data_type: DataType) -> &'static str {
1022 match data_type {
1023 DataType::Integer => "integer",
1024 DataType::UnsignedInteger => "unsigned_integer",
1025 DataType::Float => "float",
1026 DataType::Text => "text",
1027 DataType::Blob => "blob",
1028 DataType::Boolean => "boolean",
1029 DataType::Timestamp => "timestamp",
1030 DataType::Duration => "duration",
1031 DataType::IpAddr => "ipaddr",
1032 DataType::MacAddr => "macaddr",
1033 DataType::Vector => "vector",
1034 DataType::Nullable => "nullable",
1035 DataType::Unknown => "unknown",
1036 DataType::Json => "json",
1037 DataType::Uuid => "uuid",
1038 DataType::NodeRef => "noderef",
1039 DataType::EdgeRef => "edgeref",
1040 DataType::VectorRef => "vectorref",
1041 DataType::RowRef => "rowref",
1042 DataType::Color => "color",
1043 DataType::Email => "email",
1044 DataType::Url => "url",
1045 DataType::Phone => "phone",
1046 DataType::Semver => "semver",
1047 DataType::Cidr => "cidr",
1048 DataType::Date => "date",
1049 DataType::Time => "time",
1050 DataType::Decimal => "decimal",
1051 DataType::Enum => "enum",
1052 DataType::Array => "array",
1053 DataType::TimestampMs => "timestamp_ms",
1054 DataType::Ipv4 => "ipv4",
1055 DataType::Ipv6 => "ipv6",
1056 DataType::Subnet => "subnet",
1057 DataType::Port => "port",
1058 DataType::Latitude => "latitude",
1059 DataType::Longitude => "longitude",
1060 DataType::GeoPoint => "geopoint",
1061 DataType::Country2 => "country2",
1062 DataType::Country3 => "country3",
1063 DataType::Lang2 => "lang2",
1064 DataType::Lang5 => "lang5",
1065 DataType::Currency => "currency",
1066 DataType::AssetCode => "asset_code",
1067 DataType::Money => "money",
1068 DataType::ColorAlpha => "color_alpha",
1069 DataType::BigInt => "bigint",
1070 DataType::KeyRef => "keyref",
1071 DataType::DocRef => "docref",
1072 DataType::TableRef => "tableref",
1073 DataType::PageRef => "pageref",
1074 DataType::Secret => "secret",
1075 DataType::Password => "password",
1076 DataType::TextZstd => "text",
1077 DataType::BlobZstd => "blob",
1078 }
1079}
1080
1081fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1082 matches!(
1083 (value, target),
1084 (Value::Null, _)
1085 | (Value::Integer(_), DataType::Integer)
1086 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1087 | (Value::Float(_), DataType::Float)
1088 | (Value::Text(_), DataType::Text)
1089 | (Value::Blob(_), DataType::Blob)
1090 | (Value::Boolean(_), DataType::Boolean)
1091 | (Value::Timestamp(_), DataType::Timestamp)
1092 | (Value::Duration(_), DataType::Duration)
1093 | (Value::IpAddr(_), DataType::IpAddr)
1094 | (Value::MacAddr(_), DataType::MacAddr)
1095 | (Value::Vector(_), DataType::Vector)
1096 | (Value::Json(_), DataType::Json)
1097 | (Value::Uuid(_), DataType::Uuid)
1098 | (Value::NodeRef(_), DataType::NodeRef)
1099 | (Value::EdgeRef(_), DataType::EdgeRef)
1100 | (Value::VectorRef(_, _), DataType::VectorRef)
1101 | (Value::RowRef(_, _), DataType::RowRef)
1102 | (Value::Color(_), DataType::Color)
1103 | (Value::Email(_), DataType::Email)
1104 | (Value::Url(_), DataType::Url)
1105 | (Value::Phone(_), DataType::Phone)
1106 | (Value::Semver(_), DataType::Semver)
1107 | (Value::Cidr(_, _), DataType::Cidr)
1108 | (Value::Date(_), DataType::Date)
1109 | (Value::Time(_), DataType::Time)
1110 | (Value::Decimal(_), DataType::Decimal)
1111 | (Value::EnumValue(_), DataType::Enum)
1112 | (Value::Array(_), DataType::Array)
1113 | (Value::TimestampMs(_), DataType::TimestampMs)
1114 | (Value::Ipv4(_), DataType::Ipv4)
1115 | (Value::Ipv6(_), DataType::Ipv6)
1116 | (Value::Subnet(_, _), DataType::Subnet)
1117 | (Value::Port(_), DataType::Port)
1118 | (Value::Latitude(_), DataType::Latitude)
1119 | (Value::Longitude(_), DataType::Longitude)
1120 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1121 | (Value::Country2(_), DataType::Country2)
1122 | (Value::Country3(_), DataType::Country3)
1123 | (Value::Lang2(_), DataType::Lang2)
1124 | (Value::Lang5(_), DataType::Lang5)
1125 | (Value::Currency(_), DataType::Currency)
1126 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1127 | (Value::BigInt(_), DataType::BigInt)
1128 | (Value::KeyRef(_, _), DataType::KeyRef)
1129 | (Value::DocRef(_, _), DataType::DocRef)
1130 | (Value::TableRef(_), DataType::TableRef)
1131 | (Value::PageRef(_), DataType::PageRef)
1132 | (Value::Secret(_), DataType::Secret)
1133 | (Value::Password(_), DataType::Password)
1134 )
1135}
1136
1137fn value_to_coercion_input(value: &Value) -> Option<String> {
1138 match value {
1139 Value::Null => None,
1140 Value::Integer(value) => Some(value.to_string()),
1141 Value::UnsignedInteger(value) => Some(value.to_string()),
1142 Value::Float(value) => Some(value.to_string()),
1143 Value::Text(value) => Some(value.to_string()),
1144 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1145 Value::Boolean(value) => Some(value.to_string()),
1146 Value::Timestamp(value) => Some(value.to_string()),
1147 Value::Duration(value) => Some(value.to_string()),
1148 Value::IpAddr(value) => Some(value.to_string()),
1149 Value::MacAddr(value) => Some(format!(
1150 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1151 value[0], value[1], value[2], value[3], value[4], value[5]
1152 )),
1153 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1154 Value::Email(value) => Some(value.clone()),
1155 Value::Url(value) => Some(value.clone()),
1156 Value::Phone(value) => Some(value.to_string()),
1157 Value::Semver(value) => Some(format!(
1158 "{}.{}.{}",
1159 value / 1_000_000,
1160 (value / 1_000) % 1_000,
1161 value % 1_000
1162 )),
1163 Value::Date(value) => Some(value.to_string()),
1164 Value::Time(value) => Some(value.to_string()),
1165 Value::Decimal(value) => Some(value.to_string()),
1166 Value::TimestampMs(value) => Some(value.to_string()),
1167 Value::Ipv4(value) => Some(format!(
1168 "{}.{}.{}.{}",
1169 (value >> 24) & 0xFF,
1170 (value >> 16) & 0xFF,
1171 (value >> 8) & 0xFF,
1172 value & 0xFF
1173 )),
1174 Value::Port(value) => Some(value.to_string()),
1175 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1176 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1177 Value::GeoPoint(lat, lon) => Some(format!(
1178 "{},{}",
1179 *lat as f64 / 1_000_000.0,
1180 *lon as f64 / 1_000_000.0
1181 )),
1182 Value::BigInt(value) => Some(value.to_string()),
1183 Value::TableRef(value) => Some(value.clone()),
1184 Value::PageRef(value) => Some(value.to_string()),
1185 Value::Password(value) => Some(value.clone()),
1186 _ => None,
1187 }
1188}
1189
1190fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1191 if modified_columns.is_empty() {
1192 return modified_columns;
1193 }
1194
1195 let mut unique = Vec::with_capacity(modified_columns.len());
1196 for column in modified_columns.drain(..) {
1197 if !unique
1198 .iter()
1199 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1200 {
1201 unique.push(column);
1202 }
1203 }
1204 unique
1205}
1206
1207fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1208 if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1209 return Err(crate::RedDBError::Query(
1210 "array positional document patch paths are unsupported; replace the array or full document body instead"
1211 .to_string(),
1212 ));
1213 }
1214 Ok(())
1215}
1216
1217fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1218 match fields.get("body") {
1219 Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1220 crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1221 }),
1222 Some(_) => Err(crate::RedDBError::Query(
1223 "document body field must contain JSON".to_string(),
1224 )),
1225 None => Ok(JsonValue::Object(Default::default())),
1226 }
1227}
1228
1229fn replace_document_row_body(
1230 fields: &mut HashMap<String, Value>,
1231 body: JsonValue,
1232 modified_columns: &mut Vec<String>,
1233) -> RedDBResult<()> {
1234 modified_columns.push("body".to_string());
1235
1236 let old_keys: Vec<String> = fields
1237 .keys()
1238 .filter(|key| key.as_str() != "body")
1239 .cloned()
1240 .collect();
1241 for key in old_keys {
1242 fields.remove(&key);
1243 modified_columns.push(key);
1244 }
1245
1246 let body_bytes = json_to_vec(&body).map_err(|err| {
1247 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1248 })?;
1249 fields.insert("body".to_string(), Value::Json(body_bytes));
1250
1251 if let JsonValue::Object(map) = &body {
1252 for (key, value) in map {
1253 fields.insert(key.clone(), json_to_storage_value(value)?);
1254 modified_columns.push(key.clone());
1255 }
1256 }
1257
1258 Ok(())
1259}
1260
1261impl RedDBRuntime {
1262 pub(crate) fn apply_loaded_patch_entity_core(
1263 &self,
1264 collection: String,
1265 mut entity: crate::storage::UnifiedEntity,
1266 payload: JsonValue,
1267 operations: Vec<PatchEntityOperation>,
1268 ) -> RedDBResult<AppliedEntityMutation> {
1269 let id = entity.id;
1270 let operations = normalize_ttl_patch_operations(operations)?;
1271 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1274
1275 let db = self.db();
1276 let store = db.store();
1277 let Some(manager) = store.get_collection(&collection) else {
1278 return Err(crate::RedDBError::NotFound(format!(
1279 "collection not found: {collection}"
1280 )));
1281 };
1282
1283 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1284 let mut metadata_changed = false;
1285 let mut modified_columns: Vec<String> = Vec::new();
1286 let mut context_index_dirty = false;
1287 let mut graph_node_type: Option<String> = None;
1288 let mut graph_edge_weight: Option<f32> = None;
1289
1290 let row_contract_timestamps = db
1291 .collection_contract(&collection)
1292 .map(|c| c.timestamps_enabled)
1293 .unwrap_or(false);
1294
1295 match &mut entity.data {
1296 crate::storage::EntityData::Row(row) => {
1297 let is_document_collection = db
1298 .collection_contract(&collection)
1299 .map(|contract| {
1300 contract.declared_model == crate::catalog::CollectionModel::Document
1301 })
1302 .unwrap_or(false);
1303 let mut field_ops = Vec::new();
1304 let mut metadata_ops = Vec::new();
1305 let mut document_body_ops = Vec::new();
1306
1307 for mut op in operations {
1308 let Some(root) = op.path.first().map(String::as_str) else {
1309 return Err(crate::RedDBError::Query(
1310 "patch path cannot be empty".to_string(),
1311 ));
1312 };
1313
1314 match root {
1315 "body" if is_document_collection => {
1316 if op.path.len() < 2 {
1317 return Err(crate::RedDBError::Query(
1318 "document body patch paths require a nested key; use payload.body for full replacement"
1319 .to_string(),
1320 ));
1321 }
1322 op.path.remove(0);
1323 reject_document_array_position_path(&op.path)?;
1324 document_body_ops.push(op);
1325 }
1326 "fields" | "named" => {
1327 if op.path.len() < 2 {
1328 return Err(crate::RedDBError::Query(
1329 "patch path 'fields' requires a nested key".to_string(),
1330 ));
1331 }
1332 if row_contract_timestamps {
1333 let leaf = op.path.get(1).map(String::as_str);
1334 if matches!(leaf, Some("created_at") | Some("updated_at")) {
1335 return Err(crate::RedDBError::Query(format!(
1336 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1337 collection,
1338 leaf.unwrap_or("")
1339 )));
1340 }
1341 }
1342 op.path.remove(0);
1343 field_ops.push(op);
1344 }
1345 "metadata" => {
1346 if op.path.len() < 2 {
1347 return Err(crate::RedDBError::Query(
1348 "patch path 'metadata' requires a nested key".to_string(),
1349 ));
1350 }
1351 op.path.remove(0);
1352 metadata_ops.push(op);
1353 }
1354 _ => {
1355 return Err(crate::RedDBError::Query(format!(
1356 "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1357 )));
1358 }
1359 }
1360 }
1361
1362 if !document_body_ops.is_empty() {
1363 context_index_dirty = true;
1364 let named = row.named.get_or_insert_with(Default::default);
1365 let mut body = document_body_from_named(named)?;
1366 apply_patch_operations_to_json(&mut body, &document_body_ops)
1367 .map_err(crate::RedDBError::Query)?;
1368 replace_document_row_body(named, body, &mut modified_columns)?;
1369 }
1370
1371 if is_document_collection {
1372 if let Some(body) = payload.get("body") {
1373 context_index_dirty = true;
1374 let named = row.named.get_or_insert_with(Default::default);
1375 replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1376 }
1377 }
1378
1379 if !field_ops.is_empty() {
1380 context_index_dirty = true;
1381 for op in &field_ops {
1382 if let Some(col) = op.path.first() {
1383 modified_columns.push(col.clone());
1384 }
1385 }
1386 let named = row.named.get_or_insert_with(Default::default);
1387 apply_patch_operations_to_storage_map(named, &field_ops)?;
1388 }
1389
1390 if let Some(fields) = payload
1391 .get("fields")
1392 .and_then(crate::json::Value::as_object)
1393 {
1394 if row_contract_timestamps {
1395 for key in fields.keys() {
1396 if key == "created_at" || key == "updated_at" {
1397 return Err(crate::RedDBError::Query(format!(
1398 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1399 collection, key
1400 )));
1401 }
1402 }
1403 }
1404 context_index_dirty = true;
1405 let named = row.named.get_or_insert_with(Default::default);
1406 for (key, value) in fields {
1407 modified_columns.push(key.clone());
1408 named.insert(key.clone(), json_to_storage_value(value)?);
1409 }
1410 }
1411
1412 if !metadata_ops.is_empty() {
1413 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1414 let metadata = patch_metadata.get_or_insert_with(|| {
1415 store.get_metadata(&collection, id).unwrap_or_default()
1416 });
1417 let mut metadata_json = metadata_to_json(metadata);
1418 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1419 .map_err(crate::RedDBError::Query)?;
1420 *metadata = metadata_from_json(&metadata_json)?;
1421 metadata_changed = true;
1422 }
1423
1424 if !modified_columns.is_empty() || row_contract_timestamps {
1425 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1426 let current_fields = if let Some(named) = row.named.take() {
1427 named.into_iter().collect::<Vec<_>>()
1428 } else if let Some(schema) = row.schema.as_ref() {
1429 schema
1430 .iter()
1431 .cloned()
1432 .zip(row.columns.iter().cloned())
1433 .collect::<Vec<_>>()
1434 } else {
1435 Vec::new()
1436 };
1437 let normalized_fields = contract.normalize_update_fields(current_fields)?;
1438 if row_contract_timestamps {
1439 modified_columns.push("updated_at".to_string());
1440 context_index_dirty = true;
1441 }
1442 if contract.requires_uniqueness_check(&modified_columns) {
1443 contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1444 }
1445 row.named = Some(normalized_fields.into_iter().collect());
1446 }
1447 }
1448 crate::storage::EntityData::Node(node) => {
1449 let mut field_ops = Vec::new();
1450 let mut metadata_ops = Vec::new();
1451 let mut node_type_ops = Vec::new();
1452
1453 for mut op in operations {
1454 let Some(root) = op.path.first().map(String::as_str) else {
1455 return Err(crate::RedDBError::Query(
1456 "patch path cannot be empty".to_string(),
1457 ));
1458 };
1459
1460 match root {
1461 "fields" | "properties" => {
1462 if op.path.len() < 2 {
1463 return Err(crate::RedDBError::Query(
1464 "patch path 'fields' requires a nested key".to_string(),
1465 ));
1466 }
1467 op.path.remove(0);
1468 field_ops.push(op);
1469 }
1470 "metadata" => {
1471 if op.path.len() < 2 {
1472 return Err(crate::RedDBError::Query(
1473 "patch path 'metadata' requires a nested key".to_string(),
1474 ));
1475 }
1476 op.path.remove(0);
1477 metadata_ops.push(op);
1478 }
1479 "node_type" => {
1480 if op.path.len() != 1 {
1481 return Err(crate::RedDBError::Query(
1482 "patch path 'node_type' does not allow nested keys".to_string(),
1483 ));
1484 }
1485 op.path.clear();
1486 node_type_ops.push(op);
1487 }
1488 _ => {
1489 return Err(crate::RedDBError::Query(format!(
1490 "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1491 )));
1492 }
1493 }
1494 }
1495
1496 for op in node_type_ops {
1497 context_index_dirty = true;
1498 let value = op.value.ok_or_else(|| {
1499 crate::RedDBError::Query("node_type operations require a value".to_string())
1500 })?;
1501
1502 match op.op {
1503 PatchEntityOperationType::Unset => {
1504 return Err(crate::RedDBError::Query(
1505 "node_type cannot be unset through patch operations".to_string(),
1506 ));
1507 }
1508 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1509 let Some(node_type) = value.as_str() else {
1510 return Err(crate::RedDBError::Query(
1511 "node_type operation requires a text value".to_string(),
1512 ));
1513 };
1514 graph_node_type = Some(node_type.to_string());
1515 modified_columns.push("node_type".to_string());
1516 }
1517 }
1518 }
1519
1520 if !field_ops.is_empty() {
1521 context_index_dirty = true;
1522 apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1523 }
1524
1525 if let Some(fields) = payload
1526 .get("fields")
1527 .and_then(crate::json::Value::as_object)
1528 {
1529 context_index_dirty = true;
1530 for (key, value) in fields {
1531 node.properties
1532 .insert(key.clone(), json_to_storage_value(value)?);
1533 }
1534 }
1535
1536 if !metadata_ops.is_empty() {
1537 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1538 let metadata = patch_metadata.get_or_insert_with(|| {
1539 store.get_metadata(&collection, id).unwrap_or_default()
1540 });
1541 let mut metadata_json = metadata_to_json(metadata);
1542 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1543 .map_err(crate::RedDBError::Query)?;
1544 *metadata = metadata_from_json(&metadata_json)?;
1545 metadata_changed = true;
1546 }
1547 }
1548 crate::storage::EntityData::Edge(edge) => {
1549 let mut field_ops = Vec::new();
1550 let mut metadata_ops = Vec::new();
1551 let mut weight_ops = Vec::new();
1552
1553 for mut op in operations {
1554 let Some(root) = op.path.first().map(String::as_str) else {
1555 return Err(crate::RedDBError::Query(
1556 "patch path cannot be empty".to_string(),
1557 ));
1558 };
1559
1560 match root {
1561 "fields" | "properties" => {
1562 if op.path.len() < 2 {
1563 return Err(crate::RedDBError::Query(
1564 "patch path 'fields' requires a nested key".to_string(),
1565 ));
1566 }
1567 op.path.remove(0);
1568 field_ops.push(op);
1569 }
1570 "weight" => {
1571 if op.path.len() != 1 {
1572 return Err(crate::RedDBError::Query(
1573 "patch path 'weight' does not allow nested keys".to_string(),
1574 ));
1575 }
1576 op.path.clear();
1577 weight_ops.push(op);
1578 }
1579 "metadata" => {
1580 if op.path.len() < 2 {
1581 return Err(crate::RedDBError::Query(
1582 "patch path 'metadata' requires a nested key".to_string(),
1583 ));
1584 }
1585 op.path.remove(0);
1586 metadata_ops.push(op);
1587 }
1588 _ => {
1589 return Err(crate::RedDBError::Query(format!(
1590 "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1591 )));
1592 }
1593 }
1594 }
1595
1596 if !field_ops.is_empty() {
1597 context_index_dirty = true;
1598 apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1599 }
1600
1601 for op in weight_ops {
1602 context_index_dirty = true;
1603 let value = op.value.ok_or_else(|| {
1604 crate::RedDBError::Query("weight operations require a value".to_string())
1605 })?;
1606
1607 match op.op {
1608 PatchEntityOperationType::Unset => {
1609 return Err(crate::RedDBError::Query(
1610 "weight cannot be unset through patch operations".to_string(),
1611 ));
1612 }
1613 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1614 let Some(weight) = value.as_f64() else {
1615 return Err(crate::RedDBError::Query(
1616 "weight operation requires a numeric value".to_string(),
1617 ));
1618 };
1619 graph_edge_weight = Some(weight as f32);
1620 edge.weight = weight as f32;
1621 modified_columns.push("weight".to_string());
1622 }
1623 }
1624 }
1625
1626 if let Some(fields) = payload
1627 .get("fields")
1628 .and_then(crate::json::Value::as_object)
1629 {
1630 context_index_dirty = true;
1631 for (key, value) in fields {
1632 edge.properties
1633 .insert(key.clone(), json_to_storage_value(value)?);
1634 }
1635 }
1636
1637 if !metadata_ops.is_empty() {
1638 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1639 let metadata = patch_metadata.get_or_insert_with(|| {
1640 store.get_metadata(&collection, id).unwrap_or_default()
1641 });
1642 let mut metadata_json = metadata_to_json(metadata);
1643 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1644 .map_err(crate::RedDBError::Query)?;
1645 *metadata = metadata_from_json(&metadata_json)?;
1646 metadata_changed = true;
1647 }
1648 }
1649 crate::storage::EntityData::Vector(vector) => {
1650 let mut field_ops = Vec::new();
1651 let mut metadata_ops = Vec::new();
1652
1653 for mut op in operations {
1654 let Some(root) = op.path.first().map(String::as_str) else {
1655 return Err(crate::RedDBError::Query(
1656 "patch path cannot be empty".to_string(),
1657 ));
1658 };
1659
1660 match root {
1661 "fields" => {
1662 if op.path.len() < 2 {
1663 return Err(crate::RedDBError::Query(
1664 "patch path 'fields' requires a nested key".to_string(),
1665 ));
1666 }
1667 op.path.remove(0);
1668 let Some(target) = op.path.first().map(String::as_str) else {
1669 return Err(crate::RedDBError::Query(
1670 "patch path requires a target under fields".to_string(),
1671 ));
1672 };
1673 if !matches!(target, "dense" | "content" | "sparse") {
1674 return Err(crate::RedDBError::Query(format!(
1675 "unsupported vector patch target '{target}'"
1676 )));
1677 }
1678 field_ops.push(op);
1679 }
1680 "metadata" => {
1681 if op.path.len() < 2 {
1682 return Err(crate::RedDBError::Query(
1683 "patch path 'metadata' requires a nested key".to_string(),
1684 ));
1685 }
1686 op.path.remove(0);
1687 metadata_ops.push(op);
1688 }
1689 _ => {
1690 return Err(crate::RedDBError::Query(format!(
1691 "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1692 )));
1693 }
1694 }
1695 }
1696
1697 if !field_ops.is_empty() {
1698 context_index_dirty = true;
1699 apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1700 }
1701
1702 if let Some(fields) = payload
1703 .get("fields")
1704 .and_then(crate::json::Value::as_object)
1705 {
1706 context_index_dirty = true;
1707 if let Some(content) =
1708 fields.get("content").and_then(crate::json::Value::as_str)
1709 {
1710 vector.content = Some(content.to_string());
1711 }
1712 if let Some(dense) = fields.get("dense") {
1713 vector.dense = dense
1714 .as_array()
1715 .ok_or_else(|| {
1716 crate::RedDBError::Query(
1717 "field 'dense' must be an array".to_string(),
1718 )
1719 })?
1720 .iter()
1721 .map(|value| {
1722 value.as_f64().map(|value| value as f32).ok_or_else(|| {
1723 crate::RedDBError::Query(
1724 "field 'dense' must contain only numbers".to_string(),
1725 )
1726 })
1727 })
1728 .collect::<Result<Vec<_>, _>>()?;
1729 }
1730 }
1731
1732 if !metadata_ops.is_empty() {
1733 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1734 let metadata = patch_metadata.get_or_insert_with(|| {
1735 store.get_metadata(&collection, id).unwrap_or_default()
1736 });
1737 let mut metadata_json = metadata_to_json(metadata);
1738 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1739 .map_err(crate::RedDBError::Query)?;
1740 *metadata = metadata_from_json(&metadata_json)?;
1741 metadata_changed = true;
1742 }
1743 }
1744 crate::storage::EntityData::TimeSeries(_)
1745 | crate::storage::EntityData::QueueMessage(_) => {
1746 return Err(crate::RedDBError::Query(
1747 "patch operations are not supported for TimeSeries or QueueMessage entities"
1748 .to_string(),
1749 ));
1750 }
1751 }
1752
1753 if let Some(node_type) = graph_node_type {
1754 if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1755 node.node_type = node_type;
1756 }
1757 }
1758 if let Some(weight) = graph_edge_weight {
1759 if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1760 edge.weight = (weight * 1000.0) as u32;
1761 }
1762 }
1763
1764 if let Some(metadata) = payload
1765 .get("metadata")
1766 .and_then(crate::json::Value::as_object)
1767 {
1768 let patch_metadata = patch_metadata
1769 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1770 for (key, value) in metadata {
1771 ensure_non_tree_reserved_metadata_key(key)?;
1772 patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1773 }
1774 metadata_changed = true;
1775 }
1776
1777 for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1778 let patch_metadata = patch_metadata
1779 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1780 if matches!(value, crate::storage::unified::MetadataValue::Null) {
1781 patch_metadata.remove(&key);
1782 } else {
1783 patch_metadata.set(key, value);
1784 }
1785 metadata_changed = true;
1786 }
1787
1788 entity.updated_at = std::time::SystemTime::now()
1789 .duration_since(std::time::UNIX_EPOCH)
1790 .unwrap_or_default()
1791 .as_secs();
1792
1793 modified_columns = dedupe_modified_columns(modified_columns);
1794
1795 Ok(AppliedEntityMutation {
1796 id,
1797 collection,
1798 entity,
1799 metadata: patch_metadata,
1800 modified_columns,
1801 persist_metadata: metadata_changed,
1802 context_index_dirty,
1803 replaced_entity: None,
1804 replaced_entity_previous_xmax: 0,
1805 pre_mutation_fields,
1806 })
1807 }
1808
1809 pub(crate) fn apply_loaded_sql_update_row_core(
1810 &self,
1811 collection: String,
1812 mut entity: crate::storage::UnifiedEntity,
1813 static_field_assignments: &[(String, Value)],
1814 dynamic_field_assignments: Vec<(String, Value)>,
1815 static_metadata_assignments: &[(String, MetadataValue)],
1816 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1817 row_contract_plan: Option<&RowUpdateContractPlan>,
1818 row_modified_columns_template: &[String],
1819 row_touches_unique_columns: bool,
1820 ) -> RedDBResult<AppliedEntityMutation> {
1821 let id = entity.id;
1822 let previous_xmax = entity.xmax;
1823 let db = self.db();
1824 let store = db.store();
1825 let Some(_) = store.get_collection(&collection) else {
1826 return Err(crate::RedDBError::NotFound(format!(
1827 "collection not found: {collection}"
1828 )));
1829 };
1830
1831 let versioned_update_xid = match self.current_xid() {
1832 Some(xid) => Some(xid),
1833 None => {
1834 let snapshot_manager = self.snapshot_manager();
1835 let xid = snapshot_manager.begin();
1836 snapshot_manager.commit(xid);
1837 Some(xid)
1838 }
1839 };
1840 let mut replaced_entity = versioned_update_xid.map(|xid| {
1841 let mut old = entity.clone();
1842 old.set_xmax(xid);
1843 old
1844 });
1845
1846 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1847 let row_contract_timestamps = row_contract_plan
1848 .map(|plan| plan.timestamps_enabled)
1849 .unwrap_or(false);
1850 let mut metadata_changed = false;
1851 let mut modified_columns = row_modified_columns_template.to_vec();
1852 let mut context_index_dirty = !modified_columns.is_empty();
1853
1854 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1858
1859 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1860 return Err(crate::RedDBError::Query(
1861 "SQL row update fast path requires a row entity".to_string(),
1862 ));
1863 };
1864
1865 let _ = row_contract_plan;
1866 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1867 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1868
1869 for (key, value) in static_metadata_assignments
1870 .iter()
1871 .cloned()
1872 .chain(dynamic_metadata_assignments)
1873 {
1874 ensure_non_tree_reserved_metadata_key(&key)?;
1875 patch_metadata
1876 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1877 .set(key, value);
1878 metadata_changed = true;
1879 }
1880
1881 if !modified_columns.is_empty() || row_contract_timestamps {
1882 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1883 if row_contract_timestamps {
1884 context_index_dirty = true;
1885 set_row_field(
1886 row,
1887 "updated_at",
1888 Value::UnsignedInteger(current_unix_ms_u64()),
1889 );
1890 modified_columns.push("updated_at".to_string());
1891 }
1892 if row_touches_unique_columns {
1893 let current_fields = collect_row_fields(row);
1894 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1895 }
1896 }
1897
1898 entity.updated_at = std::time::SystemTime::now()
1899 .duration_since(std::time::UNIX_EPOCH)
1900 .unwrap_or_default()
1901 .as_secs();
1902
1903 if let Some(xid) = versioned_update_xid {
1904 let logical_id = entity.logical_id();
1905 entity.id = store.next_entity_id();
1906 entity.set_logical_id(logical_id);
1907 entity.set_xmin(xid);
1908 entity.set_xmax(0);
1909 if let Some(old) = replaced_entity.as_mut() {
1910 old.set_xmax(xid);
1911 }
1912 }
1913
1914 modified_columns = dedupe_modified_columns(modified_columns);
1915
1916 Ok(AppliedEntityMutation {
1917 id: entity.id,
1918 collection,
1919 entity,
1920 metadata: patch_metadata,
1921 modified_columns,
1922 persist_metadata: metadata_changed,
1923 context_index_dirty,
1924 replaced_entity,
1925 replaced_entity_previous_xmax: previous_xmax,
1926 pre_mutation_fields,
1927 })
1928 }
1929
1930 pub(crate) fn persist_applied_entity_mutations(
1931 &self,
1932 applied: &[AppliedEntityMutation],
1933 ) -> RedDBResult<()> {
1934 if applied.is_empty() {
1935 return Ok(());
1936 }
1937
1938 let store = self.db().store();
1939 let collection = &applied[0].collection;
1940 let Some(manager) = store.get_collection(collection) else {
1941 return Err(crate::RedDBError::NotFound(format!(
1942 "collection not found: {collection}"
1943 )));
1944 };
1945
1946 let mut ordinary = Vec::with_capacity(applied.len());
1947 for item in applied {
1948 if let Some(old_version) = item.replaced_entity.as_ref() {
1949 store
1950 .install_versioned_table_row_update(
1951 collection,
1952 old_version.clone(),
1953 item.entity.clone(),
1954 if item.persist_metadata {
1955 item.metadata.as_ref()
1956 } else {
1957 None
1958 },
1959 )
1960 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1961 if self.current_xid().is_some() {
1962 self.record_pending_versioned_update(
1963 crate::runtime::impl_core::current_connection_id(),
1964 collection,
1965 old_version.id,
1966 item.entity.id,
1967 old_version.xmax,
1968 item.replaced_entity_previous_xmax,
1969 );
1970 }
1971 } else {
1972 ordinary.push(item);
1973 }
1974 }
1975 if ordinary.is_empty() {
1976 return Ok(());
1977 }
1978
1979 manager
1980 .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1981 (
1982 &item.entity,
1983 item.modified_columns.as_slice(),
1984 if item.persist_metadata {
1985 item.metadata.as_ref()
1986 } else {
1987 None
1988 },
1989 )
1990 }))
1991 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1992
1993 let indexed_cols = self
2002 .index_store_ref()
2003 .indexed_columns_set(collection.as_str());
2004 let all_hot = !indexed_cols.is_empty()
2005 && ordinary.iter().all(|item| {
2006 !item.persist_metadata
2007 && !item
2008 .modified_columns
2009 .iter()
2010 .any(|c| indexed_cols.contains(c))
2011 })
2012 || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2013
2014 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2018 ordinary.iter().map(|item| &item.entity).collect();
2019 let persist_fn = if all_hot {
2020 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2021 } else {
2022 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2023 };
2024 persist_fn(store.as_ref(), collection, &entity_refs)
2025 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2026 }
2027
2028 pub(crate) fn flush_applied_entity_mutation(
2029 &self,
2030 applied: &AppliedEntityMutation,
2031 ) -> RedDBResult<()> {
2032 let store = self.db().store();
2033 if applied.context_index_dirty {
2034 store
2035 .context_index()
2036 .index_entity(&applied.collection, &applied.entity);
2037 }
2038 let mut changed_columns: Option<Vec<String>> = None;
2046 if !applied.pre_mutation_fields.is_empty() {
2047 let post = entity_row_fields_snapshot(&applied.entity);
2048 if !post.is_empty() {
2049 let damage = crate::application::entity::row_damage_vector(
2050 &applied.pre_mutation_fields,
2051 &post,
2052 );
2053 if !damage.is_empty() {
2054 changed_columns = Some(
2055 damage
2056 .touched_columns()
2057 .into_iter()
2058 .map(str::to_string)
2059 .collect(),
2060 );
2061 }
2062
2063 let indexed_cols: std::collections::HashSet<String> = self
2072 .index_store_ref()
2073 .list_indices(applied.collection.as_str())
2074 .into_iter()
2075 .filter_map(|idx| idx.columns.first().cloned())
2076 .collect();
2077 let modified_cols: std::collections::HashSet<String> = damage
2078 .touched_columns()
2079 .into_iter()
2080 .map(str::to_string)
2081 .collect();
2082 if let Some(old_version) = applied.replaced_entity.as_ref() {
2083 let old_index_fields: Vec<(String, Value)> = applied
2084 .pre_mutation_fields
2085 .iter()
2086 .filter(|(col, _)| indexed_cols.contains(col))
2087 .cloned()
2088 .collect();
2089 let new_index_fields: Vec<(String, Value)> = post
2090 .iter()
2091 .filter(|(col, _)| indexed_cols.contains(col))
2092 .cloned()
2093 .collect();
2094 if !old_index_fields.is_empty() {
2095 self.index_store_ref()
2096 .index_entity_delete(
2097 &applied.collection,
2098 old_version.id,
2099 &old_index_fields,
2100 )
2101 .map_err(crate::RedDBError::Internal)?;
2102 }
2103 if !new_index_fields.is_empty() {
2104 self.index_store_ref()
2105 .index_entity_insert(
2106 &applied.collection,
2107 applied.entity.id,
2108 &new_index_fields,
2109 )
2110 .map_err(crate::RedDBError::Internal)?;
2111 }
2112 } else {
2113 let decision = crate::storage::engine::hot_update::decide(
2114 &crate::storage::engine::hot_update::HotUpdateInputs {
2115 collection: applied.collection.as_str(),
2116 indexed_columns: &indexed_cols,
2117 modified_columns: &modified_cols,
2118 new_tuple_size: 0,
2122 page_free_space: usize::MAX,
2123 },
2124 );
2125 if !decision.can_hot {
2126 self.index_store_ref()
2127 .index_entity_update(
2128 &applied.collection,
2129 applied.id,
2130 &applied.pre_mutation_fields,
2131 &post,
2132 )
2133 .map_err(crate::RedDBError::Internal)?;
2134 } else {
2135 tracing::debug!(
2139 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2140 "hot_update fast-path: skipped index_entity_update"
2141 );
2142 }
2143 }
2144 }
2145 }
2146 self.cdc_emit_prebuilt_with_columns(
2147 crate::replication::cdc::ChangeOperation::Update,
2148 &applied.collection,
2149 &applied.entity,
2150 "entity",
2151 applied.metadata.as_ref(),
2152 true,
2153 changed_columns,
2154 );
2155 Ok(())
2156 }
2157
2158 pub(crate) fn apply_loaded_patch_entity(
2159 &self,
2160 collection: String,
2161 entity: crate::storage::UnifiedEntity,
2162 payload: JsonValue,
2163 operations: Vec<PatchEntityOperation>,
2164 ) -> RedDBResult<CreateEntityOutput> {
2165 let applied =
2166 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2167 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2168 self.flush_applied_entity_mutation(&applied)?;
2169 Ok(CreateEntityOutput {
2170 id: applied.id,
2171 entity: Some(applied.entity),
2172 })
2173 }
2174}
2175
2176fn ensure_non_tree_reserved_metadata_patch_paths(
2177 operations: &[PatchEntityOperation],
2178) -> RedDBResult<()> {
2179 for operation in operations {
2180 let Some(key) = operation.path.first().map(String::as_str) else {
2181 continue;
2182 };
2183 ensure_non_tree_reserved_metadata_key(key)?;
2184 }
2185 Ok(())
2186}
2187
2188fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2189 if key.starts_with(TREE_METADATA_PREFIX) {
2190 return Err(crate::RedDBError::Query(format!(
2191 "metadata key '{}' is reserved for managed trees",
2192 key
2193 )));
2194 }
2195 Ok(())
2196}
2197
2198fn ensure_non_tree_reserved_metadata_entries(
2199 metadata: &[(String, MetadataValue)],
2200) -> RedDBResult<()> {
2201 for (key, _) in metadata {
2202 ensure_non_tree_reserved_metadata_key(key)?;
2203 }
2204 Ok(())
2205}
2206
2207fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2208 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2209 return Err(crate::RedDBError::Query(format!(
2210 "edge label '{}' is reserved for managed trees",
2211 TREE_CHILD_EDGE_LABEL
2212 )));
2213 }
2214 Ok(())
2215}
2216
2217impl RedDBRuntime {
2218 pub(crate) fn create_node_unchecked(
2219 &self,
2220 input: CreateNodeInput,
2221 ) -> RedDBResult<CreateEntityOutput> {
2222 let db = self.db();
2223 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2224 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2225 let mut metadata = input.metadata;
2226 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2227 let mut builder = db.node(&input.collection, &input.label);
2228
2229 if let Some(node_type) = input.node_type {
2230 builder = builder.node_type(node_type);
2231 }
2232
2233 for (key, value) in input.properties {
2234 builder = builder.property(key, value);
2235 }
2236
2237 for (key, value) in metadata {
2238 builder = builder.metadata(key, value);
2239 }
2240
2241 for embedding in input.embeddings {
2242 if let Some(model) = embedding.model {
2243 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2244 } else {
2245 builder = builder.embedding(embedding.name, embedding.vector);
2246 }
2247 }
2248
2249 for link in input.table_links {
2250 builder = builder.link_to_table(link.key, link.table);
2251 }
2252
2253 for link in input.node_links {
2254 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2255 }
2256
2257 let id = builder.save()?;
2258 self.stamp_xmin_if_in_txn(&input.collection, id);
2261 refresh_context_index(&db, &input.collection, id)?;
2262 self.cdc_emit(
2263 crate::replication::cdc::ChangeOperation::Insert,
2264 &input.collection,
2265 id.raw(),
2266 "graph_node",
2267 );
2268 Ok(CreateEntityOutput {
2269 id,
2270 entity: db.store().get(&input.collection, id),
2271 })
2272 }
2273
2274 pub(crate) fn create_edge_unchecked(
2275 &self,
2276 input: CreateEdgeInput,
2277 ) -> RedDBResult<CreateEntityOutput> {
2278 let db = self.db();
2279 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2280 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2281 let mut metadata = input.metadata;
2282 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2283 let mut builder = db
2284 .edge(&input.collection, &input.label)
2285 .from(input.from)
2286 .to(input.to);
2287
2288 if let Some(weight) = input.weight {
2289 builder = builder.weight(weight);
2290 }
2291
2292 for (key, value) in input.properties {
2293 builder = builder.property(key, value);
2294 }
2295
2296 for (key, value) in metadata {
2297 builder = builder.metadata(key, value);
2298 }
2299
2300 let id = builder.save()?;
2301 self.stamp_xmin_if_in_txn(&input.collection, id);
2304 refresh_context_index(&db, &input.collection, id)?;
2305 self.cdc_emit(
2306 crate::replication::cdc::ChangeOperation::Insert,
2307 &input.collection,
2308 id.raw(),
2309 "graph_edge",
2310 );
2311 Ok(CreateEntityOutput {
2312 id,
2313 entity: db.store().get(&input.collection, id),
2314 })
2315 }
2316}
2317
2318fn create_rows_batch_prevalidated_columnar_with_outputs(
2319 runtime: &RedDBRuntime,
2320 collection: String,
2321 column_names: std::sync::Arc<Vec<String>>,
2322 rows: Vec<Vec<crate::storage::schema::Value>>,
2323) -> RedDBResult<Vec<CreateEntityOutput>> {
2324 use crate::storage::{
2325 unified::{EntityData, EntityKind, RowData},
2326 EntityId, UnifiedEntity,
2327 };
2328 use std::sync::Arc;
2329
2330 if rows.is_empty() {
2331 return Ok(Vec::new());
2332 }
2333 runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2334 runtime.check_batch_size(rows.len())?;
2335 runtime.check_db_size()?;
2336
2337 let db = runtime.db();
2338 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2339 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2340
2341 let store = db.store();
2342 let table_arc: Arc<str> = Arc::from(collection.as_str());
2343
2344 let indexed_cols = runtime
2345 .index_store_ref()
2346 .indexed_columns_set(collection.as_str());
2347 let has_secondary_indexes = !indexed_cols.is_empty();
2348 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2349 if has_secondary_indexes {
2350 Vec::with_capacity(rows.len())
2351 } else {
2352 Vec::new()
2353 };
2354
2355 let entities: Vec<UnifiedEntity> = rows
2356 .into_iter()
2357 .map(|values| {
2358 if has_secondary_indexes {
2359 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2360 Vec::with_capacity(indexed_cols.len());
2361 for (name, val) in column_names.iter().zip(values.iter()) {
2362 if indexed_cols.contains(name) {
2363 snap.push((name.clone(), val.clone()));
2364 }
2365 }
2366 field_snapshots.push(snap);
2367 }
2368 let mut row = RowData::new(values);
2369 row.schema = Some(Arc::clone(&column_names));
2370 UnifiedEntity::new(
2371 EntityId::new(0),
2372 EntityKind::TableRow {
2373 table: Arc::clone(&table_arc),
2374 row_id: 0,
2375 },
2376 EntityData::Row(row),
2377 )
2378 })
2379 .collect();
2380
2381 let ids = store
2382 .bulk_insert(&collection, entities)
2383 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2384
2385 if has_secondary_indexes {
2386 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2387 .iter()
2388 .zip(field_snapshots)
2389 .map(|(id, fields)| (*id, fields))
2390 .collect();
2391 runtime
2392 .index_store_ref()
2393 .index_entity_insert_batch(&collection, &index_rows)
2394 .map_err(crate::RedDBError::Internal)?;
2395 }
2396
2397 runtime.invalidate_result_cache();
2398 runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2399
2400 Ok(ids
2401 .into_iter()
2402 .map(|id| CreateEntityOutput { id, entity: None })
2403 .collect())
2404}
2405
2406impl RuntimeEntityPort for RedDBRuntime {
2407 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2408 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2409 let db = self.db();
2410 let CreateRowInput {
2411 collection,
2412 fields,
2413 metadata: input_metadata,
2414 node_links,
2415 vector_links,
2416 } = input;
2417 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2418 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2419 let mut metadata = input_metadata;
2420 apply_collection_default_ttl(&db, &collection, &mut metadata);
2421 let fields = contract.normalize_insert_fields(fields)?;
2422 contract.enforce_row_uniqueness(&fields, None)?;
2423 let engine = self.mutation_engine();
2425 let result = engine.apply(
2426 collection.clone(),
2427 vec![crate::runtime::mutation::MutationRow {
2428 fields,
2429 metadata,
2430 node_links,
2431 vector_links,
2432 }],
2433 )?;
2434 let id = result.ids[0];
2435 Ok(CreateEntityOutput {
2440 id,
2441 entity: db.store().get(&collection, id),
2442 })
2443 }
2444
2445 fn create_rows_batch(
2446 &self,
2447 input: CreateRowsBatchInput,
2448 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2449 if input.rows.is_empty() {
2450 return Ok(Vec::new());
2451 }
2452 self.check_batch_size(input.rows.len())?;
2453 self.check_db_size()?;
2454
2455 let db = self.db();
2456 let collection = input.collection;
2457 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2458 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2459
2460 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2461 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2462 for row in input.rows {
2463 if row.collection != collection {
2464 return Err(crate::RedDBError::Query(format!(
2465 "batch row collection mismatch: expected '{}', got '{}'",
2466 collection, row.collection
2467 )));
2468 }
2469
2470 let mut metadata = row.metadata;
2471 apply_collection_default_ttl(&db, &collection, &mut metadata);
2472 let fields = contract.normalize_insert_fields(row.fields)?;
2473 contract.enforce_row_uniqueness(&fields, None)?;
2474 uniqueness_rows.push(fields.clone());
2475 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2476 }
2477
2478 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2479
2480 let engine = {
2483 let e = self.mutation_engine();
2484 if input.suppress_events {
2485 e.with_suppress_events()
2486 } else {
2487 e
2488 }
2489 };
2490 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2491 .into_iter()
2492 .map(|(fields, metadata, node_links, vector_links)| {
2493 crate::runtime::mutation::MutationRow {
2494 fields,
2495 metadata,
2496 node_links,
2497 vector_links,
2498 }
2499 })
2500 .collect();
2501
2502 let result = engine
2503 .apply(collection.clone(), mutation_rows)
2504 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2505
2506 let store = db.store();
2507 Ok(result
2508 .ids
2509 .into_iter()
2510 .map(|id| CreateEntityOutput {
2511 id,
2512 entity: store.get(&collection, id),
2513 })
2514 .collect())
2515 }
2516
2517 fn create_rows_batch_prevalidated_columnar(
2518 &self,
2519 collection: String,
2520 column_names: std::sync::Arc<Vec<String>>,
2521 rows: Vec<Vec<crate::storage::schema::Value>>,
2522 ) -> RedDBResult<usize> {
2523 create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2524 .map(|outputs| outputs.len())
2525 }
2526
2527 fn create_rows_batch_columnar(
2528 &self,
2529 collection: String,
2530 column_names: std::sync::Arc<Vec<String>>,
2531 rows: Vec<Vec<crate::storage::schema::Value>>,
2532 ) -> RedDBResult<usize> {
2533 self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2534 .map(|outputs| outputs.len())
2535 }
2536
2537 fn create_rows_batch_columnar_with_outputs(
2538 &self,
2539 collection: String,
2540 column_names: std::sync::Arc<Vec<String>>,
2541 rows: Vec<Vec<crate::storage::schema::Value>>,
2542 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2543 if rows.is_empty() {
2544 return Ok(Vec::new());
2545 }
2546 self.check_batch_size(rows.len())?;
2547 self.check_db_size()?;
2548
2549 let db = self.db();
2550 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2551 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2552
2553 let needs_normalisation = match db.collection_contract(&collection) {
2563 Some(c) => {
2564 c.declared_model == crate::catalog::CollectionModel::Table
2565 && (!c.declared_columns.is_empty()
2566 || c.table_def
2567 .as_ref()
2568 .map(|t| !t.columns.is_empty())
2569 .unwrap_or(false))
2570 }
2571 None => false,
2572 };
2573 if !needs_normalisation {
2574 return create_rows_batch_prevalidated_columnar_with_outputs(
2575 self,
2576 collection,
2577 column_names,
2578 rows,
2579 );
2580 }
2581
2582 let ncols = column_names.len();
2589 let tuple_rows: Vec<CreateRowInput> = rows
2590 .into_iter()
2591 .map(|values| {
2592 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2593 Vec::with_capacity(ncols);
2594 for (name, value) in column_names.iter().zip(values) {
2595 fields.push((name.clone(), value));
2596 }
2597 CreateRowInput {
2598 collection: collection.clone(),
2599 fields,
2600 metadata: Vec::new(),
2601 node_links: Vec::new(),
2602 vector_links: Vec::new(),
2603 }
2604 })
2605 .collect();
2606 self.create_rows_batch(CreateRowsBatchInput {
2607 collection,
2608 rows: tuple_rows,
2609 suppress_events: false,
2610 })
2611 }
2612
2613 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2614 if input.rows.is_empty() {
2615 return Ok(0);
2616 }
2617 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2618 self.check_batch_size(input.rows.len())?;
2619 self.check_db_size()?;
2620
2621 let db = self.db();
2622 let collection = input.collection;
2623 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2628 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2629
2630 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2636
2637 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2638 Vec::with_capacity(input.rows.len());
2639 let mut mutation_rows = mutation_rows;
2640 for row in input.rows {
2641 if row.collection != collection {
2642 return Err(crate::RedDBError::Query(format!(
2643 "batch row collection mismatch: expected '{}', got '{}'",
2644 collection, row.collection
2645 )));
2646 }
2647 let mut metadata = row.metadata;
2648 if let Some(ttl) = default_ttl_ms {
2649 if !has_internal_ttl_metadata(&metadata) {
2650 metadata.push((
2651 "_ttl_ms".to_string(),
2652 if ttl <= i64::MAX as u64 {
2653 MetadataValue::Int(ttl as i64)
2654 } else {
2655 MetadataValue::Timestamp(ttl)
2656 },
2657 ));
2658 }
2659 }
2660 mutation_rows.push(crate::runtime::mutation::MutationRow {
2661 fields: row.fields,
2662 metadata,
2663 node_links: row.node_links,
2664 vector_links: row.vector_links,
2665 });
2666 }
2667
2668 let engine = self.mutation_engine();
2669 let result = engine
2670 .apply(collection, mutation_rows)
2671 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2672 Ok(result.ids.len())
2673 }
2674
2675 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2676 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2677 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2678 input.properties.iter().map(|(key, _)| key.as_str()),
2679 &format!("node '{}'", input.collection),
2680 )?;
2681 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2682 self.create_node_unchecked(input)
2683 }
2684
2685 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2686 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2687 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2688 input.properties.iter().map(|(key, _)| key.as_str()),
2689 &format!("edge '{}'", input.collection),
2690 )?;
2691 ensure_non_tree_structural_edge_label(&input.label)?;
2692 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2693 self.create_edge_unchecked(input)
2694 }
2695
2696 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2697 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2698 let db = self.db();
2699 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2700 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2701 ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2702 let mut metadata = input.metadata;
2703 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2704 let mut builder = db.vector(&input.collection).dense(input.dense);
2705
2706 if let Some(content) = input.content {
2707 builder = builder.content(content);
2708 }
2709
2710 for (key, value) in metadata {
2711 builder = builder.metadata(key, value);
2712 }
2713
2714 if let Some(link_row) = input.link_row {
2715 builder = builder.link_to_table(link_row);
2716 }
2717
2718 if let Some(link_node) = input.link_node {
2719 builder = builder.link_to_node(link_node);
2720 }
2721
2722 let id = builder.save()?;
2723 let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
2724 Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
2725 _ => None,
2726 };
2727 self.stamp_xmin_if_in_txn(&input.collection, id);
2730 refresh_context_index(&db, &input.collection, id)?;
2731 if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
2737 state.ensure_populated(&db.store(), &input.collection);
2741 use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
2743 fire(InjectionPoint::BeforeWalFsync);
2744 let _ = db
2745 .store()
2746 .append_vector_insert_record(&input.collection, id.raw(), &dense);
2747 fire(InjectionPoint::BeforeIndexCommit);
2748 let mut index = state.index.lock();
2749 index.insert(id, dense.clone());
2750 fire(InjectionPoint::BeforeExtentFsync);
2757 if let Some(extent) = state.extent.lock().as_mut() {
2758 let packed = index.encode_for_extent(&dense);
2759 let _ = extent.append(&packed);
2760 }
2761 }
2762 self.cdc_emit(
2763 crate::replication::cdc::ChangeOperation::Insert,
2764 &input.collection,
2765 id.raw(),
2766 "vector",
2767 );
2768 Ok(CreateEntityOutput {
2769 id,
2770 entity: db.store().get(&input.collection, id),
2771 })
2772 }
2773
2774 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2775 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2776 let db = self.db();
2777 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2778 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2779
2780 if let JsonValue::Object(ref map) = input.body {
2781 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2782 map.keys().map(String::as_str),
2783 &format!("document '{}'", input.collection),
2784 )?;
2785 }
2786
2787 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2789 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2790 })?;
2791 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2792 "body".to_string(),
2793 crate::storage::schema::Value::Json(body_bytes),
2794 )];
2795
2796 if let JsonValue::Object(ref map) = input.body {
2798 for (key, value) in map {
2799 let storage_value = json_to_storage_value(value)?;
2800 fields.push((key.clone(), storage_value));
2801 }
2802 }
2803
2804 let mut metadata = input.metadata;
2805 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2806 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2807 .iter()
2808 .map(|(key, value)| (key.as_str(), value.clone()))
2809 .collect();
2810 let mut builder = db.row(&input.collection, columns);
2811
2812 for (key, value) in metadata {
2813 builder = builder.metadata(key, value);
2814 }
2815
2816 for node in input.node_links {
2817 builder = builder.link_to_node(node);
2818 }
2819
2820 for vector in input.vector_links {
2821 builder = builder.link_to_vector(vector);
2822 }
2823
2824 let id = builder.save()?;
2825 self.stamp_xmin_if_in_txn(&input.collection, id);
2827 refresh_context_index(&db, &input.collection, id)?;
2828 self.cdc_emit(
2829 crate::replication::cdc::ChangeOperation::Insert,
2830 &input.collection,
2831 id.raw(),
2832 "document",
2833 );
2834 Ok(CreateEntityOutput {
2835 id,
2836 entity: db.store().get(&input.collection, id),
2837 })
2838 }
2839
2840 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2841 let db = self.db();
2842 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2843 let declared_model = db
2844 .collection_contract(&input.collection)
2845 .map(|contract| contract.declared_model);
2846 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2847 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2848 self.seal_vault_value(&input.collection, input.value)?
2849 } else {
2850 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2851 input.value
2852 };
2853 let fields = vec![
2854 (
2855 "key".to_string(),
2856 crate::storage::schema::Value::text(input.key),
2857 ),
2858 ("value".to_string(), value),
2859 ];
2860 let collection = input.collection;
2861 let result = self.mutation_engine().apply(
2862 collection.clone(),
2863 vec![crate::runtime::mutation::MutationRow {
2864 fields,
2865 metadata: input.metadata,
2866 node_links: Vec::new(),
2867 vector_links: Vec::new(),
2868 }],
2869 )?;
2870 let id = result.ids[0];
2871 Ok(CreateEntityOutput {
2872 id,
2873 entity: db.store().get(&collection, id),
2874 })
2875 }
2876
2877 fn create_timeseries_point(
2878 &self,
2879 input: CreateTimeSeriesPointInput,
2880 ) -> RedDBResult<CreateEntityOutput> {
2881 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2882 let db = self.db();
2883 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2884 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2885
2886 let mut fields = vec![
2887 (
2888 "metric".to_string(),
2889 crate::storage::schema::Value::text(input.metric),
2890 ),
2891 (
2892 "value".to_string(),
2893 crate::storage::schema::Value::Float(input.value),
2894 ),
2895 ];
2896
2897 if let Some(timestamp_ns) = input.timestamp_ns {
2898 fields.push((
2899 "timestamp".to_string(),
2900 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2901 ));
2902 }
2903
2904 if !input.tags.is_empty() {
2905 let tags_json = JsonValue::Object(
2906 input
2907 .tags
2908 .into_iter()
2909 .map(|(key, value)| (key, JsonValue::String(value)))
2910 .collect(),
2911 );
2912 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2913 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2914 })?;
2915 fields.push((
2916 "tags".to_string(),
2917 crate::storage::schema::Value::Json(tags_bytes),
2918 ));
2919 }
2920
2921 let collection = input.collection;
2922 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2923 self.stamp_xmin_if_in_txn(&collection, id);
2926 refresh_context_index(&db, &collection, id)?;
2927
2928 Ok(CreateEntityOutput {
2929 id,
2930 entity: db.store().get(&collection, id),
2931 })
2932 }
2933
2934 fn get_kv(
2935 &self,
2936 collection: &str,
2937 key: &str,
2938 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2939 let db = self.db();
2940 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2941 let store = db.store();
2942 let Some(manager) = store.get_collection(collection) else {
2943 return Ok(None);
2944 };
2945 let entities = manager.query_all(|_| true);
2946 for entity in entities {
2947 if let crate::storage::EntityData::Row(ref row) = entity.data {
2948 if let Some(ref named) = row.named {
2949 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2950 if &**k == key {
2951 let value = named
2952 .get("value")
2953 .cloned()
2954 .unwrap_or(crate::storage::schema::Value::Null);
2955 return Ok(Some((value, entity.id)));
2956 }
2957 }
2958 }
2959 }
2960 }
2961 Ok(None)
2962 }
2963
2964 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2965 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2966 let found = self.get_kv(collection, key)?;
2967 if let Some((_, id)) = found {
2968 let db = self.db();
2969 let store = db.store();
2970 let deleted = store
2971 .delete(collection, id)
2972 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2973 if deleted {
2974 store.context_index().remove_entity(id);
2975 }
2976 Ok(deleted)
2977 } else {
2978 Ok(false)
2979 }
2980 }
2981
2982 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2983 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2984 let PatchEntityInput {
2985 collection,
2986 id,
2987 payload,
2988 operations,
2989 } = input;
2990 let db = self.db();
2991 let store = db.store();
2992 let Some(manager) = store.get_collection(&collection) else {
2993 return Err(crate::RedDBError::NotFound(format!(
2994 "collection not found: {collection}"
2995 )));
2996 };
2997 let Some(entity) = manager.get(id) else {
2998 return Err(crate::RedDBError::NotFound(format!(
2999 "entity not found: {}",
3000 id.raw()
3001 )));
3002 };
3003 self.apply_loaded_patch_entity(collection, entity, payload, operations)
3004 }
3005
3006 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3007 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3008 let store = self.db().store();
3009 let pre_delete_fields = store
3013 .get(&input.collection, input.id)
3014 .as_ref()
3015 .map(entity_row_fields_snapshot)
3016 .unwrap_or_default();
3017 let deleted = store
3021 .delete(&input.collection, input.id)
3022 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3023 if deleted {
3024 store.context_index().remove_entity(input.id);
3025 if !pre_delete_fields.is_empty() {
3028 self.index_store_ref()
3029 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3030 .map_err(crate::RedDBError::Internal)?;
3031 }
3032 self.cdc_emit(
3033 crate::replication::cdc::ChangeOperation::Delete,
3034 &input.collection,
3035 input.id.raw(),
3036 "entity",
3037 );
3038 }
3039 Ok(DeleteEntityOutput {
3040 deleted,
3041 id: input.id,
3042 })
3043 }
3044}