1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
/// Key prefix `"red.tree."`; presumably namespaces tree-related metadata keys
/// (usage is outside this part of the file — TODO confirm).
const TREE_METADATA_PREFIX: &str = "red.tree.";
/// Label `"TREE_CHILD"`; presumably the edge label linking a tree node to its
/// children (usage is outside this part of the file — TODO confirm).
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 return Err(crate::RedDBError::InvalidOperation(format!(
94 "collection '{}' is declared as '{}' and does not allow '{}' writes",
95 collection,
96 collection_model_name(contract.declared_model),
97 collection_model_name(requested_model)
98 )));
99 }
100
101 let now = implicit_contract_unix_ms();
102 db.save_collection_contract(crate::physical::CollectionContract {
103 name: collection.to_string(),
104 declared_model: requested_model,
105 schema_mode: crate::catalog::SchemaMode::Dynamic,
106 origin: crate::physical::ContractOrigin::Implicit,
107 version: 1,
108 created_at_unix_ms: now,
109 updated_at_unix_ms: now,
110 default_ttl_ms: db.collection_default_ttl_ms(collection),
111 vector_dimension: None,
112 vector_metric: None,
113 context_index_fields: Vec::new(),
114 declared_columns: Vec::new(),
115 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117 timestamps_enabled: false,
118 context_index_enabled: false,
119 append_only: false,
122 subscriptions: Vec::new(),
123 })
124 .map(|_| ())
125 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
126}
127
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0` instead of an error.
fn implicit_contract_unix_ms() -> u128 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis()
}
134
135fn collection_model_allows(
136 declared_model: crate::catalog::CollectionModel,
137 requested_model: crate::catalog::CollectionModel,
138) -> bool {
139 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
140}
141
142fn ensure_vector_dimension_contract(
143 db: &crate::storage::unified::devx::RedDB,
144 collection: &str,
145 actual_dimension: usize,
146) -> RedDBResult<()> {
147 let Some(expected_dimension) = db
148 .collection_contract(collection)
149 .and_then(|contract| contract.vector_dimension)
150 else {
151 return Ok(());
152 };
153 if expected_dimension == actual_dimension {
154 return Ok(());
155 }
156 Err(crate::RedDBError::Query(format!(
157 "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
158 )))
159}
160
161fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
162 match model {
163 crate::catalog::CollectionModel::Table => "table",
164 crate::catalog::CollectionModel::Document => "document",
165 crate::catalog::CollectionModel::Graph => "graph",
166 crate::catalog::CollectionModel::Vector => "vector",
167 crate::catalog::CollectionModel::Hll => "hll",
168 crate::catalog::CollectionModel::Sketch => "sketch",
169 crate::catalog::CollectionModel::Filter => "filter",
170 crate::catalog::CollectionModel::Kv => "kv",
171 crate::catalog::CollectionModel::Config => "config",
172 crate::catalog::CollectionModel::Vault => "vault",
173 crate::catalog::CollectionModel::Mixed => "mixed",
174 crate::catalog::CollectionModel::TimeSeries => "timeseries",
175 crate::catalog::CollectionModel::Queue => "queue",
176 }
177}
178
/// A primary-key or unique constraint resolved from a collection contract
/// (built by `resolved_uniqueness_rules`).
#[derive(Clone)]
struct UniquenessRule {
    /// Rule name, reported in uniqueness violation errors.
    name: String,
    /// Columns whose combined values must be unique.
    columns: Vec<String>,
    /// `true` for primary keys, which additionally reject null or missing
    /// columns; plain unique rules are skipped when a column is null/missing.
    primary_key: bool,
}
185
/// Selects how `normalize_row_fields_for_contract_with_mode` treats the
/// managed `created_at` / `updated_at` columns when timestamps are enabled.
#[derive(Copy, Clone, PartialEq, Eq)]
enum NormalizeMode {
    /// Insert path: caller-supplied `created_at` / `updated_at` are rejected.
    Insert,
    /// Update path: a supplied `created_at` is carried over unchanged and
    /// `updated_at` is overwritten with the current time.
    Update,
}
197
/// Groups the contract checks applied on the write path behind one small
/// façade type bound to a database handle and a collection name.
mod collection_contract_enforcement {
    use super::*;

    /// Thin wrapper that forwards each check to the corresponding free
    /// function of the parent module, supplying `db` and `collection`.
    pub(super) struct CollectionContractWriteEnforcer<'a> {
        db: &'a crate::storage::unified::devx::RedDB,
        collection: &'a str,
    }

    impl<'a> CollectionContractWriteEnforcer<'a> {
        /// Creates an enforcer for `collection` on `db`.
        pub(super) fn new(
            db: &'a crate::storage::unified::devx::RedDB,
            collection: &'a str,
        ) -> Self {
            Self { db, collection }
        }

        /// Forwards to `ensure_collection_model_contract`.
        pub(super) fn ensure_model(
            &self,
            requested_model: crate::catalog::CollectionModel,
        ) -> RedDBResult<()> {
            ensure_collection_model_contract(self.db, self.collection, requested_model)
        }

        /// Normalizes `fields` against the contract using `NormalizeMode::Insert`.
        pub(super) fn normalize_insert_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Insert,
            )
        }

        /// Normalizes `fields` against the contract using `NormalizeMode::Update`.
        pub(super) fn normalize_update_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Update,
            )
        }

        /// Forwards to `enforce_row_uniqueness`; `exclude_id` names an entity
        /// that is ignored during the duplicate scan.
        pub(super) fn enforce_row_uniqueness(
            &self,
            fields: &[(String, Value)],
            exclude_id: Option<crate::storage::EntityId>,
        ) -> RedDBResult<()> {
            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
        }

        /// Forwards to `enforce_row_batch_uniqueness`, which checks the rows of
        /// one batch against each other.
        pub(super) fn enforce_batch_uniqueness(
            &self,
            rows: &[Vec<(String, Value)>],
        ) -> RedDBResult<()> {
            enforce_row_batch_uniqueness(self.db, self.collection, rows)
        }

        /// Forwards to `row_update_requires_uniqueness_check`.
        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
        }
    }
}
265
266use collection_contract_enforcement::CollectionContractWriteEnforcer;
267
/// Normalizes `fields` against the declared columns of a table contract.
///
/// `fields` is returned untouched when the collection has no contract, is not
/// declared as a table, or declares no columns. Otherwise the result lists the
/// declared columns first (in declaration order, coerced to their declared
/// type, with defaults applied for missing ones) followed by any undeclared
/// fields in their original order.
///
/// # Errors
///
/// Returns `RedDBError::Query` when an insert sets a managed timestamp column,
/// when a strict collection receives undeclared fields, when a required column
/// is missing, or when a value cannot be coerced to its declared type.
fn normalize_row_fields_for_contract_with_mode(
    db: &crate::storage::unified::devx::RedDB,
    collection: &str,
    fields: Vec<(String, Value)>,
    mode: NormalizeMode,
) -> RedDBResult<Vec<(String, Value)>> {
    let Some(contract) = db.collection_contract(collection) else {
        return Ok(fields);
    };

    // Only table contracts that actually declare columns are normalized.
    if contract.declared_model != crate::catalog::CollectionModel::Table
        || (contract.declared_columns.is_empty()
            && contract
                .table_def
                .as_ref()
                .map(|table| table.columns.is_empty())
                .unwrap_or(true))
    {
        return Ok(fields);
    }

    // On update, remember the `created_at` value present in `fields` so it can
    // be written back unchanged further down.
    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
        fields
            .iter()
            .find(|(n, _)| n == "created_at")
            .map(|(_, v)| v.clone())
    } else {
        None
    };

    // On insert, the managed timestamp columns may not be supplied by the caller.
    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
        for (name, _) in &fields {
            if name == "created_at" || name == "updated_at" {
                return Err(crate::RedDBError::Query(format!(
                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
                    collection, name
                )));
            }
        }
    }

    // Index the supplied values by name; a repeated name keeps its last value.
    let mut provided = std::collections::BTreeMap::new();
    for (name, value) in &fields {
        provided.insert(name.clone(), value.clone());
    }

    let resolved_columns = resolved_contract_columns(&contract)?;
    let declared_names: std::collections::BTreeSet<String> = resolved_columns
        .iter()
        .map(|column| column.name.clone())
        .collect();
    let unknown_fields: Vec<String> = fields
        .iter()
        .filter(|(name, _)| !declared_names.contains(name))
        .map(|(name, _)| name.clone())
        .collect();
    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
        && !unknown_fields.is_empty()
    {
        return Err(crate::RedDBError::Query(format!(
            "collection '{}' is strict and does not allow undeclared fields: {}",
            collection,
            unknown_fields.join(", ")
        )));
    }
    let mut normalized = Vec::new();
    let now_ms = current_unix_ms_u64();

    for column in &resolved_columns {
        match provided.remove(&column.name) {
            Some(value) => {
                // On update, supplied timestamp values are never trusted:
                // `created_at` keeps the captured value (or now as a fallback)
                // and `updated_at` is always refreshed to now.
                if contract.timestamps_enabled && mode == NormalizeMode::Update {
                    match column.name.as_str() {
                        "created_at" => {
                            normalized.push((
                                column.name.clone(),
                                existing_created_at
                                    .clone()
                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
                            ));
                            continue;
                        }
                        "updated_at" => {
                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
                            continue;
                        }
                        _ => {}
                    }
                }
                normalized.push((
                    column.name.clone(),
                    normalize_contract_value(collection, column, value)?,
                ));
            }
            None => {
                // A missing managed timestamp column is filled with the
                // current time, in either mode.
                if contract.timestamps_enabled
                    && (column.name == "created_at" || column.name == "updated_at")
                {
                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
                    continue;
                }
                // Otherwise fall back to the declared default; a missing
                // nullable column without a default is simply omitted.
                if let Some(default) = &column.default {
                    normalized.push((
                        column.name.clone(),
                        coerce_contract_literal(collection, &column.name, column, default)?,
                    ));
                } else if column.not_null {
                    return Err(crate::RedDBError::Query(format!(
                        "missing required column '{}' for collection '{}'",
                        column.name, collection
                    )));
                }
            }
        }
    }

    // Undeclared fields (only reachable for non-strict schemas) pass through
    // unchanged after the declared columns.
    for (name, value) in fields {
        if !declared_names.contains(&name) {
            normalized.push((name, value));
        }
    }

    Ok(normalized)
}
412
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` when the system clock reports a time before the epoch.
fn current_unix_ms_u64() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
419
420fn enforce_row_uniqueness(
421 db: &crate::storage::unified::devx::RedDB,
422 collection: &str,
423 fields: &[(String, Value)],
424 exclude_id: Option<crate::storage::EntityId>,
425) -> RedDBResult<()> {
426 let Some(contract) = db.collection_contract(collection) else {
427 return Ok(());
428 };
429 if contract.declared_model != crate::catalog::CollectionModel::Table
430 && contract.declared_model != crate::catalog::CollectionModel::Mixed
431 {
432 return Ok(());
433 }
434
435 let rules = resolved_uniqueness_rules(&contract);
436 if rules.is_empty() {
437 return Ok(());
438 }
439
440 let store = db.store();
441 let Some(manager) = store.get_collection(collection) else {
442 return Ok(());
443 };
444
445 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
446
447 for rule in &rules {
448 let mut expected_signatures = Vec::new();
449 let mut skip_rule = false;
450
451 for column in &rule.columns {
452 match input_fields.get(column) {
453 Some(Value::Null) | None if rule.primary_key => {
454 return Err(crate::RedDBError::Query(format!(
455 "primary key '{}' in collection '{}' requires non-null column '{}'",
456 rule.name, collection, column
457 )))
458 }
459 Some(Value::Null) | None => {
460 skip_rule = true;
461 break;
462 }
463 Some(value) => {
464 expected_signatures.push((column.clone(), value_signature(value)));
465 }
466 }
467 }
468
469 if skip_rule {
470 continue;
471 }
472
473 for entity in manager.query_all(|_| true) {
474 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
475 continue;
476 }
477 let Some(existing_fields) = row_fields_from_entity(&entity) else {
478 continue;
479 };
480
481 let duplicate = expected_signatures.iter().all(|(column, expected)| {
482 existing_fields
483 .get(column)
484 .map(|value| value_signature(value) == *expected)
485 .unwrap_or(false)
486 });
487
488 if duplicate {
489 let qualifier = if rule.primary_key {
490 "primary key"
491 } else {
492 "unique constraint"
493 };
494 return Err(crate::RedDBError::Query(format!(
495 "{} '{}' violated on collection '{}' for columns [{}]",
496 qualifier,
497 rule.name,
498 collection,
499 rule.columns.join(", ")
500 )));
501 }
502 }
503 }
504
505 Ok(())
506}
507
508fn enforce_row_batch_uniqueness(
509 db: &crate::storage::unified::devx::RedDB,
510 collection: &str,
511 rows: &[Vec<(String, Value)>],
512) -> RedDBResult<()> {
513 let Some(contract) = db.collection_contract(collection) else {
514 return Ok(());
515 };
516 if contract.declared_model != crate::catalog::CollectionModel::Table
517 && contract.declared_model != crate::catalog::CollectionModel::Mixed
518 {
519 return Ok(());
520 }
521
522 let rules = resolved_uniqueness_rules(&contract);
523 if rules.is_empty() {
524 return Ok(());
525 }
526
527 for rule in &rules {
528 let mut seen = std::collections::HashMap::<String, usize>::new();
529 for (row_index, fields) in rows.iter().enumerate() {
530 let input_fields: std::collections::BTreeMap<String, Value> =
531 fields.iter().cloned().collect();
532 let mut signatures = Vec::new();
533 let mut skip_rule = false;
534
535 for column in &rule.columns {
536 match input_fields.get(column) {
537 Some(Value::Null) | None if rule.primary_key => {
538 return Err(crate::RedDBError::Query(format!(
539 "primary key '{}' in collection '{}' requires non-null column '{}'",
540 rule.name, collection, column
541 )))
542 }
543 Some(Value::Null) | None => {
544 skip_rule = true;
545 break;
546 }
547 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
548 }
549 }
550
551 if skip_rule {
552 continue;
553 }
554
555 let signature = signatures.join("|");
556 if let Some(previous_index) = seen.insert(signature, row_index) {
557 return Err(crate::RedDBError::Query(format!(
558 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
559 rule.name,
560 collection,
561 previous_index + 1,
562 row_index + 1
563 )));
564 }
565 }
566 }
567
568 Ok(())
569}
570
571fn row_update_requires_uniqueness_check(
572 db: &crate::storage::unified::devx::RedDB,
573 collection: &str,
574 modified_columns: &[String],
575) -> bool {
576 if modified_columns.is_empty() {
577 return false;
578 }
579
580 let Some(contract) = db.collection_contract(collection) else {
581 return false;
582 };
583 if contract.declared_model != crate::catalog::CollectionModel::Table
584 && contract.declared_model != crate::catalog::CollectionModel::Mixed
585 {
586 return false;
587 }
588
589 let rules = resolved_uniqueness_rules(&contract);
590 if rules.is_empty() {
591 return false;
592 }
593
594 rules.iter().any(|rule| {
595 rule.columns.iter().any(|column| {
596 modified_columns
597 .iter()
598 .any(|modified| modified.eq_ignore_ascii_case(column))
599 })
600 })
601}
602
603pub(crate) fn build_row_update_contract_plan(
604 db: &crate::storage::unified::devx::RedDB,
605 collection: &str,
606) -> RedDBResult<Option<RowUpdateContractPlan>> {
607 let Some(contract) = db.collection_contract(collection) else {
608 return Ok(None);
609 };
610
611 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
612 && !(contract.declared_columns.is_empty()
613 && contract
614 .table_def
615 .as_ref()
616 .map(|table| table.columns.is_empty())
617 .unwrap_or(true))
618 {
619 resolved_contract_columns(&contract)?
620 .into_iter()
621 .map(|rule| {
622 (
623 rule.name.clone(),
624 RowUpdateColumnRule {
625 name: rule.name,
626 data_type: rule.data_type,
627 data_type_name: rule.data_type_name,
628 not_null: rule.not_null,
629 enum_variants: rule.enum_variants,
630 },
631 )
632 })
633 .collect()
634 } else {
635 HashMap::new()
636 };
637
638 let unique_columns = if matches!(
639 contract.declared_model,
640 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
641 ) {
642 resolved_uniqueness_rules(&contract)
643 .into_iter()
644 .flat_map(|rule| rule.columns.into_iter())
645 .map(|column| (column, ()))
646 .collect()
647 } else {
648 HashMap::new()
649 };
650
651 Ok(Some(RowUpdateContractPlan {
652 timestamps_enabled: contract.timestamps_enabled,
653 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
654 declared_rules,
655 unique_columns,
656 }))
657}
658
659pub(crate) fn normalize_row_update_assignment_with_plan(
660 collection: &str,
661 column: &str,
662 value: Value,
663 row_contract_plan: Option<&RowUpdateContractPlan>,
664) -> RedDBResult<Value> {
665 let Some(plan) = row_contract_plan else {
666 return Ok(value);
667 };
668
669 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
670 return Err(crate::RedDBError::Query(format!(
671 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
672 collection, column
673 )));
674 }
675
676 if let Some(rule) = plan.declared_rules.get(column) {
677 let rule = ResolvedColumnRule {
678 name: rule.name.clone(),
679 data_type: rule.data_type,
680 data_type_name: rule.data_type_name.clone(),
681 not_null: rule.not_null,
682 default: None,
683 enum_variants: rule.enum_variants.clone(),
684 };
685 normalize_contract_value(collection, &rule, value)
686 } else if plan.strict_schema {
687 Err(crate::RedDBError::Query(format!(
688 "collection '{}' is strict and does not allow undeclared fields: {}",
689 collection, column
690 )))
691 } else {
692 Ok(value)
693 }
694}
695
696pub(crate) fn normalize_row_update_value_for_rule(
697 collection: &str,
698 value: Value,
699 row_rule: Option<&RowUpdateColumnRule>,
700) -> RedDBResult<Value> {
701 let Some(rule) = row_rule else {
702 return Ok(value);
703 };
704
705 let rule = ResolvedColumnRule {
706 name: rule.name.clone(),
707 data_type: rule.data_type,
708 data_type_name: rule.data_type_name.clone(),
709 not_null: rule.not_null,
710 default: None,
711 enum_variants: rule.enum_variants.clone(),
712 };
713 normalize_contract_value(collection, &rule, value)
714}
715
716fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
717 if let Some(named) = row.named.as_mut() {
718 named.insert(name.to_string(), value);
719 return;
720 }
721
722 if let Some(schema) = row.schema.as_ref() {
723 if let Some(index) = schema.iter().position(|column| column == name) {
724 if let Some(slot) = row.columns.get_mut(index) {
725 *slot = value;
726 return;
727 }
728 }
729
730 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
731 for (column, current) in schema.iter().zip(row.columns.iter()) {
732 named.insert(column.clone(), current.clone());
733 }
734 named.insert(name.to_string(), value);
735 row.named = Some(named);
736 return;
737 }
738
739 let mut named = HashMap::with_capacity(1);
740 named.insert(name.to_string(), value);
741 row.named = Some(named);
742}
743
744fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
745 if let Some(named) = row.named.as_ref() {
746 named
747 .iter()
748 .map(|(key, value)| (key.clone(), value.clone()))
749 .collect()
750 } else if let Some(schema) = row.schema.as_ref() {
751 schema
752 .iter()
753 .cloned()
754 .zip(row.columns.iter().cloned())
755 .collect()
756 } else {
757 Vec::new()
758 }
759}
760
761fn apply_row_field_assignments_raw<I>(
762 row: &mut crate::storage::unified::entity::RowData,
763 field_assignments: I,
764) where
765 I: IntoIterator<Item = (String, Value)>,
766{
767 for (column, value) in field_assignments {
768 set_row_field(row, &column, value);
769 }
770}
771
772fn apply_row_field_assignments_incremental<I>(
773 collection: &str,
774 row: &mut crate::storage::unified::entity::RowData,
775 field_assignments: I,
776 row_contract_plan: Option<&RowUpdateContractPlan>,
777) -> RedDBResult<()>
778where
779 I: IntoIterator<Item = (String, Value)>,
780{
781 for (column, value) in field_assignments {
782 let value = normalize_row_update_assignment_with_plan(
783 collection,
784 &column,
785 value,
786 row_contract_plan,
787 )?;
788
789 set_row_field(row, &column, value);
790 }
791
792 Ok(())
793}
794
795fn resolved_uniqueness_rules(
796 contract: &crate::physical::CollectionContract,
797) -> Vec<UniquenessRule> {
798 let mut rules = Vec::new();
799
800 if let Some(table_def) = &contract.table_def {
801 if !table_def.primary_key.is_empty() {
802 rules.push(UniquenessRule {
803 name: "primary_key".to_string(),
804 columns: table_def.primary_key.clone(),
805 primary_key: true,
806 });
807 }
808
809 for constraint in &table_def.constraints {
810 if matches!(
811 constraint.constraint_type,
812 crate::storage::schema::ConstraintType::PrimaryKey
813 ) && !constraint.columns.is_empty()
814 {
815 rules.push(UniquenessRule {
816 name: constraint.name.clone(),
817 columns: constraint.columns.clone(),
818 primary_key: true,
819 });
820 } else if matches!(
821 constraint.constraint_type,
822 crate::storage::schema::ConstraintType::Unique
823 ) && !constraint.columns.is_empty()
824 {
825 rules.push(UniquenessRule {
826 name: constraint.name.clone(),
827 columns: constraint.columns.clone(),
828 primary_key: false,
829 });
830 }
831 }
832 } else {
833 for column in &contract.declared_columns {
834 if column.primary_key {
835 rules.push(UniquenessRule {
836 name: format!("pk_{}", column.name),
837 columns: vec![column.name.clone()],
838 primary_key: true,
839 });
840 } else if column.unique {
841 rules.push(UniquenessRule {
842 name: format!("uniq_{}", column.name),
843 columns: vec![column.name.clone()],
844 primary_key: false,
845 });
846 }
847 }
848 }
849
850 let mut dedup = std::collections::BTreeSet::new();
851 rules
852 .into_iter()
853 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
854 .collect()
855}
856
857fn row_fields_from_entity(
858 entity: &crate::storage::UnifiedEntity,
859) -> Option<std::collections::BTreeMap<String, Value>> {
860 match &entity.data {
861 crate::storage::EntityData::Row(row) => {
862 if let Some(named) = &row.named {
863 Some(
864 named
865 .iter()
866 .map(|(key, value)| (key.clone(), value.clone()))
867 .collect(),
868 )
869 } else {
870 row.schema.as_ref().map(|schema| {
871 schema
872 .iter()
873 .cloned()
874 .zip(row.columns.iter().cloned())
875 .collect()
876 })
877 }
878 }
879 _ => None,
880 }
881}
882
883fn value_signature(value: &Value) -> String {
884 format!("{value:?}")
885}
886
887fn normalize_contract_value(
888 collection: &str,
889 column: &ResolvedColumnRule,
890 value: Value,
891) -> RedDBResult<Value> {
892 if matches!(value, Value::Null) {
893 if column.not_null {
894 return Err(crate::RedDBError::Query(format!(
895 "column '{}' in collection '{}' cannot be null",
896 column.name, collection
897 )));
898 }
899 return Ok(Value::Null);
900 }
901
902 let target = column.data_type;
903 if value_matches_declared_type(&value, target) {
904 return Ok(value);
905 }
906
907 let Some(raw) = value_to_coercion_input(&value) else {
908 return Err(crate::RedDBError::Query(format!(
909 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
910 column.name, collection, column.data_type_name
911 )));
912 };
913
914 coerce_contract_literal(collection, &column.name, column, &raw)
915}
916
917fn coerce_contract_literal(
918 collection: &str,
919 column_name: &str,
920 column: &ResolvedColumnRule,
921 raw: &str,
922) -> RedDBResult<Value> {
923 let target = column.data_type;
924 match target {
925 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
926 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
927 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
928 crate::RedDBError::Query(format!(
929 "failed to coerce column '{}' in collection '{}' to '{}': {}",
930 column_name, collection, column.data_type_name, err
931 ))
932 }),
933 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
934 crate::RedDBError::Query(format!(
935 "failed to coerce column '{}' in collection '{}' to '{}': {}",
936 column_name, collection, column.data_type_name, err
937 ))
938 }),
939 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
940 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
941 column_name, collection, column.data_type_name
942 ))),
943 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
944 |err| {
945 crate::RedDBError::Query(format!(
946 "failed to coerce column '{}' in collection '{}' to '{}': {}",
947 column_name, collection, column.data_type_name, err
948 ))
949 },
950 ),
951 }
952}
953
/// A declared column with its type resolved, as produced by
/// `resolved_contract_columns` and consumed by the normalization helpers.
struct ResolvedColumnRule {
    /// Column name.
    name: String,
    /// Resolved storage type that values are checked against / coerced to.
    data_type: DataType,
    /// Type name used in error messages.
    data_type_name: String,
    /// When `true`, null values and missing values without a default are rejected.
    not_null: bool,
    /// Textual default applied when the column is missing; coerced with
    /// `coerce_contract_literal`.
    default: Option<String>,
    /// Variants handed to the schema coercion routine (presumably only
    /// meaningful for enum columns — TODO confirm).
    enum_variants: Vec<String>,
}
962
963fn resolved_contract_columns(
964 contract: &crate::physical::CollectionContract,
965) -> RedDBResult<Vec<ResolvedColumnRule>> {
966 if let Some(table_def) = &contract.table_def {
967 return Ok(table_def
968 .columns
969 .iter()
970 .map(|column| ResolvedColumnRule {
971 name: column.name.clone(),
972 data_type: column.data_type,
973 data_type_name: data_type_name(column.data_type).to_string(),
974 not_null: !column.nullable,
975 default: column
976 .default
977 .as_ref()
978 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
979 enum_variants: column.enum_variants.clone(),
980 })
981 .collect());
982 }
983
984 contract
985 .declared_columns
986 .iter()
987 .map(|column| {
988 let data_type = column
989 .sql_type
990 .as_ref()
991 .map(crate::storage::query::resolve_sql_type_name)
992 .transpose()
993 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
994 .unwrap_or(parse_declared_data_type(&column.data_type)?);
995 Ok(ResolvedColumnRule {
996 name: column.name.clone(),
997 data_type,
998 data_type_name: column.data_type.clone(),
999 not_null: column.not_null,
1000 default: column.default.clone(),
1001 enum_variants: column.enum_variants.clone(),
1002 })
1003 })
1004 .collect()
1005}
1006
1007fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1008 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1009}
1010
1011fn data_type_name(data_type: DataType) -> &'static str {
1012 match data_type {
1013 DataType::Integer => "integer",
1014 DataType::UnsignedInteger => "unsigned_integer",
1015 DataType::Float => "float",
1016 DataType::Text => "text",
1017 DataType::Blob => "blob",
1018 DataType::Boolean => "boolean",
1019 DataType::Timestamp => "timestamp",
1020 DataType::Duration => "duration",
1021 DataType::IpAddr => "ipaddr",
1022 DataType::MacAddr => "macaddr",
1023 DataType::Vector => "vector",
1024 DataType::Nullable => "nullable",
1025 DataType::Unknown => "unknown",
1026 DataType::Json => "json",
1027 DataType::Uuid => "uuid",
1028 DataType::NodeRef => "noderef",
1029 DataType::EdgeRef => "edgeref",
1030 DataType::VectorRef => "vectorref",
1031 DataType::RowRef => "rowref",
1032 DataType::Color => "color",
1033 DataType::Email => "email",
1034 DataType::Url => "url",
1035 DataType::Phone => "phone",
1036 DataType::Semver => "semver",
1037 DataType::Cidr => "cidr",
1038 DataType::Date => "date",
1039 DataType::Time => "time",
1040 DataType::Decimal => "decimal",
1041 DataType::Enum => "enum",
1042 DataType::Array => "array",
1043 DataType::TimestampMs => "timestamp_ms",
1044 DataType::Ipv4 => "ipv4",
1045 DataType::Ipv6 => "ipv6",
1046 DataType::Subnet => "subnet",
1047 DataType::Port => "port",
1048 DataType::Latitude => "latitude",
1049 DataType::Longitude => "longitude",
1050 DataType::GeoPoint => "geopoint",
1051 DataType::Country2 => "country2",
1052 DataType::Country3 => "country3",
1053 DataType::Lang2 => "lang2",
1054 DataType::Lang5 => "lang5",
1055 DataType::Currency => "currency",
1056 DataType::AssetCode => "asset_code",
1057 DataType::Money => "money",
1058 DataType::ColorAlpha => "color_alpha",
1059 DataType::BigInt => "bigint",
1060 DataType::KeyRef => "keyref",
1061 DataType::DocRef => "docref",
1062 DataType::TableRef => "tableref",
1063 DataType::PageRef => "pageref",
1064 DataType::Secret => "secret",
1065 DataType::Password => "password",
1066 DataType::TextZstd => "text",
1067 DataType::BlobZstd => "blob",
1068 }
1069}
1070
1071fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1072 matches!(
1073 (value, target),
1074 (Value::Null, _)
1075 | (Value::Integer(_), DataType::Integer)
1076 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1077 | (Value::Float(_), DataType::Float)
1078 | (Value::Text(_), DataType::Text)
1079 | (Value::Blob(_), DataType::Blob)
1080 | (Value::Boolean(_), DataType::Boolean)
1081 | (Value::Timestamp(_), DataType::Timestamp)
1082 | (Value::Duration(_), DataType::Duration)
1083 | (Value::IpAddr(_), DataType::IpAddr)
1084 | (Value::MacAddr(_), DataType::MacAddr)
1085 | (Value::Vector(_), DataType::Vector)
1086 | (Value::Json(_), DataType::Json)
1087 | (Value::Uuid(_), DataType::Uuid)
1088 | (Value::NodeRef(_), DataType::NodeRef)
1089 | (Value::EdgeRef(_), DataType::EdgeRef)
1090 | (Value::VectorRef(_, _), DataType::VectorRef)
1091 | (Value::RowRef(_, _), DataType::RowRef)
1092 | (Value::Color(_), DataType::Color)
1093 | (Value::Email(_), DataType::Email)
1094 | (Value::Url(_), DataType::Url)
1095 | (Value::Phone(_), DataType::Phone)
1096 | (Value::Semver(_), DataType::Semver)
1097 | (Value::Cidr(_, _), DataType::Cidr)
1098 | (Value::Date(_), DataType::Date)
1099 | (Value::Time(_), DataType::Time)
1100 | (Value::Decimal(_), DataType::Decimal)
1101 | (Value::EnumValue(_), DataType::Enum)
1102 | (Value::Array(_), DataType::Array)
1103 | (Value::TimestampMs(_), DataType::TimestampMs)
1104 | (Value::Ipv4(_), DataType::Ipv4)
1105 | (Value::Ipv6(_), DataType::Ipv6)
1106 | (Value::Subnet(_, _), DataType::Subnet)
1107 | (Value::Port(_), DataType::Port)
1108 | (Value::Latitude(_), DataType::Latitude)
1109 | (Value::Longitude(_), DataType::Longitude)
1110 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1111 | (Value::Country2(_), DataType::Country2)
1112 | (Value::Country3(_), DataType::Country3)
1113 | (Value::Lang2(_), DataType::Lang2)
1114 | (Value::Lang5(_), DataType::Lang5)
1115 | (Value::Currency(_), DataType::Currency)
1116 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1117 | (Value::BigInt(_), DataType::BigInt)
1118 | (Value::KeyRef(_, _), DataType::KeyRef)
1119 | (Value::DocRef(_, _), DataType::DocRef)
1120 | (Value::TableRef(_), DataType::TableRef)
1121 | (Value::PageRef(_), DataType::PageRef)
1122 | (Value::Secret(_), DataType::Secret)
1123 | (Value::Password(_), DataType::Password)
1124 )
1125}
1126
1127fn value_to_coercion_input(value: &Value) -> Option<String> {
1128 match value {
1129 Value::Null => None,
1130 Value::Integer(value) => Some(value.to_string()),
1131 Value::UnsignedInteger(value) => Some(value.to_string()),
1132 Value::Float(value) => Some(value.to_string()),
1133 Value::Text(value) => Some(value.to_string()),
1134 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1135 Value::Boolean(value) => Some(value.to_string()),
1136 Value::Timestamp(value) => Some(value.to_string()),
1137 Value::Duration(value) => Some(value.to_string()),
1138 Value::IpAddr(value) => Some(value.to_string()),
1139 Value::MacAddr(value) => Some(format!(
1140 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1141 value[0], value[1], value[2], value[3], value[4], value[5]
1142 )),
1143 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1144 Value::Email(value) => Some(value.clone()),
1145 Value::Url(value) => Some(value.clone()),
1146 Value::Phone(value) => Some(value.to_string()),
1147 Value::Semver(value) => Some(format!(
1148 "{}.{}.{}",
1149 value / 1_000_000,
1150 (value / 1_000) % 1_000,
1151 value % 1_000
1152 )),
1153 Value::Date(value) => Some(value.to_string()),
1154 Value::Time(value) => Some(value.to_string()),
1155 Value::Decimal(value) => Some(value.to_string()),
1156 Value::TimestampMs(value) => Some(value.to_string()),
1157 Value::Ipv4(value) => Some(format!(
1158 "{}.{}.{}.{}",
1159 (value >> 24) & 0xFF,
1160 (value >> 16) & 0xFF,
1161 (value >> 8) & 0xFF,
1162 value & 0xFF
1163 )),
1164 Value::Port(value) => Some(value.to_string()),
1165 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1166 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1167 Value::GeoPoint(lat, lon) => Some(format!(
1168 "{},{}",
1169 *lat as f64 / 1_000_000.0,
1170 *lon as f64 / 1_000_000.0
1171 )),
1172 Value::BigInt(value) => Some(value.to_string()),
1173 Value::TableRef(value) => Some(value.clone()),
1174 Value::PageRef(value) => Some(value.to_string()),
1175 Value::Password(value) => Some(value.clone()),
1176 _ => None,
1177 }
1178}
1179
/// Removes duplicate column names, comparing ASCII case-insensitively.
///
/// The first spelling of each column wins and the relative order of the
/// surviving names is preserved.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(modified_columns.len());
    for candidate in modified_columns {
        let already_kept = kept
            .iter()
            .any(|seen| seen.eq_ignore_ascii_case(&candidate));
        if already_kept {
            continue;
        }
        kept.push(candidate);
    }
    kept
}
1196
1197impl RedDBRuntime {
1198 pub(crate) fn apply_loaded_patch_entity_core(
1199 &self,
1200 collection: String,
1201 mut entity: crate::storage::UnifiedEntity,
1202 payload: JsonValue,
1203 operations: Vec<PatchEntityOperation>,
1204 ) -> RedDBResult<AppliedEntityMutation> {
1205 let id = entity.id;
1206 let operations = normalize_ttl_patch_operations(operations)?;
1207 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1210
1211 let db = self.db();
1212 let store = db.store();
1213 let Some(manager) = store.get_collection(&collection) else {
1214 return Err(crate::RedDBError::NotFound(format!(
1215 "collection not found: {collection}"
1216 )));
1217 };
1218
1219 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1220 let mut metadata_changed = false;
1221 let mut modified_columns: Vec<String> = Vec::new();
1222 let mut context_index_dirty = false;
1223
1224 let row_contract_timestamps = db
1225 .collection_contract(&collection)
1226 .map(|c| c.timestamps_enabled)
1227 .unwrap_or(false);
1228
1229 match &mut entity.data {
1230 crate::storage::EntityData::Row(row) => {
1231 let mut field_ops = Vec::new();
1232 let mut metadata_ops = Vec::new();
1233
1234 for mut op in operations {
1235 let Some(root) = op.path.first().map(String::as_str) else {
1236 return Err(crate::RedDBError::Query(
1237 "patch path cannot be empty".to_string(),
1238 ));
1239 };
1240
1241 match root {
1242 "fields" | "named" => {
1243 if op.path.len() < 2 {
1244 return Err(crate::RedDBError::Query(
1245 "patch path 'fields' requires a nested key".to_string(),
1246 ));
1247 }
1248 if row_contract_timestamps {
1249 let leaf = op.path.get(1).map(String::as_str);
1250 if matches!(leaf, Some("created_at") | Some("updated_at")) {
1251 return Err(crate::RedDBError::Query(format!(
1252 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1253 collection,
1254 leaf.unwrap_or("")
1255 )));
1256 }
1257 }
1258 op.path.remove(0);
1259 field_ops.push(op);
1260 }
1261 "metadata" => {
1262 if op.path.len() < 2 {
1263 return Err(crate::RedDBError::Query(
1264 "patch path 'metadata' requires a nested key".to_string(),
1265 ));
1266 }
1267 op.path.remove(0);
1268 metadata_ops.push(op);
1269 }
1270 _ => {
1271 return Err(crate::RedDBError::Query(format!(
1272 "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1273 )));
1274 }
1275 }
1276 }
1277
1278 if !field_ops.is_empty() {
1279 context_index_dirty = true;
1280 for op in &field_ops {
1281 if let Some(col) = op.path.first() {
1282 modified_columns.push(col.clone());
1283 }
1284 }
1285 let named = row.named.get_or_insert_with(Default::default);
1286 apply_patch_operations_to_storage_map(named, &field_ops)?;
1287 }
1288
1289 if let Some(fields) = payload
1290 .get("fields")
1291 .and_then(crate::json::Value::as_object)
1292 {
1293 if row_contract_timestamps {
1294 for key in fields.keys() {
1295 if key == "created_at" || key == "updated_at" {
1296 return Err(crate::RedDBError::Query(format!(
1297 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1298 collection, key
1299 )));
1300 }
1301 }
1302 }
1303 context_index_dirty = true;
1304 let named = row.named.get_or_insert_with(Default::default);
1305 for (key, value) in fields {
1306 modified_columns.push(key.clone());
1307 named.insert(key.clone(), json_to_storage_value(value)?);
1308 }
1309 }
1310
1311 if !metadata_ops.is_empty() {
1312 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1313 let metadata = patch_metadata.get_or_insert_with(|| {
1314 store.get_metadata(&collection, id).unwrap_or_default()
1315 });
1316 let mut metadata_json = metadata_to_json(metadata);
1317 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1318 .map_err(crate::RedDBError::Query)?;
1319 *metadata = metadata_from_json(&metadata_json)?;
1320 metadata_changed = true;
1321 }
1322
1323 if !modified_columns.is_empty() || row_contract_timestamps {
1324 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1325 let current_fields = if let Some(named) = row.named.take() {
1326 named.into_iter().collect::<Vec<_>>()
1327 } else if let Some(schema) = row.schema.as_ref() {
1328 schema
1329 .iter()
1330 .cloned()
1331 .zip(row.columns.iter().cloned())
1332 .collect::<Vec<_>>()
1333 } else {
1334 Vec::new()
1335 };
1336 let normalized_fields = contract.normalize_update_fields(current_fields)?;
1337 if row_contract_timestamps {
1338 modified_columns.push("updated_at".to_string());
1339 context_index_dirty = true;
1340 }
1341 if contract.requires_uniqueness_check(&modified_columns) {
1342 contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1343 }
1344 row.named = Some(normalized_fields.into_iter().collect());
1345 }
1346 }
1347 crate::storage::EntityData::Node(node) => {
1348 let mut field_ops = Vec::new();
1349 let mut metadata_ops = Vec::new();
1350
1351 for mut op in operations {
1352 let Some(root) = op.path.first().map(String::as_str) else {
1353 return Err(crate::RedDBError::Query(
1354 "patch path cannot be empty".to_string(),
1355 ));
1356 };
1357
1358 match root {
1359 "fields" | "properties" => {
1360 if op.path.len() < 2 {
1361 return Err(crate::RedDBError::Query(
1362 "patch path 'fields' requires a nested key".to_string(),
1363 ));
1364 }
1365 op.path.remove(0);
1366 field_ops.push(op);
1367 }
1368 "metadata" => {
1369 if op.path.len() < 2 {
1370 return Err(crate::RedDBError::Query(
1371 "patch path 'metadata' requires a nested key".to_string(),
1372 ));
1373 }
1374 op.path.remove(0);
1375 metadata_ops.push(op);
1376 }
1377 _ => {
1378 return Err(crate::RedDBError::Query(format!(
1379 "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, or metadata/*"
1380 )));
1381 }
1382 }
1383 }
1384
1385 if !field_ops.is_empty() {
1386 context_index_dirty = true;
1387 apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1388 }
1389
1390 if let Some(fields) = payload
1391 .get("fields")
1392 .and_then(crate::json::Value::as_object)
1393 {
1394 context_index_dirty = true;
1395 for (key, value) in fields {
1396 node.properties
1397 .insert(key.clone(), json_to_storage_value(value)?);
1398 }
1399 }
1400
1401 if !metadata_ops.is_empty() {
1402 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1403 let metadata = patch_metadata.get_or_insert_with(|| {
1404 store.get_metadata(&collection, id).unwrap_or_default()
1405 });
1406 let mut metadata_json = metadata_to_json(metadata);
1407 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1408 .map_err(crate::RedDBError::Query)?;
1409 *metadata = metadata_from_json(&metadata_json)?;
1410 metadata_changed = true;
1411 }
1412 }
1413 crate::storage::EntityData::Edge(edge) => {
1414 let mut field_ops = Vec::new();
1415 let mut metadata_ops = Vec::new();
1416 let mut weight_ops = Vec::new();
1417
1418 for mut op in operations {
1419 let Some(root) = op.path.first().map(String::as_str) else {
1420 return Err(crate::RedDBError::Query(
1421 "patch path cannot be empty".to_string(),
1422 ));
1423 };
1424
1425 match root {
1426 "fields" | "properties" => {
1427 if op.path.len() < 2 {
1428 return Err(crate::RedDBError::Query(
1429 "patch path 'fields' requires a nested key".to_string(),
1430 ));
1431 }
1432 op.path.remove(0);
1433 field_ops.push(op);
1434 }
1435 "weight" => {
1436 if op.path.len() != 1 {
1437 return Err(crate::RedDBError::Query(
1438 "patch path 'weight' does not allow nested keys".to_string(),
1439 ));
1440 }
1441 op.path.clear();
1442 weight_ops.push(op);
1443 }
1444 "metadata" => {
1445 if op.path.len() < 2 {
1446 return Err(crate::RedDBError::Query(
1447 "patch path 'metadata' requires a nested key".to_string(),
1448 ));
1449 }
1450 op.path.remove(0);
1451 metadata_ops.push(op);
1452 }
1453 _ => {
1454 return Err(crate::RedDBError::Query(format!(
1455 "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1456 )));
1457 }
1458 }
1459 }
1460
1461 if !field_ops.is_empty() {
1462 context_index_dirty = true;
1463 apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1464 }
1465
1466 for op in weight_ops {
1467 context_index_dirty = true;
1468 let value = op.value.ok_or_else(|| {
1469 crate::RedDBError::Query("weight operations require a value".to_string())
1470 })?;
1471
1472 match op.op {
1473 PatchEntityOperationType::Unset => {
1474 return Err(crate::RedDBError::Query(
1475 "weight cannot be unset through patch operations".to_string(),
1476 ));
1477 }
1478 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1479 let Some(weight) = value.as_f64() else {
1480 return Err(crate::RedDBError::Query(
1481 "weight operation requires a numeric value".to_string(),
1482 ));
1483 };
1484 edge.weight = weight as f32;
1485 }
1486 }
1487 }
1488
1489 if let Some(fields) = payload
1490 .get("fields")
1491 .and_then(crate::json::Value::as_object)
1492 {
1493 context_index_dirty = true;
1494 for (key, value) in fields {
1495 edge.properties
1496 .insert(key.clone(), json_to_storage_value(value)?);
1497 }
1498 }
1499
1500 if !metadata_ops.is_empty() {
1501 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1502 let metadata = patch_metadata.get_or_insert_with(|| {
1503 store.get_metadata(&collection, id).unwrap_or_default()
1504 });
1505 let mut metadata_json = metadata_to_json(metadata);
1506 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1507 .map_err(crate::RedDBError::Query)?;
1508 *metadata = metadata_from_json(&metadata_json)?;
1509 metadata_changed = true;
1510 }
1511 }
1512 crate::storage::EntityData::Vector(vector) => {
1513 let mut field_ops = Vec::new();
1514 let mut metadata_ops = Vec::new();
1515
1516 for mut op in operations {
1517 let Some(root) = op.path.first().map(String::as_str) else {
1518 return Err(crate::RedDBError::Query(
1519 "patch path cannot be empty".to_string(),
1520 ));
1521 };
1522
1523 match root {
1524 "fields" => {
1525 if op.path.len() < 2 {
1526 return Err(crate::RedDBError::Query(
1527 "patch path 'fields' requires a nested key".to_string(),
1528 ));
1529 }
1530 op.path.remove(0);
1531 let Some(target) = op.path.first().map(String::as_str) else {
1532 return Err(crate::RedDBError::Query(
1533 "patch path requires a target under fields".to_string(),
1534 ));
1535 };
1536 if !matches!(target, "dense" | "content" | "sparse") {
1537 return Err(crate::RedDBError::Query(format!(
1538 "unsupported vector patch target '{target}'"
1539 )));
1540 }
1541 field_ops.push(op);
1542 }
1543 "metadata" => {
1544 if op.path.len() < 2 {
1545 return Err(crate::RedDBError::Query(
1546 "patch path 'metadata' requires a nested key".to_string(),
1547 ));
1548 }
1549 op.path.remove(0);
1550 metadata_ops.push(op);
1551 }
1552 _ => {
1553 return Err(crate::RedDBError::Query(format!(
1554 "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1555 )));
1556 }
1557 }
1558 }
1559
1560 if !field_ops.is_empty() {
1561 context_index_dirty = true;
1562 apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1563 }
1564
1565 if let Some(fields) = payload
1566 .get("fields")
1567 .and_then(crate::json::Value::as_object)
1568 {
1569 context_index_dirty = true;
1570 if let Some(content) =
1571 fields.get("content").and_then(crate::json::Value::as_str)
1572 {
1573 vector.content = Some(content.to_string());
1574 }
1575 if let Some(dense) = fields.get("dense") {
1576 vector.dense = dense
1577 .as_array()
1578 .ok_or_else(|| {
1579 crate::RedDBError::Query(
1580 "field 'dense' must be an array".to_string(),
1581 )
1582 })?
1583 .iter()
1584 .map(|value| {
1585 value.as_f64().map(|value| value as f32).ok_or_else(|| {
1586 crate::RedDBError::Query(
1587 "field 'dense' must contain only numbers".to_string(),
1588 )
1589 })
1590 })
1591 .collect::<Result<Vec<_>, _>>()?;
1592 }
1593 }
1594
1595 if !metadata_ops.is_empty() {
1596 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1597 let metadata = patch_metadata.get_or_insert_with(|| {
1598 store.get_metadata(&collection, id).unwrap_or_default()
1599 });
1600 let mut metadata_json = metadata_to_json(metadata);
1601 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1602 .map_err(crate::RedDBError::Query)?;
1603 *metadata = metadata_from_json(&metadata_json)?;
1604 metadata_changed = true;
1605 }
1606 }
1607 crate::storage::EntityData::TimeSeries(_)
1608 | crate::storage::EntityData::QueueMessage(_) => {
1609 return Err(crate::RedDBError::Query(
1610 "patch operations are not supported for TimeSeries or QueueMessage entities"
1611 .to_string(),
1612 ));
1613 }
1614 }
1615
1616 if let Some(metadata) = payload
1617 .get("metadata")
1618 .and_then(crate::json::Value::as_object)
1619 {
1620 let patch_metadata = patch_metadata
1621 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1622 for (key, value) in metadata {
1623 ensure_non_tree_reserved_metadata_key(key)?;
1624 patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1625 }
1626 metadata_changed = true;
1627 }
1628
1629 for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1630 let patch_metadata = patch_metadata
1631 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1632 if matches!(value, crate::storage::unified::MetadataValue::Null) {
1633 patch_metadata.remove(&key);
1634 } else {
1635 patch_metadata.set(key, value);
1636 }
1637 metadata_changed = true;
1638 }
1639
1640 entity.updated_at = std::time::SystemTime::now()
1641 .duration_since(std::time::UNIX_EPOCH)
1642 .unwrap_or_default()
1643 .as_secs();
1644
1645 modified_columns = dedupe_modified_columns(modified_columns);
1646
1647 Ok(AppliedEntityMutation {
1648 id,
1649 collection,
1650 entity,
1651 metadata: patch_metadata,
1652 modified_columns,
1653 persist_metadata: metadata_changed,
1654 context_index_dirty,
1655 replaced_entity: None,
1656 replaced_entity_previous_xmax: 0,
1657 pre_mutation_fields,
1658 })
1659 }
1660
1661 pub(crate) fn apply_loaded_sql_update_row_core(
1662 &self,
1663 collection: String,
1664 mut entity: crate::storage::UnifiedEntity,
1665 static_field_assignments: &[(String, Value)],
1666 dynamic_field_assignments: Vec<(String, Value)>,
1667 static_metadata_assignments: &[(String, MetadataValue)],
1668 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1669 row_contract_plan: Option<&RowUpdateContractPlan>,
1670 row_modified_columns_template: &[String],
1671 row_touches_unique_columns: bool,
1672 ) -> RedDBResult<AppliedEntityMutation> {
1673 let id = entity.id;
1674 let previous_xmax = entity.xmax;
1675 let db = self.db();
1676 let store = db.store();
1677 let Some(_) = store.get_collection(&collection) else {
1678 return Err(crate::RedDBError::NotFound(format!(
1679 "collection not found: {collection}"
1680 )));
1681 };
1682
1683 let versioned_update_xid = match self.current_xid() {
1684 Some(xid) => Some(xid),
1685 None => {
1686 let snapshot_manager = self.snapshot_manager();
1687 let xid = snapshot_manager.begin();
1688 snapshot_manager.commit(xid);
1689 Some(xid)
1690 }
1691 };
1692 let mut replaced_entity = versioned_update_xid.map(|xid| {
1693 let mut old = entity.clone();
1694 old.set_xmax(xid);
1695 old
1696 });
1697
1698 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1699 let row_contract_timestamps = row_contract_plan
1700 .map(|plan| plan.timestamps_enabled)
1701 .unwrap_or(false);
1702 let mut metadata_changed = false;
1703 let mut modified_columns = row_modified_columns_template.to_vec();
1704 let mut context_index_dirty = !modified_columns.is_empty();
1705
1706 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1710
1711 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1712 return Err(crate::RedDBError::Query(
1713 "SQL row update fast path requires a row entity".to_string(),
1714 ));
1715 };
1716
1717 let _ = row_contract_plan;
1718 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1719 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1720
1721 for (key, value) in static_metadata_assignments
1722 .iter()
1723 .cloned()
1724 .chain(dynamic_metadata_assignments)
1725 {
1726 ensure_non_tree_reserved_metadata_key(&key)?;
1727 patch_metadata
1728 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1729 .set(key, value);
1730 metadata_changed = true;
1731 }
1732
1733 if !modified_columns.is_empty() || row_contract_timestamps {
1734 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1735 if row_contract_timestamps {
1736 context_index_dirty = true;
1737 set_row_field(
1738 row,
1739 "updated_at",
1740 Value::UnsignedInteger(current_unix_ms_u64()),
1741 );
1742 modified_columns.push("updated_at".to_string());
1743 }
1744 if row_touches_unique_columns {
1745 let current_fields = collect_row_fields(row);
1746 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1747 }
1748 }
1749
1750 entity.updated_at = std::time::SystemTime::now()
1751 .duration_since(std::time::UNIX_EPOCH)
1752 .unwrap_or_default()
1753 .as_secs();
1754
1755 if let Some(xid) = versioned_update_xid {
1756 let logical_id = entity.logical_id();
1757 entity.id = store.next_entity_id();
1758 entity.set_logical_id(logical_id);
1759 entity.set_xmin(xid);
1760 entity.set_xmax(0);
1761 if let Some(old) = replaced_entity.as_mut() {
1762 old.set_xmax(xid);
1763 }
1764 }
1765
1766 modified_columns = dedupe_modified_columns(modified_columns);
1767
1768 Ok(AppliedEntityMutation {
1769 id: entity.id,
1770 collection,
1771 entity,
1772 metadata: patch_metadata,
1773 modified_columns,
1774 persist_metadata: metadata_changed,
1775 context_index_dirty,
1776 replaced_entity,
1777 replaced_entity_previous_xmax: previous_xmax,
1778 pre_mutation_fields,
1779 })
1780 }
1781
1782 pub(crate) fn persist_applied_entity_mutations(
1783 &self,
1784 applied: &[AppliedEntityMutation],
1785 ) -> RedDBResult<()> {
1786 if applied.is_empty() {
1787 return Ok(());
1788 }
1789
1790 let store = self.db().store();
1791 let collection = &applied[0].collection;
1792 let Some(manager) = store.get_collection(collection) else {
1793 return Err(crate::RedDBError::NotFound(format!(
1794 "collection not found: {collection}"
1795 )));
1796 };
1797
1798 let mut ordinary = Vec::with_capacity(applied.len());
1799 for item in applied {
1800 if let Some(old_version) = item.replaced_entity.as_ref() {
1801 store
1802 .install_versioned_table_row_update(
1803 collection,
1804 old_version.clone(),
1805 item.entity.clone(),
1806 if item.persist_metadata {
1807 item.metadata.as_ref()
1808 } else {
1809 None
1810 },
1811 )
1812 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1813 if self.current_xid().is_some() {
1814 self.record_pending_versioned_update(
1815 crate::runtime::impl_core::current_connection_id(),
1816 collection,
1817 old_version.id,
1818 item.entity.id,
1819 old_version.xmax,
1820 item.replaced_entity_previous_xmax,
1821 );
1822 }
1823 } else {
1824 ordinary.push(item);
1825 }
1826 }
1827 if ordinary.is_empty() {
1828 return Ok(());
1829 }
1830
1831 manager
1832 .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1833 (
1834 &item.entity,
1835 item.modified_columns.as_slice(),
1836 if item.persist_metadata {
1837 item.metadata.as_ref()
1838 } else {
1839 None
1840 },
1841 )
1842 }))
1843 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1844
1845 let indexed_cols = self
1854 .index_store_ref()
1855 .indexed_columns_set(collection.as_str());
1856 let all_hot = !indexed_cols.is_empty()
1857 && ordinary.iter().all(|item| {
1858 !item.persist_metadata
1859 && !item
1860 .modified_columns
1861 .iter()
1862 .any(|c| indexed_cols.contains(c))
1863 })
1864 || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
1865
1866 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
1870 ordinary.iter().map(|item| &item.entity).collect();
1871 let persist_fn = if all_hot {
1872 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
1873 } else {
1874 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
1875 };
1876 persist_fn(store.as_ref(), collection, &entity_refs)
1877 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
1878 }
1879
1880 pub(crate) fn flush_applied_entity_mutation(
1881 &self,
1882 applied: &AppliedEntityMutation,
1883 ) -> RedDBResult<()> {
1884 let store = self.db().store();
1885 if applied.context_index_dirty {
1886 store
1887 .context_index()
1888 .index_entity(&applied.collection, &applied.entity);
1889 }
1890 let mut changed_columns: Option<Vec<String>> = None;
1898 if !applied.pre_mutation_fields.is_empty() {
1899 let post = entity_row_fields_snapshot(&applied.entity);
1900 if !post.is_empty() {
1901 let damage = crate::application::entity::row_damage_vector(
1902 &applied.pre_mutation_fields,
1903 &post,
1904 );
1905 if !damage.is_empty() {
1906 changed_columns = Some(
1907 damage
1908 .touched_columns()
1909 .into_iter()
1910 .map(str::to_string)
1911 .collect(),
1912 );
1913 }
1914
1915 let indexed_cols: std::collections::HashSet<String> = self
1924 .index_store_ref()
1925 .list_indices(applied.collection.as_str())
1926 .into_iter()
1927 .filter_map(|idx| idx.columns.first().cloned())
1928 .collect();
1929 let modified_cols: std::collections::HashSet<String> = damage
1930 .touched_columns()
1931 .into_iter()
1932 .map(str::to_string)
1933 .collect();
1934 if let Some(old_version) = applied.replaced_entity.as_ref() {
1935 let old_index_fields: Vec<(String, Value)> = applied
1936 .pre_mutation_fields
1937 .iter()
1938 .filter(|(col, _)| indexed_cols.contains(col))
1939 .cloned()
1940 .collect();
1941 let new_index_fields: Vec<(String, Value)> = post
1942 .iter()
1943 .filter(|(col, _)| indexed_cols.contains(col))
1944 .cloned()
1945 .collect();
1946 if !old_index_fields.is_empty() {
1947 self.index_store_ref()
1948 .index_entity_delete(
1949 &applied.collection,
1950 old_version.id,
1951 &old_index_fields,
1952 )
1953 .map_err(crate::RedDBError::Internal)?;
1954 }
1955 if !new_index_fields.is_empty() {
1956 self.index_store_ref()
1957 .index_entity_insert(
1958 &applied.collection,
1959 applied.entity.id,
1960 &new_index_fields,
1961 )
1962 .map_err(crate::RedDBError::Internal)?;
1963 }
1964 } else {
1965 let decision = crate::storage::engine::hot_update::decide(
1966 &crate::storage::engine::hot_update::HotUpdateInputs {
1967 collection: applied.collection.as_str(),
1968 indexed_columns: &indexed_cols,
1969 modified_columns: &modified_cols,
1970 new_tuple_size: 0,
1974 page_free_space: usize::MAX,
1975 },
1976 );
1977 if !decision.can_hot {
1978 self.index_store_ref()
1979 .index_entity_update(
1980 &applied.collection,
1981 applied.id,
1982 &applied.pre_mutation_fields,
1983 &post,
1984 )
1985 .map_err(crate::RedDBError::Internal)?;
1986 } else {
1987 tracing::debug!(
1991 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
1992 "hot_update fast-path: skipped index_entity_update"
1993 );
1994 }
1995 }
1996 }
1997 }
1998 self.cdc_emit_prebuilt_with_columns(
1999 crate::replication::cdc::ChangeOperation::Update,
2000 &applied.collection,
2001 &applied.entity,
2002 "entity",
2003 applied.metadata.as_ref(),
2004 true,
2005 changed_columns,
2006 );
2007 Ok(())
2008 }
2009
2010 pub(crate) fn apply_loaded_patch_entity(
2011 &self,
2012 collection: String,
2013 entity: crate::storage::UnifiedEntity,
2014 payload: JsonValue,
2015 operations: Vec<PatchEntityOperation>,
2016 ) -> RedDBResult<CreateEntityOutput> {
2017 let applied =
2018 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2019 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2020 self.flush_applied_entity_mutation(&applied)?;
2021 Ok(CreateEntityOutput {
2022 id: applied.id,
2023 entity: Some(applied.entity),
2024 })
2025 }
2026}
2027
2028fn ensure_non_tree_reserved_metadata_patch_paths(
2029 operations: &[PatchEntityOperation],
2030) -> RedDBResult<()> {
2031 for operation in operations {
2032 let Some(key) = operation.path.first().map(String::as_str) else {
2033 continue;
2034 };
2035 ensure_non_tree_reserved_metadata_key(key)?;
2036 }
2037 Ok(())
2038}
2039
2040fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2041 if key.starts_with(TREE_METADATA_PREFIX) {
2042 return Err(crate::RedDBError::Query(format!(
2043 "metadata key '{}' is reserved for managed trees",
2044 key
2045 )));
2046 }
2047 Ok(())
2048}
2049
2050fn ensure_non_tree_reserved_metadata_entries(
2051 metadata: &[(String, MetadataValue)],
2052) -> RedDBResult<()> {
2053 for (key, _) in metadata {
2054 ensure_non_tree_reserved_metadata_key(key)?;
2055 }
2056 Ok(())
2057}
2058
2059fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2060 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2061 return Err(crate::RedDBError::Query(format!(
2062 "edge label '{}' is reserved for managed trees",
2063 TREE_CHILD_EDGE_LABEL
2064 )));
2065 }
2066 Ok(())
2067}
2068
2069impl RedDBRuntime {
2070 pub(crate) fn create_node_unchecked(
2071 &self,
2072 input: CreateNodeInput,
2073 ) -> RedDBResult<CreateEntityOutput> {
2074 let db = self.db();
2075 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2076 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2077 let mut metadata = input.metadata;
2078 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2079 let mut builder = db.node(&input.collection, &input.label);
2080
2081 if let Some(node_type) = input.node_type {
2082 builder = builder.node_type(node_type);
2083 }
2084
2085 for (key, value) in input.properties {
2086 builder = builder.property(key, value);
2087 }
2088
2089 for (key, value) in metadata {
2090 builder = builder.metadata(key, value);
2091 }
2092
2093 for embedding in input.embeddings {
2094 if let Some(model) = embedding.model {
2095 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2096 } else {
2097 builder = builder.embedding(embedding.name, embedding.vector);
2098 }
2099 }
2100
2101 for link in input.table_links {
2102 builder = builder.link_to_table(link.key, link.table);
2103 }
2104
2105 for link in input.node_links {
2106 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2107 }
2108
2109 let id = builder.save()?;
2110 self.stamp_xmin_if_in_txn(&input.collection, id);
2113 refresh_context_index(&db, &input.collection, id)?;
2114 self.cdc_emit(
2115 crate::replication::cdc::ChangeOperation::Insert,
2116 &input.collection,
2117 id.raw(),
2118 "graph_node",
2119 );
2120 Ok(CreateEntityOutput {
2121 id,
2122 entity: db.store().get(&input.collection, id),
2123 })
2124 }
2125
2126 pub(crate) fn create_edge_unchecked(
2127 &self,
2128 input: CreateEdgeInput,
2129 ) -> RedDBResult<CreateEntityOutput> {
2130 let db = self.db();
2131 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2132 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2133 let mut metadata = input.metadata;
2134 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2135 let mut builder = db
2136 .edge(&input.collection, &input.label)
2137 .from(input.from)
2138 .to(input.to);
2139
2140 if let Some(weight) = input.weight {
2141 builder = builder.weight(weight);
2142 }
2143
2144 for (key, value) in input.properties {
2145 builder = builder.property(key, value);
2146 }
2147
2148 for (key, value) in metadata {
2149 builder = builder.metadata(key, value);
2150 }
2151
2152 let id = builder.save()?;
2153 self.stamp_xmin_if_in_txn(&input.collection, id);
2156 refresh_context_index(&db, &input.collection, id)?;
2157 self.cdc_emit(
2158 crate::replication::cdc::ChangeOperation::Insert,
2159 &input.collection,
2160 id.raw(),
2161 "graph_edge",
2162 );
2163 Ok(CreateEntityOutput {
2164 id,
2165 entity: db.store().get(&input.collection, id),
2166 })
2167 }
2168}
2169
2170fn create_rows_batch_prevalidated_columnar_with_outputs(
2171 runtime: &RedDBRuntime,
2172 collection: String,
2173 column_names: std::sync::Arc<Vec<String>>,
2174 rows: Vec<Vec<crate::storage::schema::Value>>,
2175) -> RedDBResult<Vec<CreateEntityOutput>> {
2176 use crate::storage::{
2177 unified::{EntityData, EntityKind, RowData},
2178 EntityId, UnifiedEntity,
2179 };
2180 use std::sync::Arc;
2181
2182 if rows.is_empty() {
2183 return Ok(Vec::new());
2184 }
2185 runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2186 runtime.check_batch_size(rows.len())?;
2187 runtime.check_db_size()?;
2188
2189 let db = runtime.db();
2190 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2191 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2192
2193 let store = db.store();
2194 let table_arc: Arc<str> = Arc::from(collection.as_str());
2195
2196 let indexed_cols = runtime
2197 .index_store_ref()
2198 .indexed_columns_set(collection.as_str());
2199 let has_secondary_indexes = !indexed_cols.is_empty();
2200 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2201 if has_secondary_indexes {
2202 Vec::with_capacity(rows.len())
2203 } else {
2204 Vec::new()
2205 };
2206
2207 let entities: Vec<UnifiedEntity> = rows
2208 .into_iter()
2209 .map(|values| {
2210 if has_secondary_indexes {
2211 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2212 Vec::with_capacity(indexed_cols.len());
2213 for (name, val) in column_names.iter().zip(values.iter()) {
2214 if indexed_cols.contains(name) {
2215 snap.push((name.clone(), val.clone()));
2216 }
2217 }
2218 field_snapshots.push(snap);
2219 }
2220 let mut row = RowData::new(values);
2221 row.schema = Some(Arc::clone(&column_names));
2222 UnifiedEntity::new(
2223 EntityId::new(0),
2224 EntityKind::TableRow {
2225 table: Arc::clone(&table_arc),
2226 row_id: 0,
2227 },
2228 EntityData::Row(row),
2229 )
2230 })
2231 .collect();
2232
2233 let ids = store
2234 .bulk_insert(&collection, entities)
2235 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2236
2237 if has_secondary_indexes {
2238 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2239 .iter()
2240 .zip(field_snapshots)
2241 .map(|(id, fields)| (*id, fields))
2242 .collect();
2243 runtime
2244 .index_store_ref()
2245 .index_entity_insert_batch(&collection, &index_rows)
2246 .map_err(crate::RedDBError::Internal)?;
2247 }
2248
2249 runtime.invalidate_result_cache();
2250 runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2251
2252 Ok(ids
2253 .into_iter()
2254 .map(|id| CreateEntityOutput { id, entity: None })
2255 .collect())
2256}
2257
2258impl RuntimeEntityPort for RedDBRuntime {
2259 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2260 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2261 let db = self.db();
2262 let CreateRowInput {
2263 collection,
2264 fields,
2265 metadata: input_metadata,
2266 node_links,
2267 vector_links,
2268 } = input;
2269 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2270 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2271 let mut metadata = input_metadata;
2272 apply_collection_default_ttl(&db, &collection, &mut metadata);
2273 let fields = contract.normalize_insert_fields(fields)?;
2274 contract.enforce_row_uniqueness(&fields, None)?;
2275 let engine = self.mutation_engine();
2277 let result = engine.apply(
2278 collection.clone(),
2279 vec![crate::runtime::mutation::MutationRow {
2280 fields,
2281 metadata,
2282 node_links,
2283 vector_links,
2284 }],
2285 )?;
2286 let id = result.ids[0];
2287 Ok(CreateEntityOutput {
2292 id,
2293 entity: db.store().get(&collection, id),
2294 })
2295 }
2296
2297 fn create_rows_batch(
2298 &self,
2299 input: CreateRowsBatchInput,
2300 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2301 if input.rows.is_empty() {
2302 return Ok(Vec::new());
2303 }
2304 self.check_batch_size(input.rows.len())?;
2305 self.check_db_size()?;
2306
2307 let db = self.db();
2308 let collection = input.collection;
2309 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2310 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2311
2312 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2313 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2314 for row in input.rows {
2315 if row.collection != collection {
2316 return Err(crate::RedDBError::Query(format!(
2317 "batch row collection mismatch: expected '{}', got '{}'",
2318 collection, row.collection
2319 )));
2320 }
2321
2322 let mut metadata = row.metadata;
2323 apply_collection_default_ttl(&db, &collection, &mut metadata);
2324 let fields = contract.normalize_insert_fields(row.fields)?;
2325 contract.enforce_row_uniqueness(&fields, None)?;
2326 uniqueness_rows.push(fields.clone());
2327 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2328 }
2329
2330 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2331
2332 let engine = {
2335 let e = self.mutation_engine();
2336 if input.suppress_events {
2337 e.with_suppress_events()
2338 } else {
2339 e
2340 }
2341 };
2342 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2343 .into_iter()
2344 .map(|(fields, metadata, node_links, vector_links)| {
2345 crate::runtime::mutation::MutationRow {
2346 fields,
2347 metadata,
2348 node_links,
2349 vector_links,
2350 }
2351 })
2352 .collect();
2353
2354 let result = engine
2355 .apply(collection.clone(), mutation_rows)
2356 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2357
2358 let store = db.store();
2359 Ok(result
2360 .ids
2361 .into_iter()
2362 .map(|id| CreateEntityOutput {
2363 id,
2364 entity: store.get(&collection, id),
2365 })
2366 .collect())
2367 }
2368
2369 fn create_rows_batch_prevalidated_columnar(
2370 &self,
2371 collection: String,
2372 column_names: std::sync::Arc<Vec<String>>,
2373 rows: Vec<Vec<crate::storage::schema::Value>>,
2374 ) -> RedDBResult<usize> {
2375 create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2376 .map(|outputs| outputs.len())
2377 }
2378
2379 fn create_rows_batch_columnar(
2380 &self,
2381 collection: String,
2382 column_names: std::sync::Arc<Vec<String>>,
2383 rows: Vec<Vec<crate::storage::schema::Value>>,
2384 ) -> RedDBResult<usize> {
2385 self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2386 .map(|outputs| outputs.len())
2387 }
2388
2389 fn create_rows_batch_columnar_with_outputs(
2390 &self,
2391 collection: String,
2392 column_names: std::sync::Arc<Vec<String>>,
2393 rows: Vec<Vec<crate::storage::schema::Value>>,
2394 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2395 if rows.is_empty() {
2396 return Ok(Vec::new());
2397 }
2398 self.check_batch_size(rows.len())?;
2399 self.check_db_size()?;
2400
2401 let db = self.db();
2402 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2403 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2404
2405 let needs_normalisation = match db.collection_contract(&collection) {
2415 Some(c) => {
2416 c.declared_model == crate::catalog::CollectionModel::Table
2417 && (!c.declared_columns.is_empty()
2418 || c.table_def
2419 .as_ref()
2420 .map(|t| !t.columns.is_empty())
2421 .unwrap_or(false))
2422 }
2423 None => false,
2424 };
2425 if !needs_normalisation {
2426 return create_rows_batch_prevalidated_columnar_with_outputs(
2427 self,
2428 collection,
2429 column_names,
2430 rows,
2431 );
2432 }
2433
2434 let ncols = column_names.len();
2441 let tuple_rows: Vec<CreateRowInput> = rows
2442 .into_iter()
2443 .map(|values| {
2444 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2445 Vec::with_capacity(ncols);
2446 for (name, value) in column_names.iter().zip(values) {
2447 fields.push((name.clone(), value));
2448 }
2449 CreateRowInput {
2450 collection: collection.clone(),
2451 fields,
2452 metadata: Vec::new(),
2453 node_links: Vec::new(),
2454 vector_links: Vec::new(),
2455 }
2456 })
2457 .collect();
2458 self.create_rows_batch(CreateRowsBatchInput {
2459 collection,
2460 rows: tuple_rows,
2461 suppress_events: false,
2462 })
2463 }
2464
2465 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2466 if input.rows.is_empty() {
2467 return Ok(0);
2468 }
2469 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2470 self.check_batch_size(input.rows.len())?;
2471 self.check_db_size()?;
2472
2473 let db = self.db();
2474 let collection = input.collection;
2475 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2480 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2481
2482 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2488
2489 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2490 Vec::with_capacity(input.rows.len());
2491 let mut mutation_rows = mutation_rows;
2492 for row in input.rows {
2493 if row.collection != collection {
2494 return Err(crate::RedDBError::Query(format!(
2495 "batch row collection mismatch: expected '{}', got '{}'",
2496 collection, row.collection
2497 )));
2498 }
2499 let mut metadata = row.metadata;
2500 if let Some(ttl) = default_ttl_ms {
2501 if !has_internal_ttl_metadata(&metadata) {
2502 metadata.push((
2503 "_ttl_ms".to_string(),
2504 if ttl <= i64::MAX as u64 {
2505 MetadataValue::Int(ttl as i64)
2506 } else {
2507 MetadataValue::Timestamp(ttl)
2508 },
2509 ));
2510 }
2511 }
2512 mutation_rows.push(crate::runtime::mutation::MutationRow {
2513 fields: row.fields,
2514 metadata,
2515 node_links: row.node_links,
2516 vector_links: row.vector_links,
2517 });
2518 }
2519
2520 let engine = self.mutation_engine();
2521 let result = engine
2522 .apply(collection, mutation_rows)
2523 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2524 Ok(result.ids.len())
2525 }
2526
2527 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2528 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2529 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2530 self.create_node_unchecked(input)
2531 }
2532
2533 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2534 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2535 ensure_non_tree_structural_edge_label(&input.label)?;
2536 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2537 self.create_edge_unchecked(input)
2538 }
2539
2540 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2541 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2542 let db = self.db();
2543 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2544 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2545 ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2546 let mut metadata = input.metadata;
2547 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2548 let mut builder = db.vector(&input.collection).dense(input.dense);
2549
2550 if let Some(content) = input.content {
2551 builder = builder.content(content);
2552 }
2553
2554 for (key, value) in metadata {
2555 builder = builder.metadata(key, value);
2556 }
2557
2558 if let Some(link_row) = input.link_row {
2559 builder = builder.link_to_table(link_row);
2560 }
2561
2562 if let Some(link_node) = input.link_node {
2563 builder = builder.link_to_node(link_node);
2564 }
2565
2566 let id = builder.save()?;
2567 self.stamp_xmin_if_in_txn(&input.collection, id);
2570 refresh_context_index(&db, &input.collection, id)?;
2571 self.cdc_emit(
2572 crate::replication::cdc::ChangeOperation::Insert,
2573 &input.collection,
2574 id.raw(),
2575 "vector",
2576 );
2577 Ok(CreateEntityOutput {
2578 id,
2579 entity: db.store().get(&input.collection, id),
2580 })
2581 }
2582
2583 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2584 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2585 let db = self.db();
2586 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2587 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2588
2589 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2591 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2592 })?;
2593 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2594 "body".to_string(),
2595 crate::storage::schema::Value::Json(body_bytes),
2596 )];
2597
2598 if let JsonValue::Object(ref map) = input.body {
2600 for (key, value) in map {
2601 let storage_value = json_to_storage_value(value)?;
2602 fields.push((key.clone(), storage_value));
2603 }
2604 }
2605
2606 let mut metadata = input.metadata;
2607 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2608 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2609 .iter()
2610 .map(|(key, value)| (key.as_str(), value.clone()))
2611 .collect();
2612 let mut builder = db.row(&input.collection, columns);
2613
2614 for (key, value) in metadata {
2615 builder = builder.metadata(key, value);
2616 }
2617
2618 for node in input.node_links {
2619 builder = builder.link_to_node(node);
2620 }
2621
2622 for vector in input.vector_links {
2623 builder = builder.link_to_vector(vector);
2624 }
2625
2626 let id = builder.save()?;
2627 self.stamp_xmin_if_in_txn(&input.collection, id);
2629 refresh_context_index(&db, &input.collection, id)?;
2630 self.cdc_emit(
2631 crate::replication::cdc::ChangeOperation::Insert,
2632 &input.collection,
2633 id.raw(),
2634 "document",
2635 );
2636 Ok(CreateEntityOutput {
2637 id,
2638 entity: db.store().get(&input.collection, id),
2639 })
2640 }
2641
2642 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2643 let db = self.db();
2644 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2645 let declared_model = db
2646 .collection_contract(&input.collection)
2647 .map(|contract| contract.declared_model);
2648 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2649 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2650 self.seal_vault_value(&input.collection, input.value)?
2651 } else {
2652 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2653 input.value
2654 };
2655 let fields = vec![
2656 (
2657 "key".to_string(),
2658 crate::storage::schema::Value::text(input.key),
2659 ),
2660 ("value".to_string(), value),
2661 ];
2662 let collection = input.collection;
2663 let result = self.mutation_engine().apply(
2664 collection.clone(),
2665 vec![crate::runtime::mutation::MutationRow {
2666 fields,
2667 metadata: input.metadata,
2668 node_links: Vec::new(),
2669 vector_links: Vec::new(),
2670 }],
2671 )?;
2672 let id = result.ids[0];
2673 Ok(CreateEntityOutput {
2674 id,
2675 entity: db.store().get(&collection, id),
2676 })
2677 }
2678
2679 fn create_timeseries_point(
2680 &self,
2681 input: CreateTimeSeriesPointInput,
2682 ) -> RedDBResult<CreateEntityOutput> {
2683 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2684 let db = self.db();
2685 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2686 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2687
2688 let mut fields = vec![
2689 (
2690 "metric".to_string(),
2691 crate::storage::schema::Value::text(input.metric),
2692 ),
2693 (
2694 "value".to_string(),
2695 crate::storage::schema::Value::Float(input.value),
2696 ),
2697 ];
2698
2699 if let Some(timestamp_ns) = input.timestamp_ns {
2700 fields.push((
2701 "timestamp".to_string(),
2702 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2703 ));
2704 }
2705
2706 if !input.tags.is_empty() {
2707 let tags_json = JsonValue::Object(
2708 input
2709 .tags
2710 .into_iter()
2711 .map(|(key, value)| (key, JsonValue::String(value)))
2712 .collect(),
2713 );
2714 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2715 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2716 })?;
2717 fields.push((
2718 "tags".to_string(),
2719 crate::storage::schema::Value::Json(tags_bytes),
2720 ));
2721 }
2722
2723 let collection = input.collection;
2724 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2725 self.stamp_xmin_if_in_txn(&collection, id);
2728 refresh_context_index(&db, &collection, id)?;
2729
2730 Ok(CreateEntityOutput {
2731 id,
2732 entity: db.store().get(&collection, id),
2733 })
2734 }
2735
2736 fn get_kv(
2737 &self,
2738 collection: &str,
2739 key: &str,
2740 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2741 let db = self.db();
2742 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2743 let store = db.store();
2744 let Some(manager) = store.get_collection(collection) else {
2745 return Ok(None);
2746 };
2747 let entities = manager.query_all(|_| true);
2748 for entity in entities {
2749 if let crate::storage::EntityData::Row(ref row) = entity.data {
2750 if let Some(ref named) = row.named {
2751 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2752 if &**k == key {
2753 let value = named
2754 .get("value")
2755 .cloned()
2756 .unwrap_or(crate::storage::schema::Value::Null);
2757 return Ok(Some((value, entity.id)));
2758 }
2759 }
2760 }
2761 }
2762 }
2763 Ok(None)
2764 }
2765
2766 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2767 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2768 let found = self.get_kv(collection, key)?;
2769 if let Some((_, id)) = found {
2770 let db = self.db();
2771 let store = db.store();
2772 let deleted = store
2773 .delete(collection, id)
2774 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2775 if deleted {
2776 store.context_index().remove_entity(id);
2777 }
2778 Ok(deleted)
2779 } else {
2780 Ok(false)
2781 }
2782 }
2783
2784 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2785 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2786 let PatchEntityInput {
2787 collection,
2788 id,
2789 payload,
2790 operations,
2791 } = input;
2792 let db = self.db();
2793 let store = db.store();
2794 let Some(manager) = store.get_collection(&collection) else {
2795 return Err(crate::RedDBError::NotFound(format!(
2796 "collection not found: {collection}"
2797 )));
2798 };
2799 let Some(entity) = manager.get(id) else {
2800 return Err(crate::RedDBError::NotFound(format!(
2801 "entity not found: {}",
2802 id.raw()
2803 )));
2804 };
2805 self.apply_loaded_patch_entity(collection, entity, payload, operations)
2806 }
2807
2808 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2809 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2810 let store = self.db().store();
2811 let pre_delete_fields = store
2815 .get(&input.collection, input.id)
2816 .as_ref()
2817 .map(entity_row_fields_snapshot)
2818 .unwrap_or_default();
2819 let deleted = store
2823 .delete(&input.collection, input.id)
2824 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2825 if deleted {
2826 store.context_index().remove_entity(input.id);
2827 if !pre_delete_fields.is_empty() {
2830 self.index_store_ref()
2831 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2832 .map_err(crate::RedDBError::Internal)?;
2833 }
2834 self.cdc_emit(
2835 crate::replication::cdc::ChangeOperation::Delete,
2836 &input.collection,
2837 input.id.raw(),
2838 "entity",
2839 );
2840 }
2841 Ok(DeleteEntityOutput {
2842 deleted,
2843 id: input.id,
2844 })
2845 }
2846}