1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 if requested_model == crate::catalog::CollectionModel::Vector
100 && contract
101 .ai_policy
102 .as_ref()
103 .is_some_and(|policy| policy.embed.is_some() || policy.vision.is_some())
104 {
105 return Ok(());
106 }
107 return Err(crate::RedDBError::InvalidOperation(format!(
108 "collection '{}' is declared as '{}' and does not allow '{}' writes",
109 collection,
110 collection_model_name(contract.declared_model),
111 collection_model_name(requested_model)
112 )));
113 }
114
115 let now = implicit_contract_unix_ms();
116 db.save_collection_contract(crate::physical::CollectionContract {
117 name: collection.to_string(),
118 declared_model: requested_model,
119 schema_mode: crate::catalog::SchemaMode::Dynamic,
120 origin: crate::physical::ContractOrigin::Implicit,
121 version: 1,
122 created_at_unix_ms: now,
123 updated_at_unix_ms: now,
124 default_ttl_ms: db.collection_default_ttl_ms(collection),
125 vector_dimension: None,
126 vector_metric: None,
127 context_index_fields: Vec::new(),
128 declared_columns: Vec::new(),
129 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
130 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
131 timestamps_enabled: false,
132 context_index_enabled: false,
133 metrics_raw_retention_ms: None,
134 metrics_rollup_policies: Vec::new(),
135 metrics_tenant_identity: None,
136 metrics_namespace: None,
137 append_only: false,
140 subscriptions: Vec::new(),
141 analytics_config: Vec::new(),
142 session_key: None,
143 session_gap_ms: None,
144 retention_duration_ms: None,
145 analytical_storage: None,
146
147 ai_policy: None,
148 })
149 .map(|_| ())
150 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
151}
152
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn implicit_contract_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
159
160fn collection_model_allows(
161 declared_model: crate::catalog::CollectionModel,
162 requested_model: crate::catalog::CollectionModel,
163) -> bool {
164 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
165}
166
167fn ensure_vector_dimension_contract(
168 db: &crate::storage::unified::devx::RedDB,
169 collection: &str,
170 actual_dimension: usize,
171) -> RedDBResult<()> {
172 let Some(expected_dimension) = db
173 .collection_contract(collection)
174 .and_then(|contract| contract.vector_dimension)
175 else {
176 return Ok(());
177 };
178 if expected_dimension == actual_dimension {
179 return Ok(());
180 }
181 Err(crate::RedDBError::Query(format!(
182 "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
183 )))
184}
185
186fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
187 match model {
188 crate::catalog::CollectionModel::Table => "table",
189 crate::catalog::CollectionModel::Document => "document",
190 crate::catalog::CollectionModel::Graph => "graph",
191 crate::catalog::CollectionModel::Vector => "vector",
192 crate::catalog::CollectionModel::Hll => "hll",
193 crate::catalog::CollectionModel::Sketch => "sketch",
194 crate::catalog::CollectionModel::Filter => "filter",
195 crate::catalog::CollectionModel::Kv => "kv",
196 crate::catalog::CollectionModel::Config => "config",
197 crate::catalog::CollectionModel::Vault => "vault",
198 crate::catalog::CollectionModel::Mixed => "mixed",
199 crate::catalog::CollectionModel::TimeSeries => "timeseries",
200 crate::catalog::CollectionModel::Queue => "queue",
201 crate::catalog::CollectionModel::Metrics => "metrics",
202 }
203}
204
/// A uniqueness requirement over one or more columns of a collection,
/// derived from the collection contract by `resolved_uniqueness_rules`.
#[derive(Clone)]
struct UniquenessRule {
    /// Rule name used in error messages (constraint name, `"primary_key"`,
    /// or a generated `pk_<column>` / `uniq_<column>` name).
    name: String,
    /// Columns whose combined values must be unique.
    columns: Vec<String>,
    /// `true` for primary-key rules, which additionally reject missing or
    /// null column values instead of skipping the check.
    primary_key: bool,
}
211
/// Selects how `normalize_row_fields_for_contract_with_mode` treats the
/// managed `created_at` / `updated_at` columns when timestamps are enabled.
#[derive(Copy, Clone, PartialEq, Eq)]
enum NormalizeMode {
    /// Insert: caller-supplied `created_at` / `updated_at` are rejected and
    /// both columns are stamped with the current time.
    Insert,
    /// Update: a supplied `created_at` is carried over as-is and
    /// `updated_at` is replaced with the current time.
    Update,
}
223
224mod collection_contract_enforcement {
225 use super::*;
226
227 pub(super) struct CollectionContractWriteEnforcer<'a> {
228 db: &'a crate::storage::unified::devx::RedDB,
229 collection: &'a str,
230 }
231
232 impl<'a> CollectionContractWriteEnforcer<'a> {
233 pub(super) fn new(
234 db: &'a crate::storage::unified::devx::RedDB,
235 collection: &'a str,
236 ) -> Self {
237 Self { db, collection }
238 }
239
240 pub(super) fn ensure_model(
241 &self,
242 requested_model: crate::catalog::CollectionModel,
243 ) -> RedDBResult<()> {
244 ensure_collection_model_contract(self.db, self.collection, requested_model)
245 }
246
247 pub(super) fn normalize_insert_fields(
248 &self,
249 fields: Vec<(String, Value)>,
250 ) -> RedDBResult<Vec<(String, Value)>> {
251 normalize_row_fields_for_contract_with_mode(
252 self.db,
253 self.collection,
254 fields,
255 NormalizeMode::Insert,
256 )
257 }
258
259 pub(super) fn normalize_update_fields(
260 &self,
261 fields: Vec<(String, Value)>,
262 ) -> RedDBResult<Vec<(String, Value)>> {
263 normalize_row_fields_for_contract_with_mode(
264 self.db,
265 self.collection,
266 fields,
267 NormalizeMode::Update,
268 )
269 }
270
271 pub(super) fn enforce_row_uniqueness(
272 &self,
273 fields: &[(String, Value)],
274 exclude_id: Option<crate::storage::EntityId>,
275 ) -> RedDBResult<()> {
276 enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
277 }
278
279 pub(super) fn enforce_batch_uniqueness(
280 &self,
281 rows: &[Vec<(String, Value)>],
282 ) -> RedDBResult<()> {
283 enforce_row_batch_uniqueness(self.db, self.collection, rows)
284 }
285
286 pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
287 row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
288 }
289 }
290}
291
292use collection_contract_enforcement::CollectionContractWriteEnforcer;
293
294fn normalize_row_fields_for_contract_with_mode(
295 db: &crate::storage::unified::devx::RedDB,
296 collection: &str,
297 fields: Vec<(String, Value)>,
298 mode: NormalizeMode,
299) -> RedDBResult<Vec<(String, Value)>> {
300 let Some(contract) = db.collection_contract(collection) else {
301 return Ok(fields);
302 };
303
304 if contract.declared_model != crate::catalog::CollectionModel::Table
305 || (contract.declared_columns.is_empty()
306 && contract
307 .table_def
308 .as_ref()
309 .map(|table| table.columns.is_empty())
310 .unwrap_or(true))
311 {
312 return Ok(fields);
313 }
314
315 let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
325 fields
326 .iter()
327 .find(|(n, _)| n == "created_at")
328 .map(|(_, v)| v.clone())
329 } else {
330 None
331 };
332
333 if contract.timestamps_enabled && mode == NormalizeMode::Insert {
338 for (name, _) in &fields {
339 if name == "created_at" || name == "updated_at" {
340 return Err(crate::RedDBError::Query(format!(
341 "collection '{}' manages '{}' automatically — do not set it in INSERT",
342 collection, name
343 )));
344 }
345 }
346 }
347
348 let mut provided = std::collections::BTreeMap::new();
349 for (name, value) in &fields {
350 provided.insert(name.clone(), value.clone());
351 }
352
353 let resolved_columns = resolved_contract_columns(&contract)?;
354 let declared_names: std::collections::BTreeSet<String> = resolved_columns
355 .iter()
356 .map(|column| column.name.clone())
357 .collect();
358 let unknown_fields: Vec<String> = fields
359 .iter()
360 .filter(|(name, _)| !declared_names.contains(name))
361 .map(|(name, _)| name.clone())
362 .collect();
363 if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
364 && !unknown_fields.is_empty()
365 {
366 return Err(crate::RedDBError::Query(format!(
367 "collection '{}' is strict and does not allow undeclared fields: {}",
368 collection,
369 unknown_fields.join(", ")
370 )));
371 }
372 let mut normalized = Vec::new();
373 let now_ms = current_unix_ms_u64();
374
375 for column in &resolved_columns {
376 match provided.remove(&column.name) {
377 Some(value) => {
378 if contract.timestamps_enabled && mode == NormalizeMode::Update {
383 match column.name.as_str() {
384 "created_at" => {
385 normalized.push((
386 column.name.clone(),
387 existing_created_at
388 .clone()
389 .unwrap_or(Value::UnsignedInteger(now_ms)),
390 ));
391 continue;
392 }
393 "updated_at" => {
394 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
395 continue;
396 }
397 _ => {}
398 }
399 }
400 normalized.push((
401 column.name.clone(),
402 normalize_contract_value(collection, column, value)?,
403 ));
404 }
405 None => {
406 if contract.timestamps_enabled
410 && (column.name == "created_at" || column.name == "updated_at")
411 {
412 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
413 continue;
414 }
415 if let Some(default) = &column.default {
416 normalized.push((
417 column.name.clone(),
418 coerce_contract_literal(collection, &column.name, column, default)?,
419 ));
420 } else if column.not_null {
421 return Err(crate::RedDBError::Query(format!(
422 "missing required column '{}' for collection '{}'",
423 column.name, collection
424 )));
425 }
426 }
427 }
428 }
429
430 for (name, value) in fields {
431 if !declared_names.contains(&name) {
432 normalized.push((name, value));
433 }
434 }
435
436 Ok(normalized)
437}
438
/// Current wall-clock time as milliseconds since the Unix epoch, as `u64`.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn current_unix_ms_u64() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
445
446fn enforce_row_uniqueness(
447 db: &crate::storage::unified::devx::RedDB,
448 collection: &str,
449 fields: &[(String, Value)],
450 exclude_id: Option<crate::storage::EntityId>,
451) -> RedDBResult<()> {
452 let Some(contract) = db.collection_contract(collection) else {
453 return Ok(());
454 };
455 if contract.declared_model != crate::catalog::CollectionModel::Table
456 && contract.declared_model != crate::catalog::CollectionModel::Mixed
457 {
458 return Ok(());
459 }
460
461 let rules = resolved_uniqueness_rules(&contract);
462 if rules.is_empty() {
463 return Ok(());
464 }
465
466 let store = db.store();
467 let Some(manager) = store.get_collection(collection) else {
468 return Ok(());
469 };
470
471 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
472
473 for rule in &rules {
474 let mut expected_signatures = Vec::new();
475 let mut skip_rule = false;
476
477 for column in &rule.columns {
478 match input_fields.get(column) {
479 Some(Value::Null) | None if rule.primary_key => {
480 return Err(crate::RedDBError::Query(format!(
481 "primary key '{}' in collection '{}' requires non-null column '{}'",
482 rule.name, collection, column
483 )))
484 }
485 Some(Value::Null) | None => {
486 skip_rule = true;
487 break;
488 }
489 Some(value) => {
490 expected_signatures.push((column.clone(), value_signature(value)));
491 }
492 }
493 }
494
495 if skip_rule {
496 continue;
497 }
498
499 for entity in manager.query_all(|_| true) {
500 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
501 continue;
502 }
503 let Some(existing_fields) = row_fields_from_entity(&entity) else {
504 continue;
505 };
506
507 let duplicate = expected_signatures.iter().all(|(column, expected)| {
508 existing_fields
509 .get(column)
510 .map(|value| value_signature(value) == *expected)
511 .unwrap_or(false)
512 });
513
514 if duplicate {
515 let qualifier = if rule.primary_key {
516 "primary key"
517 } else {
518 "unique constraint"
519 };
520 return Err(crate::RedDBError::Query(format!(
521 "{} '{}' violated on collection '{}' for columns [{}]",
522 qualifier,
523 rule.name,
524 collection,
525 rule.columns.join(", ")
526 )));
527 }
528 }
529 }
530
531 Ok(())
532}
533
534fn enforce_row_batch_uniqueness(
535 db: &crate::storage::unified::devx::RedDB,
536 collection: &str,
537 rows: &[Vec<(String, Value)>],
538) -> RedDBResult<()> {
539 let Some(contract) = db.collection_contract(collection) else {
540 return Ok(());
541 };
542 if contract.declared_model != crate::catalog::CollectionModel::Table
543 && contract.declared_model != crate::catalog::CollectionModel::Mixed
544 {
545 return Ok(());
546 }
547
548 let rules = resolved_uniqueness_rules(&contract);
549 if rules.is_empty() {
550 return Ok(());
551 }
552
553 for rule in &rules {
554 let mut seen = std::collections::HashMap::<String, usize>::new();
555 for (row_index, fields) in rows.iter().enumerate() {
556 let input_fields: std::collections::BTreeMap<String, Value> =
557 fields.iter().cloned().collect();
558 let mut signatures = Vec::new();
559 let mut skip_rule = false;
560
561 for column in &rule.columns {
562 match input_fields.get(column) {
563 Some(Value::Null) | None if rule.primary_key => {
564 return Err(crate::RedDBError::Query(format!(
565 "primary key '{}' in collection '{}' requires non-null column '{}'",
566 rule.name, collection, column
567 )))
568 }
569 Some(Value::Null) | None => {
570 skip_rule = true;
571 break;
572 }
573 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
574 }
575 }
576
577 if skip_rule {
578 continue;
579 }
580
581 let signature = signatures.join("|");
582 if let Some(previous_index) = seen.insert(signature, row_index) {
583 return Err(crate::RedDBError::Query(format!(
584 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
585 rule.name,
586 collection,
587 previous_index + 1,
588 row_index + 1
589 )));
590 }
591 }
592 }
593
594 Ok(())
595}
596
597fn row_update_requires_uniqueness_check(
598 db: &crate::storage::unified::devx::RedDB,
599 collection: &str,
600 modified_columns: &[String],
601) -> bool {
602 if modified_columns.is_empty() {
603 return false;
604 }
605
606 let Some(contract) = db.collection_contract(collection) else {
607 return false;
608 };
609 if contract.declared_model != crate::catalog::CollectionModel::Table
610 && contract.declared_model != crate::catalog::CollectionModel::Mixed
611 {
612 return false;
613 }
614
615 let rules = resolved_uniqueness_rules(&contract);
616 if rules.is_empty() {
617 return false;
618 }
619
620 rules.iter().any(|rule| {
621 rule.columns.iter().any(|column| {
622 modified_columns
623 .iter()
624 .any(|modified| modified.eq_ignore_ascii_case(column))
625 })
626 })
627}
628
629pub(crate) fn build_row_update_contract_plan(
630 db: &crate::storage::unified::devx::RedDB,
631 collection: &str,
632) -> RedDBResult<Option<RowUpdateContractPlan>> {
633 let Some(contract) = db.collection_contract(collection) else {
634 return Ok(None);
635 };
636
637 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
638 && !(contract.declared_columns.is_empty()
639 && contract
640 .table_def
641 .as_ref()
642 .map(|table| table.columns.is_empty())
643 .unwrap_or(true))
644 {
645 resolved_contract_columns(&contract)?
646 .into_iter()
647 .map(|rule| {
648 (
649 rule.name.clone(),
650 RowUpdateColumnRule {
651 name: rule.name,
652 data_type: rule.data_type,
653 data_type_name: rule.data_type_name,
654 not_null: rule.not_null,
655 enum_variants: rule.enum_variants,
656 },
657 )
658 })
659 .collect()
660 } else {
661 HashMap::new()
662 };
663
664 let unique_columns = if matches!(
665 contract.declared_model,
666 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
667 ) {
668 resolved_uniqueness_rules(&contract)
669 .into_iter()
670 .flat_map(|rule| rule.columns.into_iter())
671 .map(|column| (column, ()))
672 .collect()
673 } else {
674 HashMap::new()
675 };
676
677 Ok(Some(RowUpdateContractPlan {
678 timestamps_enabled: contract.timestamps_enabled,
679 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
680 declared_rules,
681 unique_columns,
682 }))
683}
684
685pub(crate) fn normalize_row_update_assignment_with_plan(
686 collection: &str,
687 column: &str,
688 value: Value,
689 row_contract_plan: Option<&RowUpdateContractPlan>,
690) -> RedDBResult<Value> {
691 let Some(plan) = row_contract_plan else {
692 return Ok(value);
693 };
694
695 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
696 return Err(crate::RedDBError::Query(format!(
697 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
698 collection, column
699 )));
700 }
701
702 if let Some(rule) = plan.declared_rules.get(column) {
703 let rule = ResolvedColumnRule {
704 name: rule.name.clone(),
705 data_type: rule.data_type,
706 data_type_name: rule.data_type_name.clone(),
707 not_null: rule.not_null,
708 default: None,
709 enum_variants: rule.enum_variants.clone(),
710 };
711 normalize_contract_value(collection, &rule, value)
712 } else if plan.strict_schema {
713 Err(crate::RedDBError::Query(format!(
714 "collection '{}' is strict and does not allow undeclared fields: {}",
715 collection, column
716 )))
717 } else {
718 Ok(value)
719 }
720}
721
722pub(crate) fn normalize_row_update_value_for_rule(
723 collection: &str,
724 value: Value,
725 row_rule: Option<&RowUpdateColumnRule>,
726) -> RedDBResult<Value> {
727 let Some(rule) = row_rule else {
728 return Ok(value);
729 };
730
731 let rule = ResolvedColumnRule {
732 name: rule.name.clone(),
733 data_type: rule.data_type,
734 data_type_name: rule.data_type_name.clone(),
735 not_null: rule.not_null,
736 default: None,
737 enum_variants: rule.enum_variants.clone(),
738 };
739 normalize_contract_value(collection, &rule, value)
740}
741
742fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
743 if let Some(named) = row.named.as_mut() {
744 named.insert(name.to_string(), value);
745 return;
746 }
747
748 if let Some(schema) = row.schema.as_ref() {
749 if let Some(index) = schema.iter().position(|column| column == name) {
750 if let Some(slot) = row.columns.get_mut(index) {
751 *slot = value;
752 return;
753 }
754 }
755
756 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
757 for (column, current) in schema.iter().zip(row.columns.iter()) {
758 named.insert(column.clone(), current.clone());
759 }
760 named.insert(name.to_string(), value);
761 row.named = Some(named);
762 return;
763 }
764
765 let mut named = HashMap::with_capacity(1);
766 named.insert(name.to_string(), value);
767 row.named = Some(named);
768}
769
770fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
771 if let Some(named) = row.named.as_ref() {
772 named
773 .iter()
774 .map(|(key, value)| (key.clone(), value.clone()))
775 .collect()
776 } else if let Some(schema) = row.schema.as_ref() {
777 schema
778 .iter()
779 .cloned()
780 .zip(row.columns.iter().cloned())
781 .collect()
782 } else {
783 Vec::new()
784 }
785}
786
787fn apply_row_field_assignments_raw<I>(
788 row: &mut crate::storage::unified::entity::RowData,
789 field_assignments: I,
790) where
791 I: IntoIterator<Item = (String, Value)>,
792{
793 for (column, value) in field_assignments {
794 set_row_field(row, &column, value);
795 }
796}
797
798fn apply_row_field_assignments_incremental<I>(
799 collection: &str,
800 row: &mut crate::storage::unified::entity::RowData,
801 field_assignments: I,
802 row_contract_plan: Option<&RowUpdateContractPlan>,
803) -> RedDBResult<()>
804where
805 I: IntoIterator<Item = (String, Value)>,
806{
807 for (column, value) in field_assignments {
808 let value = normalize_row_update_assignment_with_plan(
809 collection,
810 &column,
811 value,
812 row_contract_plan,
813 )?;
814
815 set_row_field(row, &column, value);
816 }
817
818 Ok(())
819}
820
821fn resolved_uniqueness_rules(
822 contract: &crate::physical::CollectionContract,
823) -> Vec<UniquenessRule> {
824 let mut rules = Vec::new();
825
826 if let Some(table_def) = &contract.table_def {
827 if !table_def.primary_key.is_empty() {
828 rules.push(UniquenessRule {
829 name: "primary_key".to_string(),
830 columns: table_def.primary_key.clone(),
831 primary_key: true,
832 });
833 }
834
835 for constraint in &table_def.constraints {
836 if matches!(
837 constraint.constraint_type,
838 crate::storage::schema::ConstraintType::PrimaryKey
839 ) && !constraint.columns.is_empty()
840 {
841 rules.push(UniquenessRule {
842 name: constraint.name.clone(),
843 columns: constraint.columns.clone(),
844 primary_key: true,
845 });
846 } else if matches!(
847 constraint.constraint_type,
848 crate::storage::schema::ConstraintType::Unique
849 ) && !constraint.columns.is_empty()
850 {
851 rules.push(UniquenessRule {
852 name: constraint.name.clone(),
853 columns: constraint.columns.clone(),
854 primary_key: false,
855 });
856 }
857 }
858 } else {
859 for column in &contract.declared_columns {
860 if column.primary_key {
861 rules.push(UniquenessRule {
862 name: format!("pk_{}", column.name),
863 columns: vec![column.name.clone()],
864 primary_key: true,
865 });
866 } else if column.unique {
867 rules.push(UniquenessRule {
868 name: format!("uniq_{}", column.name),
869 columns: vec![column.name.clone()],
870 primary_key: false,
871 });
872 }
873 }
874 }
875
876 let mut dedup = std::collections::BTreeSet::new();
877 rules
878 .into_iter()
879 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
880 .collect()
881}
882
883fn row_fields_from_entity(
884 entity: &crate::storage::UnifiedEntity,
885) -> Option<std::collections::BTreeMap<String, Value>> {
886 match &entity.data {
887 crate::storage::EntityData::Row(row) => {
888 if let Some(named) = &row.named {
889 Some(
890 named
891 .iter()
892 .map(|(key, value)| (key.clone(), value.clone()))
893 .collect(),
894 )
895 } else {
896 row.schema.as_ref().map(|schema| {
897 schema
898 .iter()
899 .cloned()
900 .zip(row.columns.iter().cloned())
901 .collect()
902 })
903 }
904 }
905 _ => None,
906 }
907}
908
909fn value_signature(value: &Value) -> String {
910 format!("{value:?}")
911}
912
913fn normalize_contract_value(
914 collection: &str,
915 column: &ResolvedColumnRule,
916 value: Value,
917) -> RedDBResult<Value> {
918 if matches!(value, Value::Null) {
919 if column.not_null {
920 return Err(crate::RedDBError::Query(format!(
921 "column '{}' in collection '{}' cannot be null",
922 column.name, collection
923 )));
924 }
925 return Ok(Value::Null);
926 }
927
928 let target = column.data_type;
929 if value_matches_declared_type(&value, target) {
930 return Ok(value);
931 }
932
933 let Some(raw) = value_to_coercion_input(&value) else {
934 return Err(crate::RedDBError::Query(format!(
935 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
936 column.name, collection, column.data_type_name
937 )));
938 };
939
940 coerce_contract_literal(collection, &column.name, column, &raw)
941}
942
943fn coerce_contract_literal(
944 collection: &str,
945 column_name: &str,
946 column: &ResolvedColumnRule,
947 raw: &str,
948) -> RedDBResult<Value> {
949 let target = column.data_type;
950 match target {
951 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
952 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
953 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
954 crate::RedDBError::Query(format!(
955 "failed to coerce column '{}' in collection '{}' to '{}': {}",
956 column_name, collection, column.data_type_name, err
957 ))
958 }),
959 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
960 crate::RedDBError::Query(format!(
961 "failed to coerce column '{}' in collection '{}' to '{}': {}",
962 column_name, collection, column.data_type_name, err
963 ))
964 }),
965 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
966 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
967 column_name, collection, column.data_type_name
968 ))),
969 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
970 |err| {
971 crate::RedDBError::Query(format!(
972 "failed to coerce column '{}' in collection '{}' to '{}': {}",
973 column_name, collection, column.data_type_name, err
974 ))
975 },
976 ),
977 }
978}
979
/// A declared column reduced to what value normalization needs, built either
/// from a table definition or from the contract's declared columns.
struct ResolvedColumnRule {
    /// Column name.
    name: String,
    /// Declared type that values are validated against or coerced into.
    data_type: DataType,
    /// Type name shown in error messages.
    data_type_name: String,
    /// Whether `Null` (or a missing value without a default) is rejected.
    not_null: bool,
    /// Textual default literal, coerced to `data_type` when the column is
    /// missing from a row.
    default: Option<String>,
    /// Variants passed to the schema coercion routine.
    enum_variants: Vec<String>,
}
988
989fn resolved_contract_columns(
990 contract: &crate::physical::CollectionContract,
991) -> RedDBResult<Vec<ResolvedColumnRule>> {
992 if let Some(table_def) = &contract.table_def {
993 return Ok(table_def
994 .columns
995 .iter()
996 .map(|column| ResolvedColumnRule {
997 name: column.name.clone(),
998 data_type: column.data_type,
999 data_type_name: data_type_name(column.data_type).to_string(),
1000 not_null: !column.nullable,
1001 default: column
1002 .default
1003 .as_ref()
1004 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
1005 enum_variants: column.enum_variants.clone(),
1006 })
1007 .collect());
1008 }
1009
1010 contract
1011 .declared_columns
1012 .iter()
1013 .map(|column| {
1014 let data_type = column
1015 .sql_type
1016 .as_ref()
1017 .map(crate::storage::query::resolve_sql_type_name)
1018 .transpose()
1019 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1020 .unwrap_or(parse_declared_data_type(&column.data_type)?);
1021 Ok(ResolvedColumnRule {
1022 name: column.name.clone(),
1023 data_type,
1024 data_type_name: column.data_type.clone(),
1025 not_null: column.not_null,
1026 default: column.default.clone(),
1027 enum_variants: column.enum_variants.clone(),
1028 })
1029 })
1030 .collect()
1031}
1032
1033fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1034 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1035}
1036
1037fn data_type_name(data_type: DataType) -> &'static str {
1038 match data_type {
1039 DataType::Integer => "integer",
1040 DataType::UnsignedInteger => "unsigned_integer",
1041 DataType::Float => "float",
1042 DataType::Text => "text",
1043 DataType::Blob => "blob",
1044 DataType::Boolean => "boolean",
1045 DataType::Timestamp => "timestamp",
1046 DataType::Duration => "duration",
1047 DataType::IpAddr => "ipaddr",
1048 DataType::MacAddr => "macaddr",
1049 DataType::Vector => "vector",
1050 DataType::Nullable => "nullable",
1051 DataType::Unknown => "unknown",
1052 DataType::Json => "json",
1053 DataType::Uuid => "uuid",
1054 DataType::NodeRef => "noderef",
1055 DataType::EdgeRef => "edgeref",
1056 DataType::VectorRef => "vectorref",
1057 DataType::RowRef => "rowref",
1058 DataType::Color => "color",
1059 DataType::Email => "email",
1060 DataType::Url => "url",
1061 DataType::Phone => "phone",
1062 DataType::Semver => "semver",
1063 DataType::Cidr => "cidr",
1064 DataType::Date => "date",
1065 DataType::Time => "time",
1066 DataType::Decimal => "decimal",
1067 DataType::Enum => "enum",
1068 DataType::Array => "array",
1069 DataType::TimestampMs => "timestamp_ms",
1070 DataType::Ipv4 => "ipv4",
1071 DataType::Ipv6 => "ipv6",
1072 DataType::Subnet => "subnet",
1073 DataType::Port => "port",
1074 DataType::Latitude => "latitude",
1075 DataType::Longitude => "longitude",
1076 DataType::GeoPoint => "geopoint",
1077 DataType::Country2 => "country2",
1078 DataType::Country3 => "country3",
1079 DataType::Lang2 => "lang2",
1080 DataType::Lang5 => "lang5",
1081 DataType::Currency => "currency",
1082 DataType::AssetCode => "asset_code",
1083 DataType::Money => "money",
1084 DataType::ColorAlpha => "color_alpha",
1085 DataType::BigInt => "bigint",
1086 DataType::KeyRef => "keyref",
1087 DataType::DocRef => "docref",
1088 DataType::TableRef => "tableref",
1089 DataType::PageRef => "pageref",
1090 DataType::Secret => "secret",
1091 DataType::Password => "password",
1092 DataType::TextZstd => "text",
1093 DataType::BlobZstd => "blob",
1094 }
1095}
1096
1097fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1098 matches!(
1099 (value, target),
1100 (Value::Null, _)
1101 | (Value::Integer(_), DataType::Integer)
1102 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1103 | (Value::Float(_), DataType::Float)
1104 | (Value::Text(_), DataType::Text)
1105 | (Value::Blob(_), DataType::Blob)
1106 | (Value::Boolean(_), DataType::Boolean)
1107 | (Value::Timestamp(_), DataType::Timestamp)
1108 | (Value::Duration(_), DataType::Duration)
1109 | (Value::IpAddr(_), DataType::IpAddr)
1110 | (Value::MacAddr(_), DataType::MacAddr)
1111 | (Value::Vector(_), DataType::Vector)
1112 | (Value::Json(_), DataType::Json)
1113 | (Value::Uuid(_), DataType::Uuid)
1114 | (Value::NodeRef(_), DataType::NodeRef)
1115 | (Value::EdgeRef(_), DataType::EdgeRef)
1116 | (Value::VectorRef(_, _), DataType::VectorRef)
1117 | (Value::RowRef(_, _), DataType::RowRef)
1118 | (Value::Color(_), DataType::Color)
1119 | (Value::Email(_), DataType::Email)
1120 | (Value::Url(_), DataType::Url)
1121 | (Value::Phone(_), DataType::Phone)
1122 | (Value::Semver(_), DataType::Semver)
1123 | (Value::Cidr(_, _), DataType::Cidr)
1124 | (Value::Date(_), DataType::Date)
1125 | (Value::Time(_), DataType::Time)
1126 | (Value::Decimal(_), DataType::Decimal)
1127 | (Value::EnumValue(_), DataType::Enum)
1128 | (Value::Array(_), DataType::Array)
1129 | (Value::TimestampMs(_), DataType::TimestampMs)
1130 | (Value::Ipv4(_), DataType::Ipv4)
1131 | (Value::Ipv6(_), DataType::Ipv6)
1132 | (Value::Subnet(_, _), DataType::Subnet)
1133 | (Value::Port(_), DataType::Port)
1134 | (Value::Latitude(_), DataType::Latitude)
1135 | (Value::Longitude(_), DataType::Longitude)
1136 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1137 | (Value::Country2(_), DataType::Country2)
1138 | (Value::Country3(_), DataType::Country3)
1139 | (Value::Lang2(_), DataType::Lang2)
1140 | (Value::Lang5(_), DataType::Lang5)
1141 | (Value::Currency(_), DataType::Currency)
1142 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1143 | (Value::BigInt(_), DataType::BigInt)
1144 | (Value::KeyRef(_, _), DataType::KeyRef)
1145 | (Value::DocRef(_, _), DataType::DocRef)
1146 | (Value::TableRef(_), DataType::TableRef)
1147 | (Value::PageRef(_), DataType::PageRef)
1148 | (Value::Secret(_), DataType::Secret)
1149 | (Value::Password(_), DataType::Password)
1150 )
1151}
1152
1153fn value_to_coercion_input(value: &Value) -> Option<String> {
1154 match value {
1155 Value::Null => None,
1156 Value::Integer(value) => Some(value.to_string()),
1157 Value::UnsignedInteger(value) => Some(value.to_string()),
1158 Value::Float(value) => Some(value.to_string()),
1159 Value::Text(value) => Some(value.to_string()),
1160 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1161 Value::Boolean(value) => Some(value.to_string()),
1162 Value::Timestamp(value) => Some(value.to_string()),
1163 Value::Duration(value) => Some(value.to_string()),
1164 Value::IpAddr(value) => Some(value.to_string()),
1165 Value::MacAddr(value) => Some(format!(
1166 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1167 value[0], value[1], value[2], value[3], value[4], value[5]
1168 )),
1169 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1170 Value::Email(value) => Some(value.clone()),
1171 Value::Url(value) => Some(value.clone()),
1172 Value::Phone(value) => Some(value.to_string()),
1173 Value::Semver(value) => Some(format!(
1174 "{}.{}.{}",
1175 value / 1_000_000,
1176 (value / 1_000) % 1_000,
1177 value % 1_000
1178 )),
1179 Value::Date(value) => Some(value.to_string()),
1180 Value::Time(value) => Some(value.to_string()),
1181 Value::Decimal(value) => Some(value.to_string()),
1182 Value::TimestampMs(value) => Some(value.to_string()),
1183 Value::Ipv4(value) => Some(format!(
1184 "{}.{}.{}.{}",
1185 (value >> 24) & 0xFF,
1186 (value >> 16) & 0xFF,
1187 (value >> 8) & 0xFF,
1188 value & 0xFF
1189 )),
1190 Value::Port(value) => Some(value.to_string()),
1191 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1192 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1193 Value::GeoPoint(lat, lon) => Some(format!(
1194 "{},{}",
1195 *lat as f64 / 1_000_000.0,
1196 *lon as f64 / 1_000_000.0
1197 )),
1198 Value::BigInt(value) => Some(value.to_string()),
1199 Value::TableRef(value) => Some(value.clone()),
1200 Value::PageRef(value) => Some(value.to_string()),
1201 Value::Password(value) => Some(value.clone()),
1202 _ => None,
1203 }
1204}
1205
/// Removes duplicate column names, comparing ASCII case-insensitively.
///
/// The first spelling of each name is kept and the relative order of the
/// surviving names is preserved.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let capacity = modified_columns.len();
    modified_columns.into_iter().fold(
        Vec::with_capacity(capacity),
        |mut kept: Vec<String>, candidate| {
            let already_seen = kept
                .iter()
                .any(|seen| seen.eq_ignore_ascii_case(&candidate));
            if !already_seen {
                kept.push(candidate);
            }
            kept
        },
    )
}
1222
1223fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1224 if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1225 return Err(crate::RedDBError::Query(
1226 "array positional document patch paths are unsupported; replace the array or full document body instead"
1227 .to_string(),
1228 ));
1229 }
1230 Ok(())
1231}
1232
1233fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1234 match fields.get("body") {
1235 Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1236 crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1237 }),
1238 Some(_) => Err(crate::RedDBError::Query(
1239 "document body field must contain JSON".to_string(),
1240 )),
1241 None => Ok(JsonValue::Object(Default::default())),
1242 }
1243}
1244
1245fn replace_document_row_body(
1246 fields: &mut HashMap<String, Value>,
1247 body: JsonValue,
1248 modified_columns: &mut Vec<String>,
1249) -> RedDBResult<()> {
1250 modified_columns.push("body".to_string());
1251
1252 let old_keys: Vec<String> = fields
1253 .keys()
1254 .filter(|key| key.as_str() != "body")
1255 .cloned()
1256 .collect();
1257 for key in old_keys {
1258 fields.remove(&key);
1259 modified_columns.push(key);
1260 }
1261
1262 let body_bytes = json_to_vec(&body).map_err(|err| {
1263 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1264 })?;
1265 fields.insert("body".to_string(), Value::Json(body_bytes));
1266
1267 if let JsonValue::Object(map) = &body {
1268 for (key, value) in map {
1269 fields.insert(key.clone(), json_to_storage_value(value)?);
1270 modified_columns.push(key.clone());
1271 }
1272 }
1273
1274 Ok(())
1275}
1276
1277impl RedDBRuntime {
1278 pub(crate) fn apply_loaded_patch_entity_core(
1279 &self,
1280 collection: String,
1281 mut entity: crate::storage::UnifiedEntity,
1282 payload: JsonValue,
1283 operations: Vec<PatchEntityOperation>,
1284 ) -> RedDBResult<AppliedEntityMutation> {
1285 let id = entity.id;
1286 let operations = normalize_ttl_patch_operations(operations)?;
1287 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1290
1291 let db = self.db();
1292 let store = db.store();
1293 let Some(manager) = store.get_collection(&collection) else {
1294 return Err(crate::RedDBError::NotFound(format!(
1295 "collection not found: {collection}"
1296 )));
1297 };
1298
1299 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1300 let mut metadata_changed = false;
1301 let mut modified_columns: Vec<String> = Vec::new();
1302 let mut context_index_dirty = false;
1303 let mut graph_node_type: Option<String> = None;
1304 let mut graph_edge_weight: Option<f32> = None;
1305
1306 let row_contract_timestamps = db
1307 .collection_contract(&collection)
1308 .map(|c| c.timestamps_enabled)
1309 .unwrap_or(false);
1310
1311 match &mut entity.data {
1312 crate::storage::EntityData::Row(row) => {
1313 let is_document_collection = db
1314 .collection_contract(&collection)
1315 .map(|contract| {
1316 contract.declared_model == crate::catalog::CollectionModel::Document
1317 })
1318 .unwrap_or(false);
1319 let mut field_ops = Vec::new();
1320 let mut metadata_ops = Vec::new();
1321 let mut document_body_ops = Vec::new();
1322
1323 for mut op in operations {
1324 let Some(root) = op.path.first().map(String::as_str) else {
1325 return Err(crate::RedDBError::Query(
1326 "patch path cannot be empty".to_string(),
1327 ));
1328 };
1329
1330 match root {
1331 "body" if is_document_collection => {
1332 if op.path.len() < 2 {
1333 return Err(crate::RedDBError::Query(
1334 "document body patch paths require a nested key; use payload.body for full replacement"
1335 .to_string(),
1336 ));
1337 }
1338 op.path.remove(0);
1339 reject_document_array_position_path(&op.path)?;
1340 document_body_ops.push(op);
1341 }
1342 "fields" | "named" => {
1343 if op.path.len() < 2 {
1344 return Err(crate::RedDBError::Query(
1345 "patch path 'fields' requires a nested key".to_string(),
1346 ));
1347 }
1348 if row_contract_timestamps {
1349 let leaf = op.path.get(1).map(String::as_str);
1350 if matches!(leaf, Some("created_at") | Some("updated_at")) {
1351 return Err(crate::RedDBError::Query(format!(
1352 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1353 collection,
1354 leaf.unwrap_or("")
1355 )));
1356 }
1357 }
1358 op.path.remove(0);
1359 field_ops.push(op);
1360 }
1361 "metadata" => {
1362 if op.path.len() < 2 {
1363 return Err(crate::RedDBError::Query(
1364 "patch path 'metadata' requires a nested key".to_string(),
1365 ));
1366 }
1367 op.path.remove(0);
1368 metadata_ops.push(op);
1369 }
1370 _ => {
1371 return Err(crate::RedDBError::Query(format!(
1372 "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1373 )));
1374 }
1375 }
1376 }
1377
1378 if !document_body_ops.is_empty() {
1379 context_index_dirty = true;
1380 let named = row.named.get_or_insert_with(Default::default);
1381 let mut body = document_body_from_named(named)?;
1382 apply_patch_operations_to_json(&mut body, &document_body_ops)
1383 .map_err(crate::RedDBError::Query)?;
1384 replace_document_row_body(named, body, &mut modified_columns)?;
1385 }
1386
1387 if is_document_collection {
1388 if let Some(body) = payload.get("body") {
1389 context_index_dirty = true;
1390 let named = row.named.get_or_insert_with(Default::default);
1391 replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1392 }
1393 }
1394
1395 if !field_ops.is_empty() {
1396 context_index_dirty = true;
1397 for op in &field_ops {
1398 if let Some(col) = op.path.first() {
1399 modified_columns.push(col.clone());
1400 }
1401 }
1402 let named = row.named.get_or_insert_with(Default::default);
1403 apply_patch_operations_to_storage_map(named, &field_ops)?;
1404 }
1405
1406 if let Some(fields) = payload
1407 .get("fields")
1408 .and_then(crate::json::Value::as_object)
1409 {
1410 if row_contract_timestamps {
1411 for key in fields.keys() {
1412 if key == "created_at" || key == "updated_at" {
1413 return Err(crate::RedDBError::Query(format!(
1414 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1415 collection, key
1416 )));
1417 }
1418 }
1419 }
1420 context_index_dirty = true;
1421 let named = row.named.get_or_insert_with(Default::default);
1422 for (key, value) in fields {
1423 modified_columns.push(key.clone());
1424 named.insert(key.clone(), json_to_storage_value(value)?);
1425 }
1426 }
1427
1428 if !metadata_ops.is_empty() {
1429 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1430 let metadata = patch_metadata.get_or_insert_with(|| {
1431 store.get_metadata(&collection, id).unwrap_or_default()
1432 });
1433 let mut metadata_json = metadata_to_json(metadata);
1434 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1435 .map_err(crate::RedDBError::Query)?;
1436 *metadata = metadata_from_json(&metadata_json)?;
1437 metadata_changed = true;
1438 }
1439
1440 if !modified_columns.is_empty() || row_contract_timestamps {
1441 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1442 let current_fields = if let Some(named) = row.named.take() {
1443 named.into_iter().collect::<Vec<_>>()
1444 } else if let Some(schema) = row.schema.as_ref() {
1445 schema
1446 .iter()
1447 .cloned()
1448 .zip(row.columns.iter().cloned())
1449 .collect::<Vec<_>>()
1450 } else {
1451 Vec::new()
1452 };
1453 let normalized_fields = contract.normalize_update_fields(current_fields)?;
1454 if row_contract_timestamps {
1455 modified_columns.push("updated_at".to_string());
1456 context_index_dirty = true;
1457 }
1458 if contract.requires_uniqueness_check(&modified_columns) {
1459 contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1460 }
1461 row.named = Some(normalized_fields.into_iter().collect());
1462 }
1463 }
1464 crate::storage::EntityData::Node(node) => {
1465 let mut field_ops = Vec::new();
1466 let mut metadata_ops = Vec::new();
1467 let mut node_type_ops = Vec::new();
1468
1469 for mut op in operations {
1470 let Some(root) = op.path.first().map(String::as_str) else {
1471 return Err(crate::RedDBError::Query(
1472 "patch path cannot be empty".to_string(),
1473 ));
1474 };
1475
1476 match root {
1477 "fields" | "properties" => {
1478 if op.path.len() < 2 {
1479 return Err(crate::RedDBError::Query(
1480 "patch path 'fields' requires a nested key".to_string(),
1481 ));
1482 }
1483 op.path.remove(0);
1484 field_ops.push(op);
1485 }
1486 "metadata" => {
1487 if op.path.len() < 2 {
1488 return Err(crate::RedDBError::Query(
1489 "patch path 'metadata' requires a nested key".to_string(),
1490 ));
1491 }
1492 op.path.remove(0);
1493 metadata_ops.push(op);
1494 }
1495 "node_type" => {
1496 if op.path.len() != 1 {
1497 return Err(crate::RedDBError::Query(
1498 "patch path 'node_type' does not allow nested keys".to_string(),
1499 ));
1500 }
1501 op.path.clear();
1502 node_type_ops.push(op);
1503 }
1504 _ => {
1505 return Err(crate::RedDBError::Query(format!(
1506 "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1507 )));
1508 }
1509 }
1510 }
1511
1512 for op in node_type_ops {
1513 context_index_dirty = true;
1514 let value = op.value.ok_or_else(|| {
1515 crate::RedDBError::Query("node_type operations require a value".to_string())
1516 })?;
1517
1518 match op.op {
1519 PatchEntityOperationType::Unset => {
1520 return Err(crate::RedDBError::Query(
1521 "node_type cannot be unset through patch operations".to_string(),
1522 ));
1523 }
1524 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1525 let Some(node_type) = value.as_str() else {
1526 return Err(crate::RedDBError::Query(
1527 "node_type operation requires a text value".to_string(),
1528 ));
1529 };
1530 graph_node_type = Some(node_type.to_string());
1531 modified_columns.push("node_type".to_string());
1532 }
1533 }
1534 }
1535
1536 if !field_ops.is_empty() {
1537 context_index_dirty = true;
1538 apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1539 }
1540
1541 if let Some(fields) = payload
1542 .get("fields")
1543 .and_then(crate::json::Value::as_object)
1544 {
1545 context_index_dirty = true;
1546 for (key, value) in fields {
1547 node.properties
1548 .insert(key.clone(), json_to_storage_value(value)?);
1549 }
1550 }
1551
1552 if !metadata_ops.is_empty() {
1553 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1554 let metadata = patch_metadata.get_or_insert_with(|| {
1555 store.get_metadata(&collection, id).unwrap_or_default()
1556 });
1557 let mut metadata_json = metadata_to_json(metadata);
1558 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1559 .map_err(crate::RedDBError::Query)?;
1560 *metadata = metadata_from_json(&metadata_json)?;
1561 metadata_changed = true;
1562 }
1563 }
1564 crate::storage::EntityData::Edge(edge) => {
1565 let mut field_ops = Vec::new();
1566 let mut metadata_ops = Vec::new();
1567 let mut weight_ops = Vec::new();
1568
1569 for mut op in operations {
1570 let Some(root) = op.path.first().map(String::as_str) else {
1571 return Err(crate::RedDBError::Query(
1572 "patch path cannot be empty".to_string(),
1573 ));
1574 };
1575
1576 match root {
1577 "fields" | "properties" => {
1578 if op.path.len() < 2 {
1579 return Err(crate::RedDBError::Query(
1580 "patch path 'fields' requires a nested key".to_string(),
1581 ));
1582 }
1583 op.path.remove(0);
1584 field_ops.push(op);
1585 }
1586 "weight" => {
1587 if op.path.len() != 1 {
1588 return Err(crate::RedDBError::Query(
1589 "patch path 'weight' does not allow nested keys".to_string(),
1590 ));
1591 }
1592 op.path.clear();
1593 weight_ops.push(op);
1594 }
1595 "metadata" => {
1596 if op.path.len() < 2 {
1597 return Err(crate::RedDBError::Query(
1598 "patch path 'metadata' requires a nested key".to_string(),
1599 ));
1600 }
1601 op.path.remove(0);
1602 metadata_ops.push(op);
1603 }
1604 _ => {
1605 return Err(crate::RedDBError::Query(format!(
1606 "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1607 )));
1608 }
1609 }
1610 }
1611
1612 if !field_ops.is_empty() {
1613 context_index_dirty = true;
1614 apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1615 }
1616
1617 for op in weight_ops {
1618 context_index_dirty = true;
1619 let value = op.value.ok_or_else(|| {
1620 crate::RedDBError::Query("weight operations require a value".to_string())
1621 })?;
1622
1623 match op.op {
1624 PatchEntityOperationType::Unset => {
1625 return Err(crate::RedDBError::Query(
1626 "weight cannot be unset through patch operations".to_string(),
1627 ));
1628 }
1629 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1630 let Some(weight) = value.as_f64() else {
1631 return Err(crate::RedDBError::Query(
1632 "weight operation requires a numeric value".to_string(),
1633 ));
1634 };
1635 graph_edge_weight = Some(weight as f32);
1636 edge.weight = weight as f32;
1637 modified_columns.push("weight".to_string());
1638 }
1639 }
1640 }
1641
1642 if let Some(fields) = payload
1643 .get("fields")
1644 .and_then(crate::json::Value::as_object)
1645 {
1646 context_index_dirty = true;
1647 for (key, value) in fields {
1648 edge.properties
1649 .insert(key.clone(), json_to_storage_value(value)?);
1650 }
1651 }
1652
1653 if !metadata_ops.is_empty() {
1654 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1655 let metadata = patch_metadata.get_or_insert_with(|| {
1656 store.get_metadata(&collection, id).unwrap_or_default()
1657 });
1658 let mut metadata_json = metadata_to_json(metadata);
1659 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1660 .map_err(crate::RedDBError::Query)?;
1661 *metadata = metadata_from_json(&metadata_json)?;
1662 metadata_changed = true;
1663 }
1664 }
1665 crate::storage::EntityData::Vector(vector) => {
1666 let mut field_ops = Vec::new();
1667 let mut metadata_ops = Vec::new();
1668
1669 for mut op in operations {
1670 let Some(root) = op.path.first().map(String::as_str) else {
1671 return Err(crate::RedDBError::Query(
1672 "patch path cannot be empty".to_string(),
1673 ));
1674 };
1675
1676 match root {
1677 "fields" => {
1678 if op.path.len() < 2 {
1679 return Err(crate::RedDBError::Query(
1680 "patch path 'fields' requires a nested key".to_string(),
1681 ));
1682 }
1683 op.path.remove(0);
1684 let Some(target) = op.path.first().map(String::as_str) else {
1685 return Err(crate::RedDBError::Query(
1686 "patch path requires a target under fields".to_string(),
1687 ));
1688 };
1689 if !matches!(target, "dense" | "content" | "sparse") {
1690 return Err(crate::RedDBError::Query(format!(
1691 "unsupported vector patch target '{target}'"
1692 )));
1693 }
1694 field_ops.push(op);
1695 }
1696 "metadata" => {
1697 if op.path.len() < 2 {
1698 return Err(crate::RedDBError::Query(
1699 "patch path 'metadata' requires a nested key".to_string(),
1700 ));
1701 }
1702 op.path.remove(0);
1703 metadata_ops.push(op);
1704 }
1705 _ => {
1706 return Err(crate::RedDBError::Query(format!(
1707 "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1708 )));
1709 }
1710 }
1711 }
1712
1713 if !field_ops.is_empty() {
1714 context_index_dirty = true;
1715 apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1716 }
1717
1718 if let Some(fields) = payload
1719 .get("fields")
1720 .and_then(crate::json::Value::as_object)
1721 {
1722 context_index_dirty = true;
1723 if let Some(content) =
1724 fields.get("content").and_then(crate::json::Value::as_str)
1725 {
1726 vector.content = Some(content.to_string());
1727 }
1728 if let Some(dense) = fields.get("dense") {
1729 vector.dense = dense
1730 .as_array()
1731 .ok_or_else(|| {
1732 crate::RedDBError::Query(
1733 "field 'dense' must be an array".to_string(),
1734 )
1735 })?
1736 .iter()
1737 .map(|value| {
1738 value.as_f64().map(|value| value as f32).ok_or_else(|| {
1739 crate::RedDBError::Query(
1740 "field 'dense' must contain only numbers".to_string(),
1741 )
1742 })
1743 })
1744 .collect::<Result<Vec<_>, _>>()?;
1745 }
1746 }
1747
1748 if !metadata_ops.is_empty() {
1749 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1750 let metadata = patch_metadata.get_or_insert_with(|| {
1751 store.get_metadata(&collection, id).unwrap_or_default()
1752 });
1753 let mut metadata_json = metadata_to_json(metadata);
1754 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1755 .map_err(crate::RedDBError::Query)?;
1756 *metadata = metadata_from_json(&metadata_json)?;
1757 metadata_changed = true;
1758 }
1759 }
1760 crate::storage::EntityData::TimeSeries(_)
1761 | crate::storage::EntityData::QueueMessage(_) => {
1762 return Err(crate::RedDBError::Query(
1763 "patch operations are not supported for TimeSeries or QueueMessage entities"
1764 .to_string(),
1765 ));
1766 }
1767 }
1768
1769 if let Some(node_type) = graph_node_type {
1770 if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1771 node.node_type = node_type;
1772 }
1773 }
1774 if let Some(weight) = graph_edge_weight {
1775 if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1776 edge.weight = (weight * 1000.0) as u32;
1777 }
1778 }
1779
1780 if let Some(metadata) = payload
1781 .get("metadata")
1782 .and_then(crate::json::Value::as_object)
1783 {
1784 let patch_metadata = patch_metadata
1785 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1786 for (key, value) in metadata {
1787 ensure_non_tree_reserved_metadata_key(key)?;
1788 patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1789 }
1790 metadata_changed = true;
1791 }
1792
1793 for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1794 let patch_metadata = patch_metadata
1795 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1796 if matches!(value, crate::storage::unified::MetadataValue::Null) {
1797 patch_metadata.remove(&key);
1798 } else {
1799 patch_metadata.set(key, value);
1800 }
1801 metadata_changed = true;
1802 }
1803
1804 entity.updated_at = std::time::SystemTime::now()
1805 .duration_since(std::time::UNIX_EPOCH)
1806 .unwrap_or_default()
1807 .as_secs();
1808
1809 modified_columns = dedupe_modified_columns(modified_columns);
1810
1811 Ok(AppliedEntityMutation {
1812 id,
1813 collection,
1814 entity,
1815 metadata: patch_metadata,
1816 modified_columns,
1817 persist_metadata: metadata_changed,
1818 context_index_dirty,
1819 replaced_entity: None,
1820 replaced_entity_previous_xmax: 0,
1821 pre_mutation_fields,
1822 })
1823 }
1824
1825 pub(crate) fn apply_loaded_sql_update_row_core(
1826 &self,
1827 collection: String,
1828 mut entity: crate::storage::UnifiedEntity,
1829 static_field_assignments: &[(String, Value)],
1830 dynamic_field_assignments: Vec<(String, Value)>,
1831 static_metadata_assignments: &[(String, MetadataValue)],
1832 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1833 row_contract_plan: Option<&RowUpdateContractPlan>,
1834 row_modified_columns_template: &[String],
1835 row_touches_unique_columns: bool,
1836 ) -> RedDBResult<AppliedEntityMutation> {
1837 let id = entity.id;
1838 let previous_xmax = entity.xmax;
1839 let db = self.db();
1840 let store = db.store();
1841 let Some(_) = store.get_collection(&collection) else {
1842 return Err(crate::RedDBError::NotFound(format!(
1843 "collection not found: {collection}"
1844 )));
1845 };
1846
1847 let versioned_update_xid = match self.current_xid() {
1848 Some(xid) => Some(xid),
1849 None => {
1850 let snapshot_manager = self.snapshot_manager();
1851 let xid = snapshot_manager.begin();
1852 snapshot_manager.commit(xid);
1853 Some(xid)
1854 }
1855 };
1856 let mut replaced_entity = versioned_update_xid.map(|xid| {
1857 let mut old = entity.clone();
1858 old.set_xmax(xid);
1859 old
1860 });
1861
1862 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1863 let row_contract_timestamps = row_contract_plan
1864 .map(|plan| plan.timestamps_enabled)
1865 .unwrap_or(false);
1866 let mut metadata_changed = false;
1867 let mut modified_columns = row_modified_columns_template.to_vec();
1868 let mut context_index_dirty = !modified_columns.is_empty();
1869
1870 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1874
1875 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1876 return Err(crate::RedDBError::Query(
1877 "SQL row update fast path requires a row entity".to_string(),
1878 ));
1879 };
1880
1881 let _ = row_contract_plan;
1882 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1883 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1884
1885 for (key, value) in static_metadata_assignments
1886 .iter()
1887 .cloned()
1888 .chain(dynamic_metadata_assignments)
1889 {
1890 ensure_non_tree_reserved_metadata_key(&key)?;
1891 patch_metadata
1892 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1893 .set(key, value);
1894 metadata_changed = true;
1895 }
1896
1897 if !modified_columns.is_empty() || row_contract_timestamps {
1898 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1899 if row_contract_timestamps {
1900 context_index_dirty = true;
1901 set_row_field(
1902 row,
1903 "updated_at",
1904 Value::UnsignedInteger(current_unix_ms_u64()),
1905 );
1906 modified_columns.push("updated_at".to_string());
1907 }
1908 if row_touches_unique_columns {
1909 let current_fields = collect_row_fields(row);
1910 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1911 }
1912 }
1913
1914 entity.updated_at = std::time::SystemTime::now()
1915 .duration_since(std::time::UNIX_EPOCH)
1916 .unwrap_or_default()
1917 .as_secs();
1918
1919 if let Some(xid) = versioned_update_xid {
1920 let logical_id = entity.logical_id();
1921 entity.id = store.next_entity_id();
1922 entity.set_logical_id(logical_id);
1923 entity.set_xmin(xid);
1924 entity.set_xmax(0);
1925 if let Some(old) = replaced_entity.as_mut() {
1926 old.set_xmax(xid);
1927 }
1928 }
1929
1930 modified_columns = dedupe_modified_columns(modified_columns);
1931
1932 Ok(AppliedEntityMutation {
1933 id: entity.id,
1934 collection,
1935 entity,
1936 metadata: patch_metadata,
1937 modified_columns,
1938 persist_metadata: metadata_changed,
1939 context_index_dirty,
1940 replaced_entity,
1941 replaced_entity_previous_xmax: previous_xmax,
1942 pre_mutation_fields,
1943 })
1944 }
1945
1946 pub(crate) fn persist_applied_entity_mutations(
1947 &self,
1948 applied: &[AppliedEntityMutation],
1949 ) -> RedDBResult<()> {
1950 if applied.is_empty() {
1951 return Ok(());
1952 }
1953
1954 let store = self.db().store();
1955 let collection = &applied[0].collection;
1956 let Some(manager) = store.get_collection(collection) else {
1957 return Err(crate::RedDBError::NotFound(format!(
1958 "collection not found: {collection}"
1959 )));
1960 };
1961
1962 let mut ordinary = Vec::with_capacity(applied.len());
1963 for item in applied {
1964 if let Some(old_version) = item.replaced_entity.as_ref() {
1965 store
1966 .install_versioned_table_row_update(
1967 collection,
1968 old_version.clone(),
1969 item.entity.clone(),
1970 if item.persist_metadata {
1971 item.metadata.as_ref()
1972 } else {
1973 None
1974 },
1975 )
1976 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1977 if self.current_xid().is_some() {
1978 self.record_pending_versioned_update(
1979 crate::runtime::impl_core::current_connection_id(),
1980 collection,
1981 old_version.id,
1982 item.entity.id,
1983 old_version.xmax,
1984 item.replaced_entity_previous_xmax,
1985 );
1986 }
1987 } else {
1988 ordinary.push(item);
1989 }
1990 }
1991 if ordinary.is_empty() {
1992 return Ok(());
1993 }
1994
1995 manager
1996 .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1997 (
1998 &item.entity,
1999 item.modified_columns.as_slice(),
2000 if item.persist_metadata {
2001 item.metadata.as_ref()
2002 } else {
2003 None
2004 },
2005 )
2006 }))
2007 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
2008
2009 let indexed_cols = self
2018 .index_store_ref()
2019 .indexed_columns_set(collection.as_str());
2020 let all_hot = !indexed_cols.is_empty()
2021 && ordinary.iter().all(|item| {
2022 !item.persist_metadata
2023 && !item
2024 .modified_columns
2025 .iter()
2026 .any(|c| indexed_cols.contains(c))
2027 })
2028 || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2029
2030 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2034 ordinary.iter().map(|item| &item.entity).collect();
2035 let persist_fn = if all_hot {
2036 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2037 } else {
2038 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2039 };
2040 persist_fn(store.as_ref(), collection, &entity_refs)
2041 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2042 }
2043
2044 pub(crate) fn flush_applied_entity_mutation(
2045 &self,
2046 applied: &AppliedEntityMutation,
2047 ) -> RedDBResult<()> {
2048 let store = self.db().store();
2049 if applied.context_index_dirty {
2050 store
2051 .context_index()
2052 .index_entity(&applied.collection, &applied.entity);
2053 }
2054 let mut changed_columns: Option<Vec<String>> = None;
2062 if !applied.pre_mutation_fields.is_empty() {
2063 let post = entity_row_fields_snapshot(&applied.entity);
2064 if !post.is_empty() {
2065 let damage = crate::application::entity::row_damage_vector(
2066 &applied.pre_mutation_fields,
2067 &post,
2068 );
2069 if !damage.is_empty() {
2070 changed_columns = Some(
2071 damage
2072 .touched_columns()
2073 .into_iter()
2074 .map(str::to_string)
2075 .collect(),
2076 );
2077 }
2078
2079 let indexed_cols: std::collections::HashSet<String> = self
2088 .index_store_ref()
2089 .list_indices(applied.collection.as_str())
2090 .into_iter()
2091 .filter_map(|idx| idx.columns.first().cloned())
2092 .collect();
2093 let modified_cols: std::collections::HashSet<String> = damage
2094 .touched_columns()
2095 .into_iter()
2096 .map(str::to_string)
2097 .collect();
2098 if let Some(old_version) = applied.replaced_entity.as_ref() {
2099 let old_index_fields: Vec<(String, Value)> = applied
2100 .pre_mutation_fields
2101 .iter()
2102 .filter(|(col, _)| indexed_cols.contains(col))
2103 .cloned()
2104 .collect();
2105 let new_index_fields: Vec<(String, Value)> = post
2106 .iter()
2107 .filter(|(col, _)| indexed_cols.contains(col))
2108 .cloned()
2109 .collect();
2110 if !old_index_fields.is_empty() {
2111 self.index_store_ref()
2112 .index_entity_delete(
2113 &applied.collection,
2114 old_version.id,
2115 &old_index_fields,
2116 )
2117 .map_err(crate::RedDBError::Internal)?;
2118 }
2119 if !new_index_fields.is_empty() {
2120 self.index_store_ref()
2121 .index_entity_insert(
2122 &applied.collection,
2123 applied.entity.id,
2124 &new_index_fields,
2125 )
2126 .map_err(crate::RedDBError::Internal)?;
2127 }
2128 } else {
2129 let decision = crate::storage::engine::hot_update::decide(
2130 &crate::storage::engine::hot_update::HotUpdateInputs {
2131 collection: applied.collection.as_str(),
2132 indexed_columns: &indexed_cols,
2133 modified_columns: &modified_cols,
2134 new_tuple_size: 0,
2138 page_free_space: usize::MAX,
2139 },
2140 );
2141 if !decision.can_hot {
2142 self.index_store_ref()
2143 .index_entity_update(
2144 &applied.collection,
2145 applied.id,
2146 &applied.pre_mutation_fields,
2147 &post,
2148 )
2149 .map_err(crate::RedDBError::Internal)?;
2150 } else {
2151 tracing::debug!(
2155 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2156 "hot_update fast-path: skipped index_entity_update"
2157 );
2158 }
2159 }
2160 }
2161 }
2162 self.cdc_emit_prebuilt_with_columns(
2163 crate::replication::cdc::ChangeOperation::Update,
2164 &applied.collection,
2165 &applied.entity,
2166 "entity",
2167 applied.metadata.as_ref(),
2168 true,
2169 changed_columns,
2170 );
2171 Ok(())
2172 }
2173
2174 pub(crate) fn apply_loaded_patch_entity(
2175 &self,
2176 collection: String,
2177 entity: crate::storage::UnifiedEntity,
2178 payload: JsonValue,
2179 operations: Vec<PatchEntityOperation>,
2180 ) -> RedDBResult<CreateEntityOutput> {
2181 let applied =
2182 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2183 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2184 self.flush_applied_entity_mutation(&applied)?;
2185 Ok(CreateEntityOutput {
2186 id: applied.id,
2187 entity: Some(applied.entity),
2188 })
2189 }
2190}
2191
2192fn ensure_non_tree_reserved_metadata_patch_paths(
2193 operations: &[PatchEntityOperation],
2194) -> RedDBResult<()> {
2195 for operation in operations {
2196 let Some(key) = operation.path.first().map(String::as_str) else {
2197 continue;
2198 };
2199 ensure_non_tree_reserved_metadata_key(key)?;
2200 }
2201 Ok(())
2202}
2203
2204fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2205 if key.starts_with(TREE_METADATA_PREFIX) {
2206 return Err(crate::RedDBError::Query(format!(
2207 "metadata key '{}' is reserved for managed trees",
2208 key
2209 )));
2210 }
2211 Ok(())
2212}
2213
2214fn ensure_non_tree_reserved_metadata_entries(
2215 metadata: &[(String, MetadataValue)],
2216) -> RedDBResult<()> {
2217 for (key, _) in metadata {
2218 ensure_non_tree_reserved_metadata_key(key)?;
2219 }
2220 Ok(())
2221}
2222
2223fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2224 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2225 return Err(crate::RedDBError::Query(format!(
2226 "edge label '{}' is reserved for managed trees",
2227 TREE_CHILD_EDGE_LABEL
2228 )));
2229 }
2230 Ok(())
2231}
2232
2233impl RedDBRuntime {
2234 pub(crate) fn create_node_unchecked(
2235 &self,
2236 input: CreateNodeInput,
2237 ) -> RedDBResult<CreateEntityOutput> {
2238 let db = self.db();
2239 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2240 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2241 let mut metadata = input.metadata;
2242 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2243 let mut builder = db.node(&input.collection, &input.label);
2244
2245 if let Some(node_type) = input.node_type {
2246 builder = builder.node_type(node_type);
2247 }
2248
2249 for (key, value) in input.properties {
2250 builder = builder.property(key, value);
2251 }
2252
2253 for (key, value) in metadata {
2254 builder = builder.metadata(key, value);
2255 }
2256
2257 for embedding in input.embeddings {
2258 if let Some(model) = embedding.model {
2259 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2260 } else {
2261 builder = builder.embedding(embedding.name, embedding.vector);
2262 }
2263 }
2264
2265 for link in input.table_links {
2266 builder = builder.link_to_table(link.key, link.table);
2267 }
2268
2269 for link in input.node_links {
2270 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2271 }
2272
2273 let id = builder.save()?;
2274 self.stamp_xmin_if_in_txn(&input.collection, id);
2277 refresh_context_index(&db, &input.collection, id)?;
2278 self.cdc_emit(
2279 crate::replication::cdc::ChangeOperation::Insert,
2280 &input.collection,
2281 id.raw(),
2282 "graph_node",
2283 );
2284 Ok(CreateEntityOutput {
2285 id,
2286 entity: db.store().get(&input.collection, id),
2287 })
2288 }
2289
2290 pub(crate) fn create_edge_unchecked(
2291 &self,
2292 input: CreateEdgeInput,
2293 ) -> RedDBResult<CreateEntityOutput> {
2294 let db = self.db();
2295 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2296 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2297 let mut metadata = input.metadata;
2298 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2299 let mut builder = db
2300 .edge(&input.collection, &input.label)
2301 .from(input.from)
2302 .to(input.to);
2303
2304 if let Some(weight) = input.weight {
2305 builder = builder.weight(weight);
2306 }
2307
2308 for (key, value) in input.properties {
2309 builder = builder.property(key, value);
2310 }
2311
2312 for (key, value) in metadata {
2313 builder = builder.metadata(key, value);
2314 }
2315
2316 let id = builder.save()?;
2317 self.stamp_xmin_if_in_txn(&input.collection, id);
2320 refresh_context_index(&db, &input.collection, id)?;
2321 self.cdc_emit(
2322 crate::replication::cdc::ChangeOperation::Insert,
2323 &input.collection,
2324 id.raw(),
2325 "graph_edge",
2326 );
2327 Ok(CreateEntityOutput {
2328 id,
2329 entity: db.store().get(&input.collection, id),
2330 })
2331 }
2332}
2333
2334fn create_rows_batch_prevalidated_columnar_with_outputs(
2335 runtime: &RedDBRuntime,
2336 collection: String,
2337 column_names: std::sync::Arc<Vec<String>>,
2338 rows: Vec<Vec<crate::storage::schema::Value>>,
2339) -> RedDBResult<Vec<CreateEntityOutput>> {
2340 use crate::storage::{
2341 unified::{EntityData, EntityKind, RowData},
2342 EntityId, UnifiedEntity,
2343 };
2344 use std::sync::Arc;
2345
2346 if rows.is_empty() {
2347 return Ok(Vec::new());
2348 }
2349 runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2350 runtime.check_batch_size(rows.len())?;
2351 runtime.check_db_size()?;
2352
2353 let db = runtime.db();
2354 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2355 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2356
2357 let store = db.store();
2358 let table_arc: Arc<str> = Arc::from(collection.as_str());
2359
2360 let indexed_cols = runtime
2361 .index_store_ref()
2362 .indexed_columns_set(collection.as_str());
2363 let has_secondary_indexes = !indexed_cols.is_empty();
2364 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2365 if has_secondary_indexes {
2366 Vec::with_capacity(rows.len())
2367 } else {
2368 Vec::new()
2369 };
2370
2371 let entities: Vec<UnifiedEntity> = rows
2372 .into_iter()
2373 .map(|values| {
2374 if has_secondary_indexes {
2375 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2376 Vec::with_capacity(indexed_cols.len());
2377 for (name, val) in column_names.iter().zip(values.iter()) {
2378 if indexed_cols.contains(name) {
2379 snap.push((name.clone(), val.clone()));
2380 }
2381 }
2382 field_snapshots.push(snap);
2383 }
2384 let mut row = RowData::new(values);
2385 row.schema = Some(Arc::clone(&column_names));
2386 UnifiedEntity::new(
2387 EntityId::new(0),
2388 EntityKind::TableRow {
2389 table: Arc::clone(&table_arc),
2390 row_id: 0,
2391 },
2392 EntityData::Row(row),
2393 )
2394 })
2395 .collect();
2396
2397 let ids = store
2398 .bulk_insert(&collection, entities)
2399 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2400
2401 if has_secondary_indexes {
2402 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2403 .iter()
2404 .zip(field_snapshots)
2405 .map(|(id, fields)| (*id, fields))
2406 .collect();
2407 runtime
2408 .index_store_ref()
2409 .index_entity_insert_batch(&collection, &index_rows)
2410 .map_err(crate::RedDBError::Internal)?;
2411 }
2412
2413 runtime.invalidate_result_cache();
2414 runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2415
2416 Ok(ids
2417 .into_iter()
2418 .map(|id| CreateEntityOutput { id, entity: None })
2419 .collect())
2420}
2421
2422impl RuntimeEntityPort for RedDBRuntime {
2423 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2424 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2425 let db = self.db();
2426 let CreateRowInput {
2427 collection,
2428 fields,
2429 metadata: input_metadata,
2430 node_links,
2431 vector_links,
2432 } = input;
2433 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2434 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2435 let mut metadata = input_metadata;
2436 apply_collection_default_ttl(&db, &collection, &mut metadata);
2437 let fields = contract.normalize_insert_fields(fields)?;
2438 contract.enforce_row_uniqueness(&fields, None)?;
2439 let engine = self.mutation_engine();
2441 let result = engine.apply(
2442 collection.clone(),
2443 vec![crate::runtime::mutation::MutationRow {
2444 fields,
2445 metadata,
2446 node_links,
2447 vector_links,
2448 }],
2449 )?;
2450 let id = result.ids[0];
2451 Ok(CreateEntityOutput {
2456 id,
2457 entity: db.store().get(&collection, id),
2458 })
2459 }
2460
2461 fn create_rows_batch(
2462 &self,
2463 input: CreateRowsBatchInput,
2464 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2465 if input.rows.is_empty() {
2466 return Ok(Vec::new());
2467 }
2468 self.check_batch_size(input.rows.len())?;
2469 self.check_db_size()?;
2470
2471 let db = self.db();
2472 let collection = input.collection;
2473 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2474 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2475
2476 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2477 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2478 for row in input.rows {
2479 if row.collection != collection {
2480 return Err(crate::RedDBError::Query(format!(
2481 "batch row collection mismatch: expected '{}', got '{}'",
2482 collection, row.collection
2483 )));
2484 }
2485
2486 let mut metadata = row.metadata;
2487 apply_collection_default_ttl(&db, &collection, &mut metadata);
2488 let fields = contract.normalize_insert_fields(row.fields)?;
2489 contract.enforce_row_uniqueness(&fields, None)?;
2490 uniqueness_rows.push(fields.clone());
2491 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2492 }
2493
2494 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2495
2496 let engine = {
2499 let e = self.mutation_engine();
2500 if input.suppress_events {
2501 e.with_suppress_events()
2502 } else {
2503 e
2504 }
2505 };
2506 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2507 .into_iter()
2508 .map(|(fields, metadata, node_links, vector_links)| {
2509 crate::runtime::mutation::MutationRow {
2510 fields,
2511 metadata,
2512 node_links,
2513 vector_links,
2514 }
2515 })
2516 .collect();
2517
2518 let result = engine
2519 .apply(collection.clone(), mutation_rows)
2520 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2521
2522 let store = db.store();
2523 Ok(result
2524 .ids
2525 .into_iter()
2526 .map(|id| CreateEntityOutput {
2527 id,
2528 entity: store.get(&collection, id),
2529 })
2530 .collect())
2531 }
2532
2533 fn create_rows_batch_prevalidated_columnar(
2534 &self,
2535 collection: String,
2536 column_names: std::sync::Arc<Vec<String>>,
2537 rows: Vec<Vec<crate::storage::schema::Value>>,
2538 ) -> RedDBResult<usize> {
2539 create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2540 .map(|outputs| outputs.len())
2541 }
2542
2543 fn create_rows_batch_columnar(
2544 &self,
2545 collection: String,
2546 column_names: std::sync::Arc<Vec<String>>,
2547 rows: Vec<Vec<crate::storage::schema::Value>>,
2548 ) -> RedDBResult<usize> {
2549 self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2550 .map(|outputs| outputs.len())
2551 }
2552
2553 fn create_rows_batch_columnar_with_outputs(
2554 &self,
2555 collection: String,
2556 column_names: std::sync::Arc<Vec<String>>,
2557 rows: Vec<Vec<crate::storage::schema::Value>>,
2558 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2559 if rows.is_empty() {
2560 return Ok(Vec::new());
2561 }
2562 self.check_batch_size(rows.len())?;
2563 self.check_db_size()?;
2564
2565 let db = self.db();
2566 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2567 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2568
2569 let needs_normalisation = match db.collection_contract(&collection) {
2579 Some(c) => {
2580 c.declared_model == crate::catalog::CollectionModel::Table
2581 && (!c.declared_columns.is_empty()
2582 || c.table_def
2583 .as_ref()
2584 .map(|t| !t.columns.is_empty())
2585 .unwrap_or(false))
2586 }
2587 None => false,
2588 };
2589 if !needs_normalisation {
2590 return create_rows_batch_prevalidated_columnar_with_outputs(
2591 self,
2592 collection,
2593 column_names,
2594 rows,
2595 );
2596 }
2597
2598 let ncols = column_names.len();
2605 let tuple_rows: Vec<CreateRowInput> = rows
2606 .into_iter()
2607 .map(|values| {
2608 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2609 Vec::with_capacity(ncols);
2610 for (name, value) in column_names.iter().zip(values) {
2611 fields.push((name.clone(), value));
2612 }
2613 CreateRowInput {
2614 collection: collection.clone(),
2615 fields,
2616 metadata: Vec::new(),
2617 node_links: Vec::new(),
2618 vector_links: Vec::new(),
2619 }
2620 })
2621 .collect();
2622 self.create_rows_batch(CreateRowsBatchInput {
2623 collection,
2624 rows: tuple_rows,
2625 suppress_events: false,
2626 })
2627 }
2628
2629 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2630 if input.rows.is_empty() {
2631 return Ok(0);
2632 }
2633 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2634 self.check_batch_size(input.rows.len())?;
2635 self.check_db_size()?;
2636
2637 let db = self.db();
2638 let collection = input.collection;
2639 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2644 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2645
2646 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2652
2653 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2654 Vec::with_capacity(input.rows.len());
2655 let mut mutation_rows = mutation_rows;
2656 for row in input.rows {
2657 if row.collection != collection {
2658 return Err(crate::RedDBError::Query(format!(
2659 "batch row collection mismatch: expected '{}', got '{}'",
2660 collection, row.collection
2661 )));
2662 }
2663 let mut metadata = row.metadata;
2664 if let Some(ttl) = default_ttl_ms {
2665 if !has_internal_ttl_metadata(&metadata) {
2666 metadata.push((
2667 "_ttl_ms".to_string(),
2668 if ttl <= i64::MAX as u64 {
2669 MetadataValue::Int(ttl as i64)
2670 } else {
2671 MetadataValue::Timestamp(ttl)
2672 },
2673 ));
2674 }
2675 }
2676 mutation_rows.push(crate::runtime::mutation::MutationRow {
2677 fields: row.fields,
2678 metadata,
2679 node_links: row.node_links,
2680 vector_links: row.vector_links,
2681 });
2682 }
2683
2684 let engine = self.mutation_engine();
2685 let result = engine
2686 .apply(collection, mutation_rows)
2687 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2688 Ok(result.ids.len())
2689 }
2690
2691 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2692 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2693 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2694 input.properties.iter().map(|(key, _)| key.as_str()),
2695 &format!("node '{}'", input.collection),
2696 )?;
2697 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2698 self.create_node_unchecked(input)
2699 }
2700
2701 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2702 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2703 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2704 input.properties.iter().map(|(key, _)| key.as_str()),
2705 &format!("edge '{}'", input.collection),
2706 )?;
2707 ensure_non_tree_structural_edge_label(&input.label)?;
2708 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2709 self.create_edge_unchecked(input)
2710 }
2711
2712 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2713 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2714 let db = self.db();
2715 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2716 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2717 ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2718 let mut metadata = input.metadata;
2719 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2720 let mut builder = db.vector(&input.collection).dense(input.dense);
2721
2722 if let Some(content) = input.content {
2723 builder = builder.content(content);
2724 }
2725
2726 for (key, value) in metadata {
2727 builder = builder.metadata(key, value);
2728 }
2729
2730 if let Some(link_row) = input.link_row {
2731 builder = builder.link_to_table(link_row);
2732 }
2733
2734 if let Some(link_node) = input.link_node {
2735 builder = builder.link_to_node(link_node);
2736 }
2737
2738 let id = builder.save()?;
2739 let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
2740 Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
2741 _ => None,
2742 };
2743 self.stamp_xmin_if_in_txn(&input.collection, id);
2746 refresh_context_index(&db, &input.collection, id)?;
2747 if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
2753 state.ensure_populated(&db.store(), &input.collection);
2757 use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
2759 fire(InjectionPoint::BeforeWalFsync);
2760 let _ = db
2761 .store()
2762 .append_vector_insert_record(&input.collection, id.raw(), &dense);
2763 fire(InjectionPoint::BeforeIndexCommit);
2764 let mut index = state.index.lock();
2765 index.insert(id, dense.clone());
2766 fire(InjectionPoint::BeforeExtentFsync);
2773 if let Some(extent) = state.extent.lock().as_mut() {
2774 let packed = index.encode_for_extent(&dense);
2775 let _ = extent.append(&packed);
2776 }
2777 }
2778 self.cdc_emit(
2779 crate::replication::cdc::ChangeOperation::Insert,
2780 &input.collection,
2781 id.raw(),
2782 "vector",
2783 );
2784 Ok(CreateEntityOutput {
2785 id,
2786 entity: db.store().get(&input.collection, id),
2787 })
2788 }
2789
2790 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2791 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2792 let db = self.db();
2793 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2794 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2795
2796 if let JsonValue::Object(ref map) = input.body {
2797 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2798 map.keys().map(String::as_str),
2799 &format!("document '{}'", input.collection),
2800 )?;
2801 }
2802
2803 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2805 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2806 })?;
2807 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2808 "body".to_string(),
2809 crate::storage::schema::Value::Json(body_bytes),
2810 )];
2811
2812 if let JsonValue::Object(ref map) = input.body {
2814 for (key, value) in map {
2815 let storage_value = json_to_storage_value(value)?;
2816 fields.push((key.clone(), storage_value));
2817 }
2818 }
2819
2820 let mut metadata = input.metadata;
2821 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2822 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2823 .iter()
2824 .map(|(key, value)| (key.as_str(), value.clone()))
2825 .collect();
2826 let mut builder = db.row(&input.collection, columns);
2827
2828 for (key, value) in metadata {
2829 builder = builder.metadata(key, value);
2830 }
2831
2832 for node in input.node_links {
2833 builder = builder.link_to_node(node);
2834 }
2835
2836 for vector in input.vector_links {
2837 builder = builder.link_to_vector(vector);
2838 }
2839
2840 let id = builder.save()?;
2841 self.stamp_xmin_if_in_txn(&input.collection, id);
2843 refresh_context_index(&db, &input.collection, id)?;
2844 self.cdc_emit(
2845 crate::replication::cdc::ChangeOperation::Insert,
2846 &input.collection,
2847 id.raw(),
2848 "document",
2849 );
2850 Ok(CreateEntityOutput {
2851 id,
2852 entity: db.store().get(&input.collection, id),
2853 })
2854 }
2855
2856 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2857 let db = self.db();
2858 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2859 let declared_model = db
2860 .collection_contract(&input.collection)
2861 .map(|contract| contract.declared_model);
2862 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2863 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2864 self.seal_vault_value(&input.collection, input.value)?
2865 } else {
2866 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2867 input.value
2868 };
2869 let fields = vec![
2870 (
2871 "key".to_string(),
2872 crate::storage::schema::Value::text(input.key),
2873 ),
2874 ("value".to_string(), value),
2875 ];
2876 let collection = input.collection;
2877 let result = self.mutation_engine().apply(
2878 collection.clone(),
2879 vec![crate::runtime::mutation::MutationRow {
2880 fields,
2881 metadata: input.metadata,
2882 node_links: Vec::new(),
2883 vector_links: Vec::new(),
2884 }],
2885 )?;
2886 let id = result.ids[0];
2887 Ok(CreateEntityOutput {
2888 id,
2889 entity: db.store().get(&collection, id),
2890 })
2891 }
2892
2893 fn create_timeseries_point(
2894 &self,
2895 input: CreateTimeSeriesPointInput,
2896 ) -> RedDBResult<CreateEntityOutput> {
2897 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2898 let db = self.db();
2899 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2900 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2901
2902 let mut fields = vec![
2903 (
2904 "metric".to_string(),
2905 crate::storage::schema::Value::text(input.metric),
2906 ),
2907 (
2908 "value".to_string(),
2909 crate::storage::schema::Value::Float(input.value),
2910 ),
2911 ];
2912
2913 if let Some(timestamp_ns) = input.timestamp_ns {
2914 fields.push((
2915 "timestamp".to_string(),
2916 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2917 ));
2918 }
2919
2920 if !input.tags.is_empty() {
2921 let tags_json = JsonValue::Object(
2922 input
2923 .tags
2924 .into_iter()
2925 .map(|(key, value)| (key, JsonValue::String(value)))
2926 .collect(),
2927 );
2928 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2929 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2930 })?;
2931 fields.push((
2932 "tags".to_string(),
2933 crate::storage::schema::Value::Json(tags_bytes),
2934 ));
2935 }
2936
2937 let collection = input.collection;
2938 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2939 self.stamp_xmin_if_in_txn(&collection, id);
2942 refresh_context_index(&db, &collection, id)?;
2943
2944 Ok(CreateEntityOutput {
2945 id,
2946 entity: db.store().get(&collection, id),
2947 })
2948 }
2949
2950 fn get_kv(
2951 &self,
2952 collection: &str,
2953 key: &str,
2954 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2955 let db = self.db();
2956 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2957 let store = db.store();
2958 let Some(manager) = store.get_collection(collection) else {
2959 return Ok(None);
2960 };
2961 let entities = manager.query_all(|_| true);
2962 for entity in entities {
2963 if let crate::storage::EntityData::Row(ref row) = entity.data {
2964 if let Some(ref named) = row.named {
2965 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2966 if &**k == key {
2967 let value = named
2968 .get("value")
2969 .cloned()
2970 .unwrap_or(crate::storage::schema::Value::Null);
2971 return Ok(Some((value, entity.id)));
2972 }
2973 }
2974 }
2975 }
2976 }
2977 Ok(None)
2978 }
2979
2980 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2981 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2982 let found = self.get_kv(collection, key)?;
2983 if let Some((_, id)) = found {
2984 let db = self.db();
2985 let store = db.store();
2986 let deleted = store
2987 .delete(collection, id)
2988 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2989 if deleted {
2990 store.context_index().remove_entity(id);
2991 }
2992 Ok(deleted)
2993 } else {
2994 Ok(false)
2995 }
2996 }
2997
2998 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2999 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3000 let PatchEntityInput {
3001 collection,
3002 id,
3003 payload,
3004 operations,
3005 } = input;
3006 let db = self.db();
3007 let store = db.store();
3008 let Some(manager) = store.get_collection(&collection) else {
3009 return Err(crate::RedDBError::NotFound(format!(
3010 "collection not found: {collection}"
3011 )));
3012 };
3013 let Some(entity) = manager.get(id) else {
3014 return Err(crate::RedDBError::NotFound(format!(
3015 "entity not found: {}",
3016 id.raw()
3017 )));
3018 };
3019 self.apply_loaded_patch_entity(collection, entity, payload, operations)
3020 }
3021
3022 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3023 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3024 let store = self.db().store();
3025 let pre_delete_fields = store
3029 .get(&input.collection, input.id)
3030 .as_ref()
3031 .map(entity_row_fields_snapshot)
3032 .unwrap_or_default();
3033 let deleted = store
3037 .delete(&input.collection, input.id)
3038 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3039 if deleted {
3040 store.context_index().remove_entity(input.id);
3041 if !pre_delete_fields.is_empty() {
3044 self.index_store_ref()
3045 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3046 .map_err(crate::RedDBError::Internal)?;
3047 }
3048 self.cdc_emit(
3049 crate::replication::cdc::ChangeOperation::Delete,
3050 &input.collection,
3051 input.id.raw(),
3052 "entity",
3053 );
3054 }
3055 Ok(DeleteEntityOutput {
3056 deleted,
3057 id: input.id,
3058 })
3059 }
3060}