1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
/// String prefix `"red.tree."`.
/// NOTE(review): presumably namespaces tree-related metadata keys; the usage
/// is outside this chunk — confirm against callers.
const TREE_METADATA_PREFIX: &str = "red.tree.";
/// Edge label `"TREE_CHILD"`.
/// NOTE(review): presumably labels parent-to-child edges of a tree; the usage
/// is outside this chunk — confirm against callers.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 return Err(crate::RedDBError::InvalidOperation(format!(
94 "collection '{}' is declared as '{}' and does not allow '{}' writes",
95 collection,
96 collection_model_name(contract.declared_model),
97 collection_model_name(requested_model)
98 )));
99 }
100
101 let now = implicit_contract_unix_ms();
102 db.save_collection_contract(crate::physical::CollectionContract {
103 name: collection.to_string(),
104 declared_model: requested_model,
105 schema_mode: crate::catalog::SchemaMode::Dynamic,
106 origin: crate::physical::ContractOrigin::Implicit,
107 version: 1,
108 created_at_unix_ms: now,
109 updated_at_unix_ms: now,
110 default_ttl_ms: db.collection_default_ttl_ms(collection),
111 context_index_fields: Vec::new(),
112 declared_columns: Vec::new(),
113 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
114 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
115 timestamps_enabled: false,
116 context_index_enabled: false,
117 append_only: false,
120 subscriptions: Vec::new(),
121 })
122 .map(|_| ())
123 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
124}
125
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn implicit_contract_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => std::time::Duration::default().as_millis(),
    }
}
132
133fn collection_model_allows(
134 declared_model: crate::catalog::CollectionModel,
135 requested_model: crate::catalog::CollectionModel,
136) -> bool {
137 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
138}
139
140fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
141 match model {
142 crate::catalog::CollectionModel::Table => "table",
143 crate::catalog::CollectionModel::Document => "document",
144 crate::catalog::CollectionModel::Graph => "graph",
145 crate::catalog::CollectionModel::Vector => "vector",
146 crate::catalog::CollectionModel::Kv => "kv",
147 crate::catalog::CollectionModel::Config => "config",
148 crate::catalog::CollectionModel::Vault => "vault",
149 crate::catalog::CollectionModel::Mixed => "mixed",
150 crate::catalog::CollectionModel::TimeSeries => "timeseries",
151 crate::catalog::CollectionModel::Queue => "queue",
152 }
153}
154
/// A uniqueness constraint resolved from a collection contract.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so rules can
/// be logged and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
struct UniquenessRule {
    /// Constraint name reported in violation error messages.
    name: String,
    /// Columns whose combined values must be unique.
    columns: Vec<String>,
    /// `true` for primary keys, whose columns must additionally be non-null.
    primary_key: bool,
}
161
/// Selects how row fields are normalized against a collection contract.
///
/// Derives `Debug` so the mode can appear in logs and assertion output.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum NormalizeMode {
    /// Normalizing the fields of a new row; on timestamp-managed collections
    /// user-supplied `created_at` / `updated_at` values are rejected.
    Insert,
    /// Normalizing the fields of an updated row; on timestamp-managed
    /// collections `created_at` is carried over and `updated_at` is refreshed.
    Update,
}
173
174mod collection_contract_enforcement {
175 use super::*;
176
177 pub(super) struct CollectionContractWriteEnforcer<'a> {
178 db: &'a crate::storage::unified::devx::RedDB,
179 collection: &'a str,
180 }
181
182 impl<'a> CollectionContractWriteEnforcer<'a> {
183 pub(super) fn new(
184 db: &'a crate::storage::unified::devx::RedDB,
185 collection: &'a str,
186 ) -> Self {
187 Self { db, collection }
188 }
189
190 pub(super) fn ensure_model(
191 &self,
192 requested_model: crate::catalog::CollectionModel,
193 ) -> RedDBResult<()> {
194 ensure_collection_model_contract(self.db, self.collection, requested_model)
195 }
196
197 pub(super) fn normalize_insert_fields(
198 &self,
199 fields: Vec<(String, Value)>,
200 ) -> RedDBResult<Vec<(String, Value)>> {
201 normalize_row_fields_for_contract_with_mode(
202 self.db,
203 self.collection,
204 fields,
205 NormalizeMode::Insert,
206 )
207 }
208
209 pub(super) fn normalize_update_fields(
210 &self,
211 fields: Vec<(String, Value)>,
212 ) -> RedDBResult<Vec<(String, Value)>> {
213 normalize_row_fields_for_contract_with_mode(
214 self.db,
215 self.collection,
216 fields,
217 NormalizeMode::Update,
218 )
219 }
220
221 pub(super) fn enforce_row_uniqueness(
222 &self,
223 fields: &[(String, Value)],
224 exclude_id: Option<crate::storage::EntityId>,
225 ) -> RedDBResult<()> {
226 enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
227 }
228
229 pub(super) fn enforce_batch_uniqueness(
230 &self,
231 rows: &[Vec<(String, Value)>],
232 ) -> RedDBResult<()> {
233 enforce_row_batch_uniqueness(self.db, self.collection, rows)
234 }
235
236 pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
237 row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
238 }
239 }
240}
241
242use collection_contract_enforcement::CollectionContractWriteEnforcer;
243
244fn normalize_row_fields_for_contract_with_mode(
245 db: &crate::storage::unified::devx::RedDB,
246 collection: &str,
247 fields: Vec<(String, Value)>,
248 mode: NormalizeMode,
249) -> RedDBResult<Vec<(String, Value)>> {
250 let Some(contract) = db.collection_contract(collection) else {
251 return Ok(fields);
252 };
253
254 if contract.declared_model != crate::catalog::CollectionModel::Table
255 || (contract.declared_columns.is_empty()
256 && contract
257 .table_def
258 .as_ref()
259 .map(|table| table.columns.is_empty())
260 .unwrap_or(true))
261 {
262 return Ok(fields);
263 }
264
265 let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
275 fields
276 .iter()
277 .find(|(n, _)| n == "created_at")
278 .map(|(_, v)| v.clone())
279 } else {
280 None
281 };
282
283 if contract.timestamps_enabled && mode == NormalizeMode::Insert {
288 for (name, _) in &fields {
289 if name == "created_at" || name == "updated_at" {
290 return Err(crate::RedDBError::Query(format!(
291 "collection '{}' manages '{}' automatically — do not set it in INSERT",
292 collection, name
293 )));
294 }
295 }
296 }
297
298 let mut provided = std::collections::BTreeMap::new();
299 for (name, value) in &fields {
300 provided.insert(name.clone(), value.clone());
301 }
302
303 let resolved_columns = resolved_contract_columns(&contract)?;
304 let declared_names: std::collections::BTreeSet<String> = resolved_columns
305 .iter()
306 .map(|column| column.name.clone())
307 .collect();
308 let unknown_fields: Vec<String> = fields
309 .iter()
310 .filter(|(name, _)| !declared_names.contains(name))
311 .map(|(name, _)| name.clone())
312 .collect();
313 if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
314 && !unknown_fields.is_empty()
315 {
316 return Err(crate::RedDBError::Query(format!(
317 "collection '{}' is strict and does not allow undeclared fields: {}",
318 collection,
319 unknown_fields.join(", ")
320 )));
321 }
322 let mut normalized = Vec::new();
323 let now_ms = current_unix_ms_u64();
324
325 for column in &resolved_columns {
326 match provided.remove(&column.name) {
327 Some(value) => {
328 if contract.timestamps_enabled && mode == NormalizeMode::Update {
333 match column.name.as_str() {
334 "created_at" => {
335 normalized.push((
336 column.name.clone(),
337 existing_created_at
338 .clone()
339 .unwrap_or(Value::UnsignedInteger(now_ms)),
340 ));
341 continue;
342 }
343 "updated_at" => {
344 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
345 continue;
346 }
347 _ => {}
348 }
349 }
350 normalized.push((
351 column.name.clone(),
352 normalize_contract_value(collection, column, value)?,
353 ));
354 }
355 None => {
356 if contract.timestamps_enabled
360 && (column.name == "created_at" || column.name == "updated_at")
361 {
362 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
363 continue;
364 }
365 if let Some(default) = &column.default {
366 normalized.push((
367 column.name.clone(),
368 coerce_contract_literal(collection, &column.name, column, default)?,
369 ));
370 } else if column.not_null {
371 return Err(crate::RedDBError::Query(format!(
372 "missing required column '{}' for collection '{}'",
373 column.name, collection
374 )));
375 }
376 }
377 }
378 }
379
380 for (name, value) in fields {
381 if !declared_names.contains(&name) {
382 normalized.push((name, value));
383 }
384 }
385
386 Ok(normalized)
387}
388
/// Current wall-clock time as milliseconds since the Unix epoch, as `u64`.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_unix_ms_u64() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
395
396fn enforce_row_uniqueness(
397 db: &crate::storage::unified::devx::RedDB,
398 collection: &str,
399 fields: &[(String, Value)],
400 exclude_id: Option<crate::storage::EntityId>,
401) -> RedDBResult<()> {
402 let Some(contract) = db.collection_contract(collection) else {
403 return Ok(());
404 };
405 if contract.declared_model != crate::catalog::CollectionModel::Table
406 && contract.declared_model != crate::catalog::CollectionModel::Mixed
407 {
408 return Ok(());
409 }
410
411 let rules = resolved_uniqueness_rules(&contract);
412 if rules.is_empty() {
413 return Ok(());
414 }
415
416 let store = db.store();
417 let Some(manager) = store.get_collection(collection) else {
418 return Ok(());
419 };
420
421 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
422
423 for rule in &rules {
424 let mut expected_signatures = Vec::new();
425 let mut skip_rule = false;
426
427 for column in &rule.columns {
428 match input_fields.get(column) {
429 Some(Value::Null) | None if rule.primary_key => {
430 return Err(crate::RedDBError::Query(format!(
431 "primary key '{}' in collection '{}' requires non-null column '{}'",
432 rule.name, collection, column
433 )))
434 }
435 Some(Value::Null) | None => {
436 skip_rule = true;
437 break;
438 }
439 Some(value) => {
440 expected_signatures.push((column.clone(), value_signature(value)));
441 }
442 }
443 }
444
445 if skip_rule {
446 continue;
447 }
448
449 for entity in manager.query_all(|_| true) {
450 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
451 continue;
452 }
453 let Some(existing_fields) = row_fields_from_entity(&entity) else {
454 continue;
455 };
456
457 let duplicate = expected_signatures.iter().all(|(column, expected)| {
458 existing_fields
459 .get(column)
460 .map(|value| value_signature(value) == *expected)
461 .unwrap_or(false)
462 });
463
464 if duplicate {
465 let qualifier = if rule.primary_key {
466 "primary key"
467 } else {
468 "unique constraint"
469 };
470 return Err(crate::RedDBError::Query(format!(
471 "{} '{}' violated on collection '{}' for columns [{}]",
472 qualifier,
473 rule.name,
474 collection,
475 rule.columns.join(", ")
476 )));
477 }
478 }
479 }
480
481 Ok(())
482}
483
484fn enforce_row_batch_uniqueness(
485 db: &crate::storage::unified::devx::RedDB,
486 collection: &str,
487 rows: &[Vec<(String, Value)>],
488) -> RedDBResult<()> {
489 let Some(contract) = db.collection_contract(collection) else {
490 return Ok(());
491 };
492 if contract.declared_model != crate::catalog::CollectionModel::Table
493 && contract.declared_model != crate::catalog::CollectionModel::Mixed
494 {
495 return Ok(());
496 }
497
498 let rules = resolved_uniqueness_rules(&contract);
499 if rules.is_empty() {
500 return Ok(());
501 }
502
503 for rule in &rules {
504 let mut seen = std::collections::HashMap::<String, usize>::new();
505 for (row_index, fields) in rows.iter().enumerate() {
506 let input_fields: std::collections::BTreeMap<String, Value> =
507 fields.iter().cloned().collect();
508 let mut signatures = Vec::new();
509 let mut skip_rule = false;
510
511 for column in &rule.columns {
512 match input_fields.get(column) {
513 Some(Value::Null) | None if rule.primary_key => {
514 return Err(crate::RedDBError::Query(format!(
515 "primary key '{}' in collection '{}' requires non-null column '{}'",
516 rule.name, collection, column
517 )))
518 }
519 Some(Value::Null) | None => {
520 skip_rule = true;
521 break;
522 }
523 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
524 }
525 }
526
527 if skip_rule {
528 continue;
529 }
530
531 let signature = signatures.join("|");
532 if let Some(previous_index) = seen.insert(signature, row_index) {
533 return Err(crate::RedDBError::Query(format!(
534 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
535 rule.name,
536 collection,
537 previous_index + 1,
538 row_index + 1
539 )));
540 }
541 }
542 }
543
544 Ok(())
545}
546
547fn row_update_requires_uniqueness_check(
548 db: &crate::storage::unified::devx::RedDB,
549 collection: &str,
550 modified_columns: &[String],
551) -> bool {
552 if modified_columns.is_empty() {
553 return false;
554 }
555
556 let Some(contract) = db.collection_contract(collection) else {
557 return false;
558 };
559 if contract.declared_model != crate::catalog::CollectionModel::Table
560 && contract.declared_model != crate::catalog::CollectionModel::Mixed
561 {
562 return false;
563 }
564
565 let rules = resolved_uniqueness_rules(&contract);
566 if rules.is_empty() {
567 return false;
568 }
569
570 rules.iter().any(|rule| {
571 rule.columns.iter().any(|column| {
572 modified_columns
573 .iter()
574 .any(|modified| modified.eq_ignore_ascii_case(column))
575 })
576 })
577}
578
579pub(crate) fn build_row_update_contract_plan(
580 db: &crate::storage::unified::devx::RedDB,
581 collection: &str,
582) -> RedDBResult<Option<RowUpdateContractPlan>> {
583 let Some(contract) = db.collection_contract(collection) else {
584 return Ok(None);
585 };
586
587 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
588 && !(contract.declared_columns.is_empty()
589 && contract
590 .table_def
591 .as_ref()
592 .map(|table| table.columns.is_empty())
593 .unwrap_or(true))
594 {
595 resolved_contract_columns(&contract)?
596 .into_iter()
597 .map(|rule| {
598 (
599 rule.name.clone(),
600 RowUpdateColumnRule {
601 name: rule.name,
602 data_type: rule.data_type,
603 data_type_name: rule.data_type_name,
604 not_null: rule.not_null,
605 enum_variants: rule.enum_variants,
606 },
607 )
608 })
609 .collect()
610 } else {
611 HashMap::new()
612 };
613
614 let unique_columns = if matches!(
615 contract.declared_model,
616 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
617 ) {
618 resolved_uniqueness_rules(&contract)
619 .into_iter()
620 .flat_map(|rule| rule.columns.into_iter())
621 .map(|column| (column, ()))
622 .collect()
623 } else {
624 HashMap::new()
625 };
626
627 Ok(Some(RowUpdateContractPlan {
628 timestamps_enabled: contract.timestamps_enabled,
629 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
630 declared_rules,
631 unique_columns,
632 }))
633}
634
635pub(crate) fn normalize_row_update_assignment_with_plan(
636 collection: &str,
637 column: &str,
638 value: Value,
639 row_contract_plan: Option<&RowUpdateContractPlan>,
640) -> RedDBResult<Value> {
641 let Some(plan) = row_contract_plan else {
642 return Ok(value);
643 };
644
645 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
646 return Err(crate::RedDBError::Query(format!(
647 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
648 collection, column
649 )));
650 }
651
652 if let Some(rule) = plan.declared_rules.get(column) {
653 let rule = ResolvedColumnRule {
654 name: rule.name.clone(),
655 data_type: rule.data_type,
656 data_type_name: rule.data_type_name.clone(),
657 not_null: rule.not_null,
658 default: None,
659 enum_variants: rule.enum_variants.clone(),
660 };
661 normalize_contract_value(collection, &rule, value)
662 } else if plan.strict_schema {
663 Err(crate::RedDBError::Query(format!(
664 "collection '{}' is strict and does not allow undeclared fields: {}",
665 collection, column
666 )))
667 } else {
668 Ok(value)
669 }
670}
671
672pub(crate) fn normalize_row_update_value_for_rule(
673 collection: &str,
674 value: Value,
675 row_rule: Option<&RowUpdateColumnRule>,
676) -> RedDBResult<Value> {
677 let Some(rule) = row_rule else {
678 return Ok(value);
679 };
680
681 let rule = ResolvedColumnRule {
682 name: rule.name.clone(),
683 data_type: rule.data_type,
684 data_type_name: rule.data_type_name.clone(),
685 not_null: rule.not_null,
686 default: None,
687 enum_variants: rule.enum_variants.clone(),
688 };
689 normalize_contract_value(collection, &rule, value)
690}
691
692fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
693 if let Some(named) = row.named.as_mut() {
694 named.insert(name.to_string(), value);
695 return;
696 }
697
698 if let Some(schema) = row.schema.as_ref() {
699 if let Some(index) = schema.iter().position(|column| column == name) {
700 if let Some(slot) = row.columns.get_mut(index) {
701 *slot = value;
702 return;
703 }
704 }
705
706 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
707 for (column, current) in schema.iter().zip(row.columns.iter()) {
708 named.insert(column.clone(), current.clone());
709 }
710 named.insert(name.to_string(), value);
711 row.named = Some(named);
712 return;
713 }
714
715 let mut named = HashMap::with_capacity(1);
716 named.insert(name.to_string(), value);
717 row.named = Some(named);
718}
719
720fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
721 if let Some(named) = row.named.as_ref() {
722 named
723 .iter()
724 .map(|(key, value)| (key.clone(), value.clone()))
725 .collect()
726 } else if let Some(schema) = row.schema.as_ref() {
727 schema
728 .iter()
729 .cloned()
730 .zip(row.columns.iter().cloned())
731 .collect()
732 } else {
733 Vec::new()
734 }
735}
736
737fn apply_row_field_assignments_raw<I>(
738 row: &mut crate::storage::unified::entity::RowData,
739 field_assignments: I,
740) where
741 I: IntoIterator<Item = (String, Value)>,
742{
743 for (column, value) in field_assignments {
744 set_row_field(row, &column, value);
745 }
746}
747
748fn apply_row_field_assignments_incremental<I>(
749 collection: &str,
750 row: &mut crate::storage::unified::entity::RowData,
751 field_assignments: I,
752 row_contract_plan: Option<&RowUpdateContractPlan>,
753) -> RedDBResult<()>
754where
755 I: IntoIterator<Item = (String, Value)>,
756{
757 for (column, value) in field_assignments {
758 let value = normalize_row_update_assignment_with_plan(
759 collection,
760 &column,
761 value,
762 row_contract_plan,
763 )?;
764
765 set_row_field(row, &column, value);
766 }
767
768 Ok(())
769}
770
771fn resolved_uniqueness_rules(
772 contract: &crate::physical::CollectionContract,
773) -> Vec<UniquenessRule> {
774 let mut rules = Vec::new();
775
776 if let Some(table_def) = &contract.table_def {
777 if !table_def.primary_key.is_empty() {
778 rules.push(UniquenessRule {
779 name: "primary_key".to_string(),
780 columns: table_def.primary_key.clone(),
781 primary_key: true,
782 });
783 }
784
785 for constraint in &table_def.constraints {
786 if matches!(
787 constraint.constraint_type,
788 crate::storage::schema::ConstraintType::PrimaryKey
789 ) && !constraint.columns.is_empty()
790 {
791 rules.push(UniquenessRule {
792 name: constraint.name.clone(),
793 columns: constraint.columns.clone(),
794 primary_key: true,
795 });
796 } else if matches!(
797 constraint.constraint_type,
798 crate::storage::schema::ConstraintType::Unique
799 ) && !constraint.columns.is_empty()
800 {
801 rules.push(UniquenessRule {
802 name: constraint.name.clone(),
803 columns: constraint.columns.clone(),
804 primary_key: false,
805 });
806 }
807 }
808 } else {
809 for column in &contract.declared_columns {
810 if column.primary_key {
811 rules.push(UniquenessRule {
812 name: format!("pk_{}", column.name),
813 columns: vec![column.name.clone()],
814 primary_key: true,
815 });
816 } else if column.unique {
817 rules.push(UniquenessRule {
818 name: format!("uniq_{}", column.name),
819 columns: vec![column.name.clone()],
820 primary_key: false,
821 });
822 }
823 }
824 }
825
826 let mut dedup = std::collections::BTreeSet::new();
827 rules
828 .into_iter()
829 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
830 .collect()
831}
832
833fn row_fields_from_entity(
834 entity: &crate::storage::UnifiedEntity,
835) -> Option<std::collections::BTreeMap<String, Value>> {
836 match &entity.data {
837 crate::storage::EntityData::Row(row) => {
838 if let Some(named) = &row.named {
839 Some(
840 named
841 .iter()
842 .map(|(key, value)| (key.clone(), value.clone()))
843 .collect(),
844 )
845 } else {
846 row.schema.as_ref().map(|schema| {
847 schema
848 .iter()
849 .cloned()
850 .zip(row.columns.iter().cloned())
851 .collect()
852 })
853 }
854 }
855 _ => None,
856 }
857}
858
859fn value_signature(value: &Value) -> String {
860 format!("{value:?}")
861}
862
863fn normalize_contract_value(
864 collection: &str,
865 column: &ResolvedColumnRule,
866 value: Value,
867) -> RedDBResult<Value> {
868 if matches!(value, Value::Null) {
869 if column.not_null {
870 return Err(crate::RedDBError::Query(format!(
871 "column '{}' in collection '{}' cannot be null",
872 column.name, collection
873 )));
874 }
875 return Ok(Value::Null);
876 }
877
878 let target = column.data_type;
879 if value_matches_declared_type(&value, target) {
880 return Ok(value);
881 }
882
883 let Some(raw) = value_to_coercion_input(&value) else {
884 return Err(crate::RedDBError::Query(format!(
885 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
886 column.name, collection, column.data_type_name
887 )));
888 };
889
890 coerce_contract_literal(collection, &column.name, column, &raw)
891}
892
893fn coerce_contract_literal(
894 collection: &str,
895 column_name: &str,
896 column: &ResolvedColumnRule,
897 raw: &str,
898) -> RedDBResult<Value> {
899 let target = column.data_type;
900 match target {
901 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
902 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
903 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
904 crate::RedDBError::Query(format!(
905 "failed to coerce column '{}' in collection '{}' to '{}': {}",
906 column_name, collection, column.data_type_name, err
907 ))
908 }),
909 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
910 crate::RedDBError::Query(format!(
911 "failed to coerce column '{}' in collection '{}' to '{}': {}",
912 column_name, collection, column.data_type_name, err
913 ))
914 }),
915 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
916 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
917 column_name, collection, column.data_type_name
918 ))),
919 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
920 |err| {
921 crate::RedDBError::Query(format!(
922 "failed to coerce column '{}' in collection '{}' to '{}': {}",
923 column_name, collection, column.data_type_name, err
924 ))
925 },
926 ),
927 }
928}
929
/// A column declaration resolved from a collection contract, in the form the
/// value-normalization helpers in this file consume.
struct ResolvedColumnRule {
    /// Column name.
    name: String,
    /// Declared type that values are checked against and coerced to.
    data_type: DataType,
    /// Type name as shown in error messages.
    data_type_name: String,
    /// When `true`, `Null` values and missing values without a default are rejected.
    not_null: bool,
    /// Textual default literal, coerced to `data_type` when the column is not supplied.
    default: Option<String>,
    /// Variants handed to the schema coercion routine.
    /// NOTE(review): presumably only meaningful for enum-typed columns — confirm.
    enum_variants: Vec<String>,
}
938
939fn resolved_contract_columns(
940 contract: &crate::physical::CollectionContract,
941) -> RedDBResult<Vec<ResolvedColumnRule>> {
942 if let Some(table_def) = &contract.table_def {
943 return Ok(table_def
944 .columns
945 .iter()
946 .map(|column| ResolvedColumnRule {
947 name: column.name.clone(),
948 data_type: column.data_type,
949 data_type_name: data_type_name(column.data_type).to_string(),
950 not_null: !column.nullable,
951 default: column
952 .default
953 .as_ref()
954 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
955 enum_variants: column.enum_variants.clone(),
956 })
957 .collect());
958 }
959
960 contract
961 .declared_columns
962 .iter()
963 .map(|column| {
964 let data_type = column
965 .sql_type
966 .as_ref()
967 .map(crate::storage::query::resolve_sql_type_name)
968 .transpose()
969 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
970 .unwrap_or(parse_declared_data_type(&column.data_type)?);
971 Ok(ResolvedColumnRule {
972 name: column.name.clone(),
973 data_type,
974 data_type_name: column.data_type.clone(),
975 not_null: column.not_null,
976 default: column.default.clone(),
977 enum_variants: column.enum_variants.clone(),
978 })
979 })
980 .collect()
981}
982
983fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
984 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
985}
986
987fn data_type_name(data_type: DataType) -> &'static str {
988 match data_type {
989 DataType::Integer => "integer",
990 DataType::UnsignedInteger => "unsigned_integer",
991 DataType::Float => "float",
992 DataType::Text => "text",
993 DataType::Blob => "blob",
994 DataType::Boolean => "boolean",
995 DataType::Timestamp => "timestamp",
996 DataType::Duration => "duration",
997 DataType::IpAddr => "ipaddr",
998 DataType::MacAddr => "macaddr",
999 DataType::Vector => "vector",
1000 DataType::Nullable => "nullable",
1001 DataType::Unknown => "unknown",
1002 DataType::Json => "json",
1003 DataType::Uuid => "uuid",
1004 DataType::NodeRef => "noderef",
1005 DataType::EdgeRef => "edgeref",
1006 DataType::VectorRef => "vectorref",
1007 DataType::RowRef => "rowref",
1008 DataType::Color => "color",
1009 DataType::Email => "email",
1010 DataType::Url => "url",
1011 DataType::Phone => "phone",
1012 DataType::Semver => "semver",
1013 DataType::Cidr => "cidr",
1014 DataType::Date => "date",
1015 DataType::Time => "time",
1016 DataType::Decimal => "decimal",
1017 DataType::Enum => "enum",
1018 DataType::Array => "array",
1019 DataType::TimestampMs => "timestamp_ms",
1020 DataType::Ipv4 => "ipv4",
1021 DataType::Ipv6 => "ipv6",
1022 DataType::Subnet => "subnet",
1023 DataType::Port => "port",
1024 DataType::Latitude => "latitude",
1025 DataType::Longitude => "longitude",
1026 DataType::GeoPoint => "geopoint",
1027 DataType::Country2 => "country2",
1028 DataType::Country3 => "country3",
1029 DataType::Lang2 => "lang2",
1030 DataType::Lang5 => "lang5",
1031 DataType::Currency => "currency",
1032 DataType::AssetCode => "asset_code",
1033 DataType::Money => "money",
1034 DataType::ColorAlpha => "color_alpha",
1035 DataType::BigInt => "bigint",
1036 DataType::KeyRef => "keyref",
1037 DataType::DocRef => "docref",
1038 DataType::TableRef => "tableref",
1039 DataType::PageRef => "pageref",
1040 DataType::Secret => "secret",
1041 DataType::Password => "password",
1042 DataType::TextZstd => "text",
1043 DataType::BlobZstd => "blob",
1044 }
1045}
1046
1047fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1048 matches!(
1049 (value, target),
1050 (Value::Null, _)
1051 | (Value::Integer(_), DataType::Integer)
1052 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1053 | (Value::Float(_), DataType::Float)
1054 | (Value::Text(_), DataType::Text)
1055 | (Value::Blob(_), DataType::Blob)
1056 | (Value::Boolean(_), DataType::Boolean)
1057 | (Value::Timestamp(_), DataType::Timestamp)
1058 | (Value::Duration(_), DataType::Duration)
1059 | (Value::IpAddr(_), DataType::IpAddr)
1060 | (Value::MacAddr(_), DataType::MacAddr)
1061 | (Value::Vector(_), DataType::Vector)
1062 | (Value::Json(_), DataType::Json)
1063 | (Value::Uuid(_), DataType::Uuid)
1064 | (Value::NodeRef(_), DataType::NodeRef)
1065 | (Value::EdgeRef(_), DataType::EdgeRef)
1066 | (Value::VectorRef(_, _), DataType::VectorRef)
1067 | (Value::RowRef(_, _), DataType::RowRef)
1068 | (Value::Color(_), DataType::Color)
1069 | (Value::Email(_), DataType::Email)
1070 | (Value::Url(_), DataType::Url)
1071 | (Value::Phone(_), DataType::Phone)
1072 | (Value::Semver(_), DataType::Semver)
1073 | (Value::Cidr(_, _), DataType::Cidr)
1074 | (Value::Date(_), DataType::Date)
1075 | (Value::Time(_), DataType::Time)
1076 | (Value::Decimal(_), DataType::Decimal)
1077 | (Value::EnumValue(_), DataType::Enum)
1078 | (Value::Array(_), DataType::Array)
1079 | (Value::TimestampMs(_), DataType::TimestampMs)
1080 | (Value::Ipv4(_), DataType::Ipv4)
1081 | (Value::Ipv6(_), DataType::Ipv6)
1082 | (Value::Subnet(_, _), DataType::Subnet)
1083 | (Value::Port(_), DataType::Port)
1084 | (Value::Latitude(_), DataType::Latitude)
1085 | (Value::Longitude(_), DataType::Longitude)
1086 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1087 | (Value::Country2(_), DataType::Country2)
1088 | (Value::Country3(_), DataType::Country3)
1089 | (Value::Lang2(_), DataType::Lang2)
1090 | (Value::Lang5(_), DataType::Lang5)
1091 | (Value::Currency(_), DataType::Currency)
1092 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1093 | (Value::BigInt(_), DataType::BigInt)
1094 | (Value::KeyRef(_, _), DataType::KeyRef)
1095 | (Value::DocRef(_, _), DataType::DocRef)
1096 | (Value::TableRef(_), DataType::TableRef)
1097 | (Value::PageRef(_), DataType::PageRef)
1098 | (Value::Secret(_), DataType::Secret)
1099 | (Value::Password(_), DataType::Password)
1100 )
1101}
1102
1103fn value_to_coercion_input(value: &Value) -> Option<String> {
1104 match value {
1105 Value::Null => None,
1106 Value::Integer(value) => Some(value.to_string()),
1107 Value::UnsignedInteger(value) => Some(value.to_string()),
1108 Value::Float(value) => Some(value.to_string()),
1109 Value::Text(value) => Some(value.to_string()),
1110 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1111 Value::Boolean(value) => Some(value.to_string()),
1112 Value::Timestamp(value) => Some(value.to_string()),
1113 Value::Duration(value) => Some(value.to_string()),
1114 Value::IpAddr(value) => Some(value.to_string()),
1115 Value::MacAddr(value) => Some(format!(
1116 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1117 value[0], value[1], value[2], value[3], value[4], value[5]
1118 )),
1119 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1120 Value::Email(value) => Some(value.clone()),
1121 Value::Url(value) => Some(value.clone()),
1122 Value::Phone(value) => Some(value.to_string()),
1123 Value::Semver(value) => Some(format!(
1124 "{}.{}.{}",
1125 value / 1_000_000,
1126 (value / 1_000) % 1_000,
1127 value % 1_000
1128 )),
1129 Value::Date(value) => Some(value.to_string()),
1130 Value::Time(value) => Some(value.to_string()),
1131 Value::Decimal(value) => Some(value.to_string()),
1132 Value::TimestampMs(value) => Some(value.to_string()),
1133 Value::Ipv4(value) => Some(format!(
1134 "{}.{}.{}.{}",
1135 (value >> 24) & 0xFF,
1136 (value >> 16) & 0xFF,
1137 (value >> 8) & 0xFF,
1138 value & 0xFF
1139 )),
1140 Value::Port(value) => Some(value.to_string()),
1141 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1142 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1143 Value::GeoPoint(lat, lon) => Some(format!(
1144 "{},{}",
1145 *lat as f64 / 1_000_000.0,
1146 *lon as f64 / 1_000_000.0
1147 )),
1148 Value::BigInt(value) => Some(value.to_string()),
1149 Value::TableRef(value) => Some(value.clone()),
1150 Value::PageRef(value) => Some(value.to_string()),
1151 Value::Password(value) => Some(value.clone()),
1152 _ => None,
1153 }
1154}
1155
/// Removes repeated column names, comparing ASCII case-insensitively.
///
/// The first spelling of each column is kept and the relative order of the
/// surviving entries is preserved.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let capacity = modified_columns.len();
    modified_columns.into_iter().fold(
        Vec::<String>::with_capacity(capacity),
        |mut kept, candidate| {
            let already_seen = kept
                .iter()
                .any(|seen| seen.eq_ignore_ascii_case(&candidate));
            if !already_seen {
                kept.push(candidate);
            }
            kept
        },
    )
}
1172
1173impl RedDBRuntime {
1174 pub(crate) fn apply_loaded_patch_entity_core(
1175 &self,
1176 collection: String,
1177 mut entity: crate::storage::UnifiedEntity,
1178 payload: JsonValue,
1179 operations: Vec<PatchEntityOperation>,
1180 ) -> RedDBResult<AppliedEntityMutation> {
1181 let id = entity.id;
1182 let operations = normalize_ttl_patch_operations(operations)?;
1183 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1186
1187 let db = self.db();
1188 let store = db.store();
1189 let Some(manager) = store.get_collection(&collection) else {
1190 return Err(crate::RedDBError::NotFound(format!(
1191 "collection not found: {collection}"
1192 )));
1193 };
1194
1195 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1196 let mut metadata_changed = false;
1197 let mut modified_columns: Vec<String> = Vec::new();
1198 let mut context_index_dirty = false;
1199
1200 let row_contract_timestamps = db
1201 .collection_contract(&collection)
1202 .map(|c| c.timestamps_enabled)
1203 .unwrap_or(false);
1204
1205 match &mut entity.data {
1206 crate::storage::EntityData::Row(row) => {
1207 let mut field_ops = Vec::new();
1208 let mut metadata_ops = Vec::new();
1209
1210 for mut op in operations {
1211 let Some(root) = op.path.first().map(String::as_str) else {
1212 return Err(crate::RedDBError::Query(
1213 "patch path cannot be empty".to_string(),
1214 ));
1215 };
1216
1217 match root {
1218 "fields" | "named" => {
1219 if op.path.len() < 2 {
1220 return Err(crate::RedDBError::Query(
1221 "patch path 'fields' requires a nested key".to_string(),
1222 ));
1223 }
1224 if row_contract_timestamps {
1225 let leaf = op.path.get(1).map(String::as_str);
1226 if matches!(leaf, Some("created_at") | Some("updated_at")) {
1227 return Err(crate::RedDBError::Query(format!(
1228 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1229 collection,
1230 leaf.unwrap_or("")
1231 )));
1232 }
1233 }
1234 op.path.remove(0);
1235 field_ops.push(op);
1236 }
1237 "metadata" => {
1238 if op.path.len() < 2 {
1239 return Err(crate::RedDBError::Query(
1240 "patch path 'metadata' requires a nested key".to_string(),
1241 ));
1242 }
1243 op.path.remove(0);
1244 metadata_ops.push(op);
1245 }
1246 _ => {
1247 return Err(crate::RedDBError::Query(format!(
1248 "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1249 )));
1250 }
1251 }
1252 }
1253
1254 if !field_ops.is_empty() {
1255 context_index_dirty = true;
1256 for op in &field_ops {
1257 if let Some(col) = op.path.first() {
1258 modified_columns.push(col.clone());
1259 }
1260 }
1261 let named = row.named.get_or_insert_with(Default::default);
1262 apply_patch_operations_to_storage_map(named, &field_ops)?;
1263 }
1264
1265 if let Some(fields) = payload
1266 .get("fields")
1267 .and_then(crate::json::Value::as_object)
1268 {
1269 if row_contract_timestamps {
1270 for key in fields.keys() {
1271 if key == "created_at" || key == "updated_at" {
1272 return Err(crate::RedDBError::Query(format!(
1273 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1274 collection, key
1275 )));
1276 }
1277 }
1278 }
1279 context_index_dirty = true;
1280 let named = row.named.get_or_insert_with(Default::default);
1281 for (key, value) in fields {
1282 modified_columns.push(key.clone());
1283 named.insert(key.clone(), json_to_storage_value(value)?);
1284 }
1285 }
1286
1287 if !metadata_ops.is_empty() {
1288 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1289 let metadata = patch_metadata.get_or_insert_with(|| {
1290 store.get_metadata(&collection, id).unwrap_or_default()
1291 });
1292 let mut metadata_json = metadata_to_json(metadata);
1293 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1294 .map_err(crate::RedDBError::Query)?;
1295 *metadata = metadata_from_json(&metadata_json)?;
1296 metadata_changed = true;
1297 }
1298
1299 if !modified_columns.is_empty() || row_contract_timestamps {
1300 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1301 let current_fields = if let Some(named) = row.named.take() {
1302 named.into_iter().collect::<Vec<_>>()
1303 } else if let Some(schema) = row.schema.as_ref() {
1304 schema
1305 .iter()
1306 .cloned()
1307 .zip(row.columns.iter().cloned())
1308 .collect::<Vec<_>>()
1309 } else {
1310 Vec::new()
1311 };
1312 let normalized_fields = contract.normalize_update_fields(current_fields)?;
1313 if row_contract_timestamps {
1314 modified_columns.push("updated_at".to_string());
1315 context_index_dirty = true;
1316 }
1317 if contract.requires_uniqueness_check(&modified_columns) {
1318 contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1319 }
1320 row.named = Some(normalized_fields.into_iter().collect());
1321 }
1322 }
1323 crate::storage::EntityData::Node(node) => {
1324 let mut field_ops = Vec::new();
1325 let mut metadata_ops = Vec::new();
1326
1327 for mut op in operations {
1328 let Some(root) = op.path.first().map(String::as_str) else {
1329 return Err(crate::RedDBError::Query(
1330 "patch path cannot be empty".to_string(),
1331 ));
1332 };
1333
1334 match root {
1335 "fields" | "properties" => {
1336 if op.path.len() < 2 {
1337 return Err(crate::RedDBError::Query(
1338 "patch path 'fields' requires a nested key".to_string(),
1339 ));
1340 }
1341 op.path.remove(0);
1342 field_ops.push(op);
1343 }
1344 "metadata" => {
1345 if op.path.len() < 2 {
1346 return Err(crate::RedDBError::Query(
1347 "patch path 'metadata' requires a nested key".to_string(),
1348 ));
1349 }
1350 op.path.remove(0);
1351 metadata_ops.push(op);
1352 }
1353 _ => {
1354 return Err(crate::RedDBError::Query(format!(
1355 "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, or metadata/*"
1356 )));
1357 }
1358 }
1359 }
1360
1361 if !field_ops.is_empty() {
1362 context_index_dirty = true;
1363 apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1364 }
1365
1366 if let Some(fields) = payload
1367 .get("fields")
1368 .and_then(crate::json::Value::as_object)
1369 {
1370 context_index_dirty = true;
1371 for (key, value) in fields {
1372 node.properties
1373 .insert(key.clone(), json_to_storage_value(value)?);
1374 }
1375 }
1376
1377 if !metadata_ops.is_empty() {
1378 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1379 let metadata = patch_metadata.get_or_insert_with(|| {
1380 store.get_metadata(&collection, id).unwrap_or_default()
1381 });
1382 let mut metadata_json = metadata_to_json(metadata);
1383 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1384 .map_err(crate::RedDBError::Query)?;
1385 *metadata = metadata_from_json(&metadata_json)?;
1386 metadata_changed = true;
1387 }
1388 }
1389 crate::storage::EntityData::Edge(edge) => {
1390 let mut field_ops = Vec::new();
1391 let mut metadata_ops = Vec::new();
1392 let mut weight_ops = Vec::new();
1393
1394 for mut op in operations {
1395 let Some(root) = op.path.first().map(String::as_str) else {
1396 return Err(crate::RedDBError::Query(
1397 "patch path cannot be empty".to_string(),
1398 ));
1399 };
1400
1401 match root {
1402 "fields" | "properties" => {
1403 if op.path.len() < 2 {
1404 return Err(crate::RedDBError::Query(
1405 "patch path 'fields' requires a nested key".to_string(),
1406 ));
1407 }
1408 op.path.remove(0);
1409 field_ops.push(op);
1410 }
1411 "weight" => {
1412 if op.path.len() != 1 {
1413 return Err(crate::RedDBError::Query(
1414 "patch path 'weight' does not allow nested keys".to_string(),
1415 ));
1416 }
1417 op.path.clear();
1418 weight_ops.push(op);
1419 }
1420 "metadata" => {
1421 if op.path.len() < 2 {
1422 return Err(crate::RedDBError::Query(
1423 "patch path 'metadata' requires a nested key".to_string(),
1424 ));
1425 }
1426 op.path.remove(0);
1427 metadata_ops.push(op);
1428 }
1429 _ => {
1430 return Err(crate::RedDBError::Query(format!(
1431 "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1432 )));
1433 }
1434 }
1435 }
1436
1437 if !field_ops.is_empty() {
1438 context_index_dirty = true;
1439 apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1440 }
1441
1442 for op in weight_ops {
1443 context_index_dirty = true;
1444 let value = op.value.ok_or_else(|| {
1445 crate::RedDBError::Query("weight operations require a value".to_string())
1446 })?;
1447
1448 match op.op {
1449 PatchEntityOperationType::Unset => {
1450 return Err(crate::RedDBError::Query(
1451 "weight cannot be unset through patch operations".to_string(),
1452 ));
1453 }
1454 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1455 let Some(weight) = value.as_f64() else {
1456 return Err(crate::RedDBError::Query(
1457 "weight operation requires a numeric value".to_string(),
1458 ));
1459 };
1460 edge.weight = weight as f32;
1461 }
1462 }
1463 }
1464
1465 if let Some(fields) = payload
1466 .get("fields")
1467 .and_then(crate::json::Value::as_object)
1468 {
1469 context_index_dirty = true;
1470 for (key, value) in fields {
1471 edge.properties
1472 .insert(key.clone(), json_to_storage_value(value)?);
1473 }
1474 }
1475
1476 if !metadata_ops.is_empty() {
1477 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1478 let metadata = patch_metadata.get_or_insert_with(|| {
1479 store.get_metadata(&collection, id).unwrap_or_default()
1480 });
1481 let mut metadata_json = metadata_to_json(metadata);
1482 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1483 .map_err(crate::RedDBError::Query)?;
1484 *metadata = metadata_from_json(&metadata_json)?;
1485 metadata_changed = true;
1486 }
1487 }
1488 crate::storage::EntityData::Vector(vector) => {
1489 let mut field_ops = Vec::new();
1490 let mut metadata_ops = Vec::new();
1491
1492 for mut op in operations {
1493 let Some(root) = op.path.first().map(String::as_str) else {
1494 return Err(crate::RedDBError::Query(
1495 "patch path cannot be empty".to_string(),
1496 ));
1497 };
1498
1499 match root {
1500 "fields" => {
1501 if op.path.len() < 2 {
1502 return Err(crate::RedDBError::Query(
1503 "patch path 'fields' requires a nested key".to_string(),
1504 ));
1505 }
1506 op.path.remove(0);
1507 let Some(target) = op.path.first().map(String::as_str) else {
1508 return Err(crate::RedDBError::Query(
1509 "patch path requires a target under fields".to_string(),
1510 ));
1511 };
1512 if !matches!(target, "dense" | "content" | "sparse") {
1513 return Err(crate::RedDBError::Query(format!(
1514 "unsupported vector patch target '{target}'"
1515 )));
1516 }
1517 field_ops.push(op);
1518 }
1519 "metadata" => {
1520 if op.path.len() < 2 {
1521 return Err(crate::RedDBError::Query(
1522 "patch path 'metadata' requires a nested key".to_string(),
1523 ));
1524 }
1525 op.path.remove(0);
1526 metadata_ops.push(op);
1527 }
1528 _ => {
1529 return Err(crate::RedDBError::Query(format!(
1530 "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1531 )));
1532 }
1533 }
1534 }
1535
1536 if !field_ops.is_empty() {
1537 context_index_dirty = true;
1538 apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1539 }
1540
1541 if let Some(fields) = payload
1542 .get("fields")
1543 .and_then(crate::json::Value::as_object)
1544 {
1545 context_index_dirty = true;
1546 if let Some(content) =
1547 fields.get("content").and_then(crate::json::Value::as_str)
1548 {
1549 vector.content = Some(content.to_string());
1550 }
1551 if let Some(dense) = fields.get("dense") {
1552 vector.dense = dense
1553 .as_array()
1554 .ok_or_else(|| {
1555 crate::RedDBError::Query(
1556 "field 'dense' must be an array".to_string(),
1557 )
1558 })?
1559 .iter()
1560 .map(|value| {
1561 value.as_f64().map(|value| value as f32).ok_or_else(|| {
1562 crate::RedDBError::Query(
1563 "field 'dense' must contain only numbers".to_string(),
1564 )
1565 })
1566 })
1567 .collect::<Result<Vec<_>, _>>()?;
1568 }
1569 }
1570
1571 if !metadata_ops.is_empty() {
1572 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1573 let metadata = patch_metadata.get_or_insert_with(|| {
1574 store.get_metadata(&collection, id).unwrap_or_default()
1575 });
1576 let mut metadata_json = metadata_to_json(metadata);
1577 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1578 .map_err(crate::RedDBError::Query)?;
1579 *metadata = metadata_from_json(&metadata_json)?;
1580 metadata_changed = true;
1581 }
1582 }
1583 crate::storage::EntityData::TimeSeries(_)
1584 | crate::storage::EntityData::QueueMessage(_) => {
1585 return Err(crate::RedDBError::Query(
1586 "patch operations are not supported for TimeSeries or QueueMessage entities"
1587 .to_string(),
1588 ));
1589 }
1590 }
1591
1592 if let Some(metadata) = payload
1593 .get("metadata")
1594 .and_then(crate::json::Value::as_object)
1595 {
1596 let patch_metadata = patch_metadata
1597 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1598 for (key, value) in metadata {
1599 ensure_non_tree_reserved_metadata_key(key)?;
1600 patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1601 }
1602 metadata_changed = true;
1603 }
1604
1605 for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1606 let patch_metadata = patch_metadata
1607 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1608 if matches!(value, crate::storage::unified::MetadataValue::Null) {
1609 patch_metadata.remove(&key);
1610 } else {
1611 patch_metadata.set(key, value);
1612 }
1613 metadata_changed = true;
1614 }
1615
1616 entity.updated_at = std::time::SystemTime::now()
1617 .duration_since(std::time::UNIX_EPOCH)
1618 .unwrap_or_default()
1619 .as_secs();
1620
1621 modified_columns = dedupe_modified_columns(modified_columns);
1622
1623 Ok(AppliedEntityMutation {
1624 id,
1625 collection,
1626 entity,
1627 metadata: patch_metadata,
1628 modified_columns,
1629 persist_metadata: metadata_changed,
1630 context_index_dirty,
1631 pre_mutation_fields,
1632 })
1633 }
1634
1635 pub(crate) fn apply_loaded_sql_update_row_core(
1636 &self,
1637 collection: String,
1638 mut entity: crate::storage::UnifiedEntity,
1639 static_field_assignments: &[(String, Value)],
1640 dynamic_field_assignments: Vec<(String, Value)>,
1641 static_metadata_assignments: &[(String, MetadataValue)],
1642 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1643 row_contract_plan: Option<&RowUpdateContractPlan>,
1644 row_modified_columns_template: &[String],
1645 row_touches_unique_columns: bool,
1646 ) -> RedDBResult<AppliedEntityMutation> {
1647 let id = entity.id;
1648 let db = self.db();
1649 let store = db.store();
1650 let Some(_) = store.get_collection(&collection) else {
1651 return Err(crate::RedDBError::NotFound(format!(
1652 "collection not found: {collection}"
1653 )));
1654 };
1655
1656 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1657 let row_contract_timestamps = row_contract_plan
1658 .map(|plan| plan.timestamps_enabled)
1659 .unwrap_or(false);
1660 let mut metadata_changed = false;
1661 let mut modified_columns = row_modified_columns_template.to_vec();
1662 let mut context_index_dirty = !modified_columns.is_empty();
1663
1664 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1668
1669 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1670 return Err(crate::RedDBError::Query(
1671 "SQL row update fast path requires a row entity".to_string(),
1672 ));
1673 };
1674
1675 let _ = row_contract_plan;
1676 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1677 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1678
1679 for (key, value) in static_metadata_assignments
1680 .iter()
1681 .cloned()
1682 .chain(dynamic_metadata_assignments)
1683 {
1684 ensure_non_tree_reserved_metadata_key(&key)?;
1685 patch_metadata
1686 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1687 .set(key, value);
1688 metadata_changed = true;
1689 }
1690
1691 if !modified_columns.is_empty() || row_contract_timestamps {
1692 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1693 if row_contract_timestamps {
1694 context_index_dirty = true;
1695 set_row_field(
1696 row,
1697 "updated_at",
1698 Value::UnsignedInteger(current_unix_ms_u64()),
1699 );
1700 modified_columns.push("updated_at".to_string());
1701 }
1702 if row_touches_unique_columns {
1703 let current_fields = collect_row_fields(row);
1704 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1705 }
1706 }
1707
1708 entity.updated_at = std::time::SystemTime::now()
1709 .duration_since(std::time::UNIX_EPOCH)
1710 .unwrap_or_default()
1711 .as_secs();
1712
1713 modified_columns = dedupe_modified_columns(modified_columns);
1714
1715 Ok(AppliedEntityMutation {
1716 id,
1717 collection,
1718 entity,
1719 metadata: patch_metadata,
1720 modified_columns,
1721 persist_metadata: metadata_changed,
1722 context_index_dirty,
1723 pre_mutation_fields,
1724 })
1725 }
1726
1727 pub(crate) fn persist_applied_entity_mutations(
1728 &self,
1729 applied: &[AppliedEntityMutation],
1730 ) -> RedDBResult<()> {
1731 if applied.is_empty() {
1732 return Ok(());
1733 }
1734
1735 let store = self.db().store();
1736 let collection = &applied[0].collection;
1737 let Some(manager) = store.get_collection(collection) else {
1738 return Err(crate::RedDBError::NotFound(format!(
1739 "collection not found: {collection}"
1740 )));
1741 };
1742
1743 manager
1744 .update_hot_batch_with_metadata(applied.iter().map(|item| {
1745 (
1746 &item.entity,
1747 item.modified_columns.as_slice(),
1748 if item.persist_metadata {
1749 item.metadata.as_ref()
1750 } else {
1751 None
1752 },
1753 )
1754 }))
1755 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1756
1757 let indexed_cols = self
1766 .index_store_ref()
1767 .indexed_columns_set(collection.as_str());
1768 let all_hot = !indexed_cols.is_empty()
1769 && applied.iter().all(|item| {
1770 !item.persist_metadata
1771 && !item
1772 .modified_columns
1773 .iter()
1774 .any(|c| indexed_cols.contains(c))
1775 })
1776 || indexed_cols.is_empty() && applied.iter().all(|item| !item.persist_metadata);
1777
1778 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
1782 applied.iter().map(|item| &item.entity).collect();
1783 let persist_fn = if all_hot {
1784 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
1785 } else {
1786 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
1787 };
1788 persist_fn(store.as_ref(), collection, &entity_refs)
1789 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
1790 }
1791
1792 pub(crate) fn flush_applied_entity_mutation(
1793 &self,
1794 applied: &AppliedEntityMutation,
1795 ) -> RedDBResult<()> {
1796 let store = self.db().store();
1797 if applied.context_index_dirty {
1798 store
1799 .context_index()
1800 .index_entity(&applied.collection, &applied.entity);
1801 }
1802 let mut changed_columns: Option<Vec<String>> = None;
1810 if !applied.pre_mutation_fields.is_empty() {
1811 let post = entity_row_fields_snapshot(&applied.entity);
1812 if !post.is_empty() {
1813 let damage = crate::application::entity::row_damage_vector(
1814 &applied.pre_mutation_fields,
1815 &post,
1816 );
1817 if !damage.is_empty() {
1818 changed_columns = Some(
1819 damage
1820 .touched_columns()
1821 .into_iter()
1822 .map(str::to_string)
1823 .collect(),
1824 );
1825 }
1826
1827 let indexed_cols: std::collections::HashSet<String> = self
1836 .index_store_ref()
1837 .list_indices(applied.collection.as_str())
1838 .into_iter()
1839 .filter_map(|idx| idx.columns.first().cloned())
1840 .collect();
1841 let modified_cols: std::collections::HashSet<String> = damage
1842 .touched_columns()
1843 .into_iter()
1844 .map(str::to_string)
1845 .collect();
1846 let decision = crate::storage::engine::hot_update::decide(
1847 &crate::storage::engine::hot_update::HotUpdateInputs {
1848 collection: applied.collection.as_str(),
1849 indexed_columns: &indexed_cols,
1850 modified_columns: &modified_cols,
1851 new_tuple_size: 0,
1855 page_free_space: usize::MAX,
1856 },
1857 );
1858 if !decision.can_hot {
1859 self.index_store_ref()
1860 .index_entity_update(
1861 &applied.collection,
1862 applied.id,
1863 &applied.pre_mutation_fields,
1864 &post,
1865 )
1866 .map_err(crate::RedDBError::Internal)?;
1867 } else {
1868 tracing::debug!(
1872 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
1873 "hot_update fast-path: skipped index_entity_update"
1874 );
1875 }
1876 }
1877 }
1878 self.cdc_emit_prebuilt_with_columns(
1879 crate::replication::cdc::ChangeOperation::Update,
1880 &applied.collection,
1881 &applied.entity,
1882 "entity",
1883 applied.metadata.as_ref(),
1884 true,
1885 changed_columns,
1886 );
1887 Ok(())
1888 }
1889
1890 pub(crate) fn apply_loaded_patch_entity(
1891 &self,
1892 collection: String,
1893 entity: crate::storage::UnifiedEntity,
1894 payload: JsonValue,
1895 operations: Vec<PatchEntityOperation>,
1896 ) -> RedDBResult<CreateEntityOutput> {
1897 let applied =
1898 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
1899 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
1900 self.flush_applied_entity_mutation(&applied)?;
1901 Ok(CreateEntityOutput {
1902 id: applied.id,
1903 entity: Some(applied.entity),
1904 })
1905 }
1906}
1907
1908fn ensure_non_tree_reserved_metadata_patch_paths(
1909 operations: &[PatchEntityOperation],
1910) -> RedDBResult<()> {
1911 for operation in operations {
1912 let Some(key) = operation.path.first().map(String::as_str) else {
1913 continue;
1914 };
1915 ensure_non_tree_reserved_metadata_key(key)?;
1916 }
1917 Ok(())
1918}
1919
1920fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
1921 if key.starts_with(TREE_METADATA_PREFIX) {
1922 return Err(crate::RedDBError::Query(format!(
1923 "metadata key '{}' is reserved for managed trees",
1924 key
1925 )));
1926 }
1927 Ok(())
1928}
1929
1930fn ensure_non_tree_reserved_metadata_entries(
1931 metadata: &[(String, MetadataValue)],
1932) -> RedDBResult<()> {
1933 for (key, _) in metadata {
1934 ensure_non_tree_reserved_metadata_key(key)?;
1935 }
1936 Ok(())
1937}
1938
1939fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
1940 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
1941 return Err(crate::RedDBError::Query(format!(
1942 "edge label '{}' is reserved for managed trees",
1943 TREE_CHILD_EDGE_LABEL
1944 )));
1945 }
1946 Ok(())
1947}
1948
1949impl RedDBRuntime {
1950 pub(crate) fn create_node_unchecked(
1951 &self,
1952 input: CreateNodeInput,
1953 ) -> RedDBResult<CreateEntityOutput> {
1954 let db = self.db();
1955 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
1956 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
1957 let mut metadata = input.metadata;
1958 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
1959 let mut builder = db.node(&input.collection, &input.label);
1960
1961 if let Some(node_type) = input.node_type {
1962 builder = builder.node_type(node_type);
1963 }
1964
1965 for (key, value) in input.properties {
1966 builder = builder.property(key, value);
1967 }
1968
1969 for (key, value) in metadata {
1970 builder = builder.metadata(key, value);
1971 }
1972
1973 for embedding in input.embeddings {
1974 if let Some(model) = embedding.model {
1975 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
1976 } else {
1977 builder = builder.embedding(embedding.name, embedding.vector);
1978 }
1979 }
1980
1981 for link in input.table_links {
1982 builder = builder.link_to_table(link.key, link.table);
1983 }
1984
1985 for link in input.node_links {
1986 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
1987 }
1988
1989 let id = builder.save()?;
1990 self.stamp_xmin_if_in_txn(&input.collection, id);
1993 refresh_context_index(&db, &input.collection, id)?;
1994 self.cdc_emit(
1995 crate::replication::cdc::ChangeOperation::Insert,
1996 &input.collection,
1997 id.raw(),
1998 "graph_node",
1999 );
2000 Ok(CreateEntityOutput {
2001 id,
2002 entity: db.store().get(&input.collection, id),
2003 })
2004 }
2005
2006 pub(crate) fn create_edge_unchecked(
2007 &self,
2008 input: CreateEdgeInput,
2009 ) -> RedDBResult<CreateEntityOutput> {
2010 let db = self.db();
2011 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2012 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2013 let mut metadata = input.metadata;
2014 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2015 let mut builder = db
2016 .edge(&input.collection, &input.label)
2017 .from(input.from)
2018 .to(input.to);
2019
2020 if let Some(weight) = input.weight {
2021 builder = builder.weight(weight);
2022 }
2023
2024 for (key, value) in input.properties {
2025 builder = builder.property(key, value);
2026 }
2027
2028 for (key, value) in metadata {
2029 builder = builder.metadata(key, value);
2030 }
2031
2032 let id = builder.save()?;
2033 self.stamp_xmin_if_in_txn(&input.collection, id);
2036 refresh_context_index(&db, &input.collection, id)?;
2037 self.cdc_emit(
2038 crate::replication::cdc::ChangeOperation::Insert,
2039 &input.collection,
2040 id.raw(),
2041 "graph_edge",
2042 );
2043 Ok(CreateEntityOutput {
2044 id,
2045 entity: db.store().get(&input.collection, id),
2046 })
2047 }
2048}
2049
2050impl RuntimeEntityPort for RedDBRuntime {
2051 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2052 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2053 let db = self.db();
2054 let CreateRowInput {
2055 collection,
2056 fields,
2057 metadata: input_metadata,
2058 node_links,
2059 vector_links,
2060 } = input;
2061 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2062 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2063 let mut metadata = input_metadata;
2064 apply_collection_default_ttl(&db, &collection, &mut metadata);
2065 let fields = contract.normalize_insert_fields(fields)?;
2066 contract.enforce_row_uniqueness(&fields, None)?;
2067 let engine = self.mutation_engine();
2069 let result = engine.apply(
2070 collection.clone(),
2071 vec![crate::runtime::mutation::MutationRow {
2072 fields,
2073 metadata,
2074 node_links,
2075 vector_links,
2076 }],
2077 )?;
2078 let id = result.ids[0];
2079 Ok(CreateEntityOutput {
2084 id,
2085 entity: db.store().get(&collection, id),
2086 })
2087 }
2088
2089 fn create_rows_batch(
2090 &self,
2091 input: CreateRowsBatchInput,
2092 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2093 if input.rows.is_empty() {
2094 return Ok(Vec::new());
2095 }
2096 self.check_batch_size(input.rows.len())?;
2097 self.check_db_size()?;
2098
2099 let db = self.db();
2100 let collection = input.collection;
2101 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2102 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2103
2104 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2105 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2106 for row in input.rows {
2107 if row.collection != collection {
2108 return Err(crate::RedDBError::Query(format!(
2109 "batch row collection mismatch: expected '{}', got '{}'",
2110 collection, row.collection
2111 )));
2112 }
2113
2114 let mut metadata = row.metadata;
2115 apply_collection_default_ttl(&db, &collection, &mut metadata);
2116 let fields = contract.normalize_insert_fields(row.fields)?;
2117 contract.enforce_row_uniqueness(&fields, None)?;
2118 uniqueness_rows.push(fields.clone());
2119 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2120 }
2121
2122 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2123
2124 let engine = {
2127 let e = self.mutation_engine();
2128 if input.suppress_events {
2129 e.with_suppress_events()
2130 } else {
2131 e
2132 }
2133 };
2134 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2135 .into_iter()
2136 .map(|(fields, metadata, node_links, vector_links)| {
2137 crate::runtime::mutation::MutationRow {
2138 fields,
2139 metadata,
2140 node_links,
2141 vector_links,
2142 }
2143 })
2144 .collect();
2145
2146 let result = engine
2147 .apply(collection.clone(), mutation_rows)
2148 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2149
2150 let store = db.store();
2151 Ok(result
2152 .ids
2153 .into_iter()
2154 .map(|id| CreateEntityOutput {
2155 id,
2156 entity: store.get(&collection, id),
2157 })
2158 .collect())
2159 }
2160
2161 fn create_rows_batch_prevalidated_columnar(
2162 &self,
2163 collection: String,
2164 column_names: std::sync::Arc<Vec<String>>,
2165 rows: Vec<Vec<crate::storage::schema::Value>>,
2166 ) -> RedDBResult<usize> {
2167 use crate::storage::{
2168 unified::{EntityData, EntityKind, RowData},
2169 EntityId, UnifiedEntity,
2170 };
2171 use std::sync::Arc;
2172
2173 if rows.is_empty() {
2174 return Ok(0);
2175 }
2176 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2177 self.check_batch_size(rows.len())?;
2178 self.check_db_size()?;
2179
2180 let db = self.db();
2181 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2182 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2183
2184 let store = db.store();
2185 let ncols = column_names.len();
2186 let table_arc: Arc<str> = Arc::from(collection.as_str());
2187
2188 let indexed_cols = self
2195 .index_store_ref()
2196 .indexed_columns_set(collection.as_str());
2197 let has_secondary_indexes = !indexed_cols.is_empty();
2198 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2199 if has_secondary_indexes {
2200 Vec::with_capacity(rows.len())
2201 } else {
2202 Vec::new()
2203 };
2204
2205 let entities: Vec<UnifiedEntity> = rows
2211 .into_iter()
2212 .map(|values| {
2213 if values.len() != ncols {
2214 }
2218 if has_secondary_indexes {
2219 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2222 Vec::with_capacity(indexed_cols.len());
2223 for (name, val) in column_names.iter().zip(values.iter()) {
2224 if indexed_cols.contains(name) {
2225 snap.push((name.clone(), val.clone()));
2226 }
2227 }
2228 field_snapshots.push(snap);
2229 }
2230 let mut row = RowData::new(values);
2231 row.schema = Some(Arc::clone(&column_names));
2232 UnifiedEntity::new(
2233 EntityId::new(0),
2234 EntityKind::TableRow {
2235 table: Arc::clone(&table_arc),
2236 row_id: 0,
2237 },
2238 EntityData::Row(row),
2239 )
2240 })
2241 .collect();
2242
2243 let ids = store
2244 .bulk_insert(&collection, entities)
2245 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2246
2247 if has_secondary_indexes {
2252 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2253 .iter()
2254 .zip(field_snapshots)
2255 .map(|(id, fields)| (*id, fields))
2256 .collect();
2257 self.index_store_ref()
2258 .index_entity_insert_batch(&collection, &index_rows)
2259 .map_err(crate::RedDBError::Internal)?;
2260 }
2261
2262 self.invalidate_result_cache();
2265 self.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2266
2267 Ok(ids.len())
2268 }
2269
2270 fn create_rows_batch_columnar(
2271 &self,
2272 collection: String,
2273 column_names: std::sync::Arc<Vec<String>>,
2274 rows: Vec<Vec<crate::storage::schema::Value>>,
2275 ) -> RedDBResult<usize> {
2276 if rows.is_empty() {
2277 return Ok(0);
2278 }
2279 self.check_batch_size(rows.len())?;
2280 self.check_db_size()?;
2281
2282 let db = self.db();
2283 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2284 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2285
2286 let needs_normalisation = match db.collection_contract(&collection) {
2296 Some(c) => {
2297 c.declared_model == crate::catalog::CollectionModel::Table
2298 && (!c.declared_columns.is_empty()
2299 || c.table_def
2300 .as_ref()
2301 .map(|t| !t.columns.is_empty())
2302 .unwrap_or(false))
2303 }
2304 None => false,
2305 };
2306 if !needs_normalisation {
2307 return self.create_rows_batch_prevalidated_columnar(collection, column_names, rows);
2308 }
2309
2310 let ncols = column_names.len();
2317 let tuple_rows: Vec<CreateRowInput> = rows
2318 .into_iter()
2319 .map(|values| {
2320 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2321 Vec::with_capacity(ncols);
2322 for (name, value) in column_names.iter().zip(values) {
2323 fields.push((name.clone(), value));
2324 }
2325 CreateRowInput {
2326 collection: collection.clone(),
2327 fields,
2328 metadata: Vec::new(),
2329 node_links: Vec::new(),
2330 vector_links: Vec::new(),
2331 }
2332 })
2333 .collect();
2334 self.create_rows_batch(CreateRowsBatchInput {
2335 collection,
2336 rows: tuple_rows,
2337 suppress_events: false,
2338 })
2339 .map(|out| out.len())
2340 }
2341
2342 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2343 if input.rows.is_empty() {
2344 return Ok(0);
2345 }
2346 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2347 self.check_batch_size(input.rows.len())?;
2348 self.check_db_size()?;
2349
2350 let db = self.db();
2351 let collection = input.collection;
2352 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2357 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2358
2359 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2365
2366 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2367 Vec::with_capacity(input.rows.len());
2368 let mut mutation_rows = mutation_rows;
2369 for row in input.rows {
2370 if row.collection != collection {
2371 return Err(crate::RedDBError::Query(format!(
2372 "batch row collection mismatch: expected '{}', got '{}'",
2373 collection, row.collection
2374 )));
2375 }
2376 let mut metadata = row.metadata;
2377 if let Some(ttl) = default_ttl_ms {
2378 if !has_internal_ttl_metadata(&metadata) {
2379 metadata.push((
2380 "_ttl_ms".to_string(),
2381 if ttl <= i64::MAX as u64 {
2382 MetadataValue::Int(ttl as i64)
2383 } else {
2384 MetadataValue::Timestamp(ttl)
2385 },
2386 ));
2387 }
2388 }
2389 mutation_rows.push(crate::runtime::mutation::MutationRow {
2390 fields: row.fields,
2391 metadata,
2392 node_links: row.node_links,
2393 vector_links: row.vector_links,
2394 });
2395 }
2396
2397 let engine = self.mutation_engine();
2398 let result = engine
2399 .apply(collection, mutation_rows)
2400 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2401 Ok(result.ids.len())
2402 }
2403
2404 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2405 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2406 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2407 self.create_node_unchecked(input)
2408 }
2409
2410 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2411 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2412 ensure_non_tree_structural_edge_label(&input.label)?;
2413 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2414 self.create_edge_unchecked(input)
2415 }
2416
2417 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2418 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2419 let db = self.db();
2420 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2421 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2422 let mut metadata = input.metadata;
2423 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2424 let mut builder = db.vector(&input.collection).dense(input.dense);
2425
2426 if let Some(content) = input.content {
2427 builder = builder.content(content);
2428 }
2429
2430 for (key, value) in metadata {
2431 builder = builder.metadata(key, value);
2432 }
2433
2434 if let Some(link_row) = input.link_row {
2435 builder = builder.link_to_table(link_row);
2436 }
2437
2438 if let Some(link_node) = input.link_node {
2439 builder = builder.link_to_node(link_node);
2440 }
2441
2442 let id = builder.save()?;
2443 self.stamp_xmin_if_in_txn(&input.collection, id);
2446 refresh_context_index(&db, &input.collection, id)?;
2447 self.cdc_emit(
2448 crate::replication::cdc::ChangeOperation::Insert,
2449 &input.collection,
2450 id.raw(),
2451 "vector",
2452 );
2453 Ok(CreateEntityOutput {
2454 id,
2455 entity: db.store().get(&input.collection, id),
2456 })
2457 }
2458
2459 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2460 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2461 let db = self.db();
2462 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2463 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2464
2465 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2467 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2468 })?;
2469 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2470 "body".to_string(),
2471 crate::storage::schema::Value::Json(body_bytes),
2472 )];
2473
2474 if let JsonValue::Object(ref map) = input.body {
2476 for (key, value) in map {
2477 let storage_value = json_to_storage_value(value)?;
2478 fields.push((key.clone(), storage_value));
2479 }
2480 }
2481
2482 let mut metadata = input.metadata;
2483 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2484 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2485 .iter()
2486 .map(|(key, value)| (key.as_str(), value.clone()))
2487 .collect();
2488 let mut builder = db.row(&input.collection, columns);
2489
2490 for (key, value) in metadata {
2491 builder = builder.metadata(key, value);
2492 }
2493
2494 for node in input.node_links {
2495 builder = builder.link_to_node(node);
2496 }
2497
2498 for vector in input.vector_links {
2499 builder = builder.link_to_vector(vector);
2500 }
2501
2502 let id = builder.save()?;
2503 self.stamp_xmin_if_in_txn(&input.collection, id);
2505 refresh_context_index(&db, &input.collection, id)?;
2506 self.cdc_emit(
2507 crate::replication::cdc::ChangeOperation::Insert,
2508 &input.collection,
2509 id.raw(),
2510 "document",
2511 );
2512 Ok(CreateEntityOutput {
2513 id,
2514 entity: db.store().get(&input.collection, id),
2515 })
2516 }
2517
2518 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2519 let db = self.db();
2520 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2521 let declared_model = db
2522 .collection_contract(&input.collection)
2523 .map(|contract| contract.declared_model);
2524 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2525 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2526 self.seal_vault_value(&input.collection, input.value)?
2527 } else {
2528 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2529 input.value
2530 };
2531 let fields = vec![
2532 (
2533 "key".to_string(),
2534 crate::storage::schema::Value::text(input.key),
2535 ),
2536 ("value".to_string(), value),
2537 ];
2538 let collection = input.collection;
2539 let result = self.mutation_engine().apply(
2540 collection.clone(),
2541 vec![crate::runtime::mutation::MutationRow {
2542 fields,
2543 metadata: input.metadata,
2544 node_links: Vec::new(),
2545 vector_links: Vec::new(),
2546 }],
2547 )?;
2548 let id = result.ids[0];
2549 Ok(CreateEntityOutput {
2550 id,
2551 entity: db.store().get(&collection, id),
2552 })
2553 }
2554
2555 fn create_timeseries_point(
2556 &self,
2557 input: CreateTimeSeriesPointInput,
2558 ) -> RedDBResult<CreateEntityOutput> {
2559 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2560 let db = self.db();
2561 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2562 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2563
2564 let mut fields = vec![
2565 (
2566 "metric".to_string(),
2567 crate::storage::schema::Value::text(input.metric),
2568 ),
2569 (
2570 "value".to_string(),
2571 crate::storage::schema::Value::Float(input.value),
2572 ),
2573 ];
2574
2575 if let Some(timestamp_ns) = input.timestamp_ns {
2576 fields.push((
2577 "timestamp".to_string(),
2578 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2579 ));
2580 }
2581
2582 if !input.tags.is_empty() {
2583 let tags_json = JsonValue::Object(
2584 input
2585 .tags
2586 .into_iter()
2587 .map(|(key, value)| (key, JsonValue::String(value)))
2588 .collect(),
2589 );
2590 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2591 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2592 })?;
2593 fields.push((
2594 "tags".to_string(),
2595 crate::storage::schema::Value::Json(tags_bytes),
2596 ));
2597 }
2598
2599 let collection = input.collection;
2600 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2601 self.stamp_xmin_if_in_txn(&collection, id);
2604 refresh_context_index(&db, &collection, id)?;
2605
2606 Ok(CreateEntityOutput {
2607 id,
2608 entity: db.store().get(&collection, id),
2609 })
2610 }
2611
2612 fn get_kv(
2613 &self,
2614 collection: &str,
2615 key: &str,
2616 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2617 let db = self.db();
2618 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2619 let store = db.store();
2620 let Some(manager) = store.get_collection(collection) else {
2621 return Ok(None);
2622 };
2623 let entities = manager.query_all(|_| true);
2624 for entity in entities {
2625 if let crate::storage::EntityData::Row(ref row) = entity.data {
2626 if let Some(ref named) = row.named {
2627 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2628 if &**k == key {
2629 let value = named
2630 .get("value")
2631 .cloned()
2632 .unwrap_or(crate::storage::schema::Value::Null);
2633 return Ok(Some((value, entity.id)));
2634 }
2635 }
2636 }
2637 }
2638 }
2639 Ok(None)
2640 }
2641
2642 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2643 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2644 let found = self.get_kv(collection, key)?;
2645 if let Some((_, id)) = found {
2646 let db = self.db();
2647 let store = db.store();
2648 let deleted = store
2649 .delete(collection, id)
2650 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2651 if deleted {
2652 store.context_index().remove_entity(id);
2653 }
2654 Ok(deleted)
2655 } else {
2656 Ok(false)
2657 }
2658 }
2659
2660 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2661 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2662 let PatchEntityInput {
2663 collection,
2664 id,
2665 payload,
2666 operations,
2667 } = input;
2668 let db = self.db();
2669 let store = db.store();
2670 let Some(manager) = store.get_collection(&collection) else {
2671 return Err(crate::RedDBError::NotFound(format!(
2672 "collection not found: {collection}"
2673 )));
2674 };
2675 let Some(entity) = manager.get(id) else {
2676 return Err(crate::RedDBError::NotFound(format!(
2677 "entity not found: {}",
2678 id.raw()
2679 )));
2680 };
2681 self.apply_loaded_patch_entity(collection, entity, payload, operations)
2682 }
2683
2684 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2685 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2686 let store = self.db().store();
2687 let pre_delete_fields = store
2691 .get(&input.collection, input.id)
2692 .as_ref()
2693 .map(entity_row_fields_snapshot)
2694 .unwrap_or_default();
2695 let deleted = store
2699 .delete(&input.collection, input.id)
2700 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2701 if deleted {
2702 store.context_index().remove_entity(input.id);
2703 if !pre_delete_fields.is_empty() {
2706 self.index_store_ref()
2707 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2708 .map_err(crate::RedDBError::Internal)?;
2709 }
2710 self.cdc_emit(
2711 crate::replication::cdc::ChangeOperation::Delete,
2712 &input.collection,
2713 input.id.raw(),
2714 "entity",
2715 );
2716 }
2717 Ok(DeleteEntityOutput {
2718 deleted,
2719 id: input.id,
2720 })
2721 }
2722}