1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
// String prefix `"red.tree."`; presumably namespaces tree-related metadata keys.
// NOTE(review): no use is visible in this part of the file — confirm against callers.
const TREE_METADATA_PREFIX: &str = "red.tree.";
// Label string `"TREE_CHILD"`; presumably tags parent-to-child tree edges — TODO confirm.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 if requested_model == crate::catalog::CollectionModel::Vector
100 && contract
101 .ai_policy
102 .as_ref()
103 .is_some_and(|policy| policy.embed.is_some() || policy.vision.is_some())
104 {
105 return Ok(());
106 }
107 return Err(crate::RedDBError::InvalidOperation(format!(
108 "collection '{}' is declared as '{}' and does not allow '{}' writes",
109 collection,
110 collection_model_name(contract.declared_model),
111 collection_model_name(requested_model)
112 )));
113 }
114
115 let now = implicit_contract_unix_ms();
116 db.save_collection_contract(crate::physical::CollectionContract {
117 name: collection.to_string(),
118 declared_model: requested_model,
119 schema_mode: crate::catalog::SchemaMode::Dynamic,
120 origin: crate::physical::ContractOrigin::Implicit,
121 version: 1,
122 created_at_unix_ms: now,
123 updated_at_unix_ms: now,
124 default_ttl_ms: db.collection_default_ttl_ms(collection),
125 vector_dimension: None,
126 vector_metric: None,
127 context_index_fields: Vec::new(),
128 declared_columns: Vec::new(),
129 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
130 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
131 timestamps_enabled: false,
132 context_index_enabled: false,
133 metrics_raw_retention_ms: None,
134 metrics_rollup_policies: Vec::new(),
135 metrics_tenant_identity: None,
136 metrics_namespace: None,
137 append_only: false,
140 subscriptions: Vec::new(),
141 analytics_config: Vec::new(),
142 session_key: None,
143 session_gap_ms: None,
144 retention_duration_ms: None,
145 analytical_storage: None,
146
147 ai_policy: None,
148 })
149 .map(|_| ())
150 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
151}
152
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn implicit_contract_unix_ms() -> u128 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
159
160fn collection_model_allows(
161 declared_model: crate::catalog::CollectionModel,
162 requested_model: crate::catalog::CollectionModel,
163) -> bool {
164 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
165}
166
167fn ensure_vector_dimension_contract(
168 db: &crate::storage::unified::devx::RedDB,
169 collection: &str,
170 actual_dimension: usize,
171) -> RedDBResult<()> {
172 let Some(expected_dimension) = db
173 .collection_contract(collection)
174 .and_then(|contract| contract.vector_dimension)
175 else {
176 return Ok(());
177 };
178 if expected_dimension == actual_dimension {
179 return Ok(());
180 }
181 Err(crate::RedDBError::Query(format!(
182 "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
183 )))
184}
185
186fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
187 match model {
188 crate::catalog::CollectionModel::Table => "table",
189 crate::catalog::CollectionModel::Document => "document",
190 crate::catalog::CollectionModel::Graph => "graph",
191 crate::catalog::CollectionModel::Vector => "vector",
192 crate::catalog::CollectionModel::Hll => "hll",
193 crate::catalog::CollectionModel::Sketch => "sketch",
194 crate::catalog::CollectionModel::Filter => "filter",
195 crate::catalog::CollectionModel::Kv => "kv",
196 crate::catalog::CollectionModel::Config => "config",
197 crate::catalog::CollectionModel::Vault => "vault",
198 crate::catalog::CollectionModel::Mixed => "mixed",
199 crate::catalog::CollectionModel::TimeSeries => "timeseries",
200 crate::catalog::CollectionModel::Queue => "queue",
201 crate::catalog::CollectionModel::Metrics => "metrics",
202 }
203}
204
/// One uniqueness constraint (primary key or unique) over a set of columns,
/// as produced by `resolved_uniqueness_rules`.
#[derive(Clone)]
struct UniquenessRule {
    /// Constraint name, reported in violation error messages.
    name: String,
    /// Columns whose combined values must be unique.
    columns: Vec<String>,
    /// `true` for primary keys: the uniqueness checks reject null or missing
    /// values for these, while plain unique rules are skipped instead.
    primary_key: bool,
}
211
/// Selects how `normalize_row_fields_for_contract_with_mode` treats the
/// managed `created_at` / `updated_at` columns when timestamps are enabled.
#[derive(Copy, Clone, PartialEq, Eq)]
enum NormalizeMode {
    /// Insert: caller-supplied `created_at` / `updated_at` are rejected and
    /// both columns are filled with the current time.
    Insert,
    /// Update: an incoming `created_at` is carried over unchanged and
    /// `updated_at` is set to the current time.
    Update,
}
223
/// Bundles the contract checks applied on the write path behind one handle
/// that remembers the database and collection being written to.
mod collection_contract_enforcement {
    use super::*;

    /// Borrowed `(db, collection)` pair. Every method forwards to the
    /// matching free function in the parent module with these two arguments.
    pub(super) struct CollectionContractWriteEnforcer<'a> {
        db: &'a crate::storage::unified::devx::RedDB,
        collection: &'a str,
    }

    impl<'a> CollectionContractWriteEnforcer<'a> {
        /// Creates an enforcer for `collection` in `db`.
        pub(super) fn new(
            db: &'a crate::storage::unified::devx::RedDB,
            collection: &'a str,
        ) -> Self {
            Self { db, collection }
        }

        /// Forwards to `ensure_collection_model_contract`.
        pub(super) fn ensure_model(
            &self,
            requested_model: crate::catalog::CollectionModel,
        ) -> RedDBResult<()> {
            ensure_collection_model_contract(self.db, self.collection, requested_model)
        }

        /// Normalizes `fields` against the contract using `NormalizeMode::Insert`.
        pub(super) fn normalize_insert_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Insert,
            )
        }

        /// Normalizes `fields` against the contract using `NormalizeMode::Update`.
        pub(super) fn normalize_update_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Update,
            )
        }

        /// Forwards to `enforce_row_uniqueness`; `exclude_id` names an entity
        /// to leave out of the comparison.
        pub(super) fn enforce_row_uniqueness(
            &self,
            fields: &[(String, Value)],
            exclude_id: Option<crate::storage::EntityId>,
        ) -> RedDBResult<()> {
            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
        }

        /// Forwards to `enforce_row_batch_uniqueness`, which compares the
        /// rows of one batch against each other.
        pub(super) fn enforce_batch_uniqueness(
            &self,
            rows: &[Vec<(String, Value)>],
        ) -> RedDBResult<()> {
            enforce_row_batch_uniqueness(self.db, self.collection, rows)
        }

        /// Forwards to `row_update_requires_uniqueness_check`.
        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
        }
    }
}
291
292use collection_contract_enforcement::CollectionContractWriteEnforcer;
293
294fn normalize_row_fields_for_contract_with_mode(
295 db: &crate::storage::unified::devx::RedDB,
296 collection: &str,
297 fields: Vec<(String, Value)>,
298 mode: NormalizeMode,
299) -> RedDBResult<Vec<(String, Value)>> {
300 let Some(contract) = db.collection_contract(collection) else {
301 return Ok(fields);
302 };
303
304 if contract.declared_model != crate::catalog::CollectionModel::Table
305 || (contract.declared_columns.is_empty()
306 && contract
307 .table_def
308 .as_ref()
309 .map(|table| table.columns.is_empty())
310 .unwrap_or(true))
311 {
312 return Ok(fields);
313 }
314
315 let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
325 fields
326 .iter()
327 .find(|(n, _)| n == "created_at")
328 .map(|(_, v)| v.clone())
329 } else {
330 None
331 };
332
333 if contract.timestamps_enabled && mode == NormalizeMode::Insert {
338 for (name, _) in &fields {
339 if name == "created_at" || name == "updated_at" {
340 return Err(crate::RedDBError::Query(format!(
341 "collection '{}' manages '{}' automatically — do not set it in INSERT",
342 collection, name
343 )));
344 }
345 }
346 }
347
348 let mut provided = std::collections::BTreeMap::new();
349 for (name, value) in &fields {
350 provided.insert(name.clone(), value.clone());
351 }
352
353 let resolved_columns = resolved_contract_columns(&contract)?;
354 let declared_names: std::collections::BTreeSet<String> = resolved_columns
355 .iter()
356 .map(|column| column.name.clone())
357 .collect();
358 let unknown_fields: Vec<String> = fields
359 .iter()
360 .filter(|(name, _)| !declared_names.contains(name))
361 .map(|(name, _)| name.clone())
362 .collect();
363 if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
364 && !unknown_fields.is_empty()
365 {
366 return Err(crate::RedDBError::Query(format!(
367 "collection '{}' is strict and does not allow undeclared fields: {}",
368 collection,
369 unknown_fields.join(", ")
370 )));
371 }
372 let mut normalized = Vec::new();
373 let now_ms = current_unix_ms_u64();
374
375 for column in &resolved_columns {
376 match provided.remove(&column.name) {
377 Some(value) => {
378 if contract.timestamps_enabled && mode == NormalizeMode::Update {
383 match column.name.as_str() {
384 "created_at" => {
385 normalized.push((
386 column.name.clone(),
387 existing_created_at
388 .clone()
389 .unwrap_or(Value::UnsignedInteger(now_ms)),
390 ));
391 continue;
392 }
393 "updated_at" => {
394 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
395 continue;
396 }
397 _ => {}
398 }
399 }
400 normalized.push((
401 column.name.clone(),
402 normalize_contract_value(collection, column, value)?,
403 ));
404 }
405 None => {
406 if contract.timestamps_enabled
410 && (column.name == "created_at" || column.name == "updated_at")
411 {
412 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
413 continue;
414 }
415 if let Some(default) = &column.default {
416 normalized.push((
417 column.name.clone(),
418 coerce_contract_literal(collection, &column.name, column, default)?,
419 ));
420 } else if column.not_null {
421 return Err(crate::RedDBError::Query(format!(
422 "missing required column '{}' for collection '{}'",
423 column.name, collection
424 )));
425 }
426 }
427 }
428 }
429
430 for (name, value) in fields {
431 if !declared_names.contains(&name) {
432 normalized.push((name, value));
433 }
434 }
435
436 Ok(normalized)
437}
438
/// Current wall-clock time as milliseconds since the Unix epoch, as `u64`.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_unix_ms_u64() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
445
446fn enforce_row_uniqueness(
447 db: &crate::storage::unified::devx::RedDB,
448 collection: &str,
449 fields: &[(String, Value)],
450 exclude_id: Option<crate::storage::EntityId>,
451) -> RedDBResult<()> {
452 let Some(contract) = db.collection_contract(collection) else {
453 return Ok(());
454 };
455 if contract.declared_model != crate::catalog::CollectionModel::Table
456 && contract.declared_model != crate::catalog::CollectionModel::Mixed
457 {
458 return Ok(());
459 }
460
461 let rules = resolved_uniqueness_rules(&contract);
462 if rules.is_empty() {
463 return Ok(());
464 }
465
466 let store = db.store();
467 let Some(manager) = store.get_collection(collection) else {
468 return Ok(());
469 };
470
471 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
472
473 for rule in &rules {
474 let mut expected_signatures = Vec::new();
475 let mut skip_rule = false;
476
477 for column in &rule.columns {
478 match input_fields.get(column) {
479 Some(Value::Null) | None if rule.primary_key => {
480 return Err(crate::RedDBError::Query(format!(
481 "primary key '{}' in collection '{}' requires non-null column '{}'",
482 rule.name, collection, column
483 )))
484 }
485 Some(Value::Null) | None => {
486 skip_rule = true;
487 break;
488 }
489 Some(value) => {
490 expected_signatures.push((column.clone(), value_signature(value)));
491 }
492 }
493 }
494
495 if skip_rule {
496 continue;
497 }
498
499 for entity in manager.query_all(|_| true) {
500 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
501 continue;
502 }
503 let Some(existing_fields) = row_fields_from_entity(&entity) else {
504 continue;
505 };
506
507 let duplicate = expected_signatures.iter().all(|(column, expected)| {
508 existing_fields
509 .get(column)
510 .map(|value| value_signature(value) == *expected)
511 .unwrap_or(false)
512 });
513
514 if duplicate {
515 let qualifier = if rule.primary_key {
516 "primary key"
517 } else {
518 "unique constraint"
519 };
520 return Err(crate::RedDBError::Query(format!(
521 "{} '{}' violated on collection '{}' for columns [{}]",
522 qualifier,
523 rule.name,
524 collection,
525 rule.columns.join(", ")
526 )));
527 }
528 }
529 }
530
531 Ok(())
532}
533
534fn enforce_row_batch_uniqueness(
535 db: &crate::storage::unified::devx::RedDB,
536 collection: &str,
537 rows: &[Vec<(String, Value)>],
538) -> RedDBResult<()> {
539 let Some(contract) = db.collection_contract(collection) else {
540 return Ok(());
541 };
542 if contract.declared_model != crate::catalog::CollectionModel::Table
543 && contract.declared_model != crate::catalog::CollectionModel::Mixed
544 {
545 return Ok(());
546 }
547
548 let rules = resolved_uniqueness_rules(&contract);
549 if rules.is_empty() {
550 return Ok(());
551 }
552
553 for rule in &rules {
554 let mut seen = std::collections::HashMap::<String, usize>::new();
555 for (row_index, fields) in rows.iter().enumerate() {
556 let input_fields: std::collections::BTreeMap<String, Value> =
557 fields.iter().cloned().collect();
558 let mut signatures = Vec::new();
559 let mut skip_rule = false;
560
561 for column in &rule.columns {
562 match input_fields.get(column) {
563 Some(Value::Null) | None if rule.primary_key => {
564 return Err(crate::RedDBError::Query(format!(
565 "primary key '{}' in collection '{}' requires non-null column '{}'",
566 rule.name, collection, column
567 )))
568 }
569 Some(Value::Null) | None => {
570 skip_rule = true;
571 break;
572 }
573 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
574 }
575 }
576
577 if skip_rule {
578 continue;
579 }
580
581 let signature = signatures.join("|");
582 if let Some(previous_index) = seen.insert(signature, row_index) {
583 return Err(crate::RedDBError::Query(format!(
584 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
585 rule.name,
586 collection,
587 previous_index + 1,
588 row_index + 1
589 )));
590 }
591 }
592 }
593
594 Ok(())
595}
596
597fn row_update_requires_uniqueness_check(
598 db: &crate::storage::unified::devx::RedDB,
599 collection: &str,
600 modified_columns: &[String],
601) -> bool {
602 if modified_columns.is_empty() {
603 return false;
604 }
605
606 let Some(contract) = db.collection_contract(collection) else {
607 return false;
608 };
609 if contract.declared_model != crate::catalog::CollectionModel::Table
610 && contract.declared_model != crate::catalog::CollectionModel::Mixed
611 {
612 return false;
613 }
614
615 let rules = resolved_uniqueness_rules(&contract);
616 if rules.is_empty() {
617 return false;
618 }
619
620 rules.iter().any(|rule| {
621 rule.columns.iter().any(|column| {
622 modified_columns
623 .iter()
624 .any(|modified| modified.eq_ignore_ascii_case(column))
625 })
626 })
627}
628
629pub(crate) fn build_row_update_contract_plan(
630 db: &crate::storage::unified::devx::RedDB,
631 collection: &str,
632) -> RedDBResult<Option<RowUpdateContractPlan>> {
633 let Some(contract) = db.collection_contract(collection) else {
634 return Ok(None);
635 };
636
637 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
638 && !(contract.declared_columns.is_empty()
639 && contract
640 .table_def
641 .as_ref()
642 .map(|table| table.columns.is_empty())
643 .unwrap_or(true))
644 {
645 resolved_contract_columns(&contract)?
646 .into_iter()
647 .map(|rule| {
648 (
649 rule.name.clone(),
650 RowUpdateColumnRule {
651 name: rule.name,
652 data_type: rule.data_type,
653 data_type_name: rule.data_type_name,
654 not_null: rule.not_null,
655 enum_variants: rule.enum_variants,
656 },
657 )
658 })
659 .collect()
660 } else {
661 HashMap::new()
662 };
663
664 let unique_columns = if matches!(
665 contract.declared_model,
666 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
667 ) {
668 resolved_uniqueness_rules(&contract)
669 .into_iter()
670 .flat_map(|rule| rule.columns.into_iter())
671 .map(|column| (column, ()))
672 .collect()
673 } else {
674 HashMap::new()
675 };
676
677 Ok(Some(RowUpdateContractPlan {
678 timestamps_enabled: contract.timestamps_enabled,
679 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
680 declared_rules,
681 unique_columns,
682 }))
683}
684
685pub(crate) fn normalize_row_update_assignment_with_plan(
686 collection: &str,
687 column: &str,
688 value: Value,
689 row_contract_plan: Option<&RowUpdateContractPlan>,
690) -> RedDBResult<Value> {
691 let Some(plan) = row_contract_plan else {
692 return Ok(value);
693 };
694
695 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
696 return Err(crate::RedDBError::Query(format!(
697 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
698 collection, column
699 )));
700 }
701
702 if let Some(rule) = plan.declared_rules.get(column) {
703 let rule = ResolvedColumnRule {
704 name: rule.name.clone(),
705 data_type: rule.data_type,
706 data_type_name: rule.data_type_name.clone(),
707 not_null: rule.not_null,
708 default: None,
709 enum_variants: rule.enum_variants.clone(),
710 };
711 normalize_contract_value(collection, &rule, value)
712 } else if plan.strict_schema {
713 Err(crate::RedDBError::Query(format!(
714 "collection '{}' is strict and does not allow undeclared fields: {}",
715 collection, column
716 )))
717 } else {
718 Ok(value)
719 }
720}
721
722pub(crate) fn normalize_row_update_value_for_rule(
723 collection: &str,
724 value: Value,
725 row_rule: Option<&RowUpdateColumnRule>,
726) -> RedDBResult<Value> {
727 let Some(rule) = row_rule else {
728 return Ok(value);
729 };
730
731 let rule = ResolvedColumnRule {
732 name: rule.name.clone(),
733 data_type: rule.data_type,
734 data_type_name: rule.data_type_name.clone(),
735 not_null: rule.not_null,
736 default: None,
737 enum_variants: rule.enum_variants.clone(),
738 };
739 normalize_contract_value(collection, &rule, value)
740}
741
742fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
743 if let Some(named) = row.named.as_mut() {
744 named.insert(name.to_string(), value);
745 return;
746 }
747
748 if let Some(schema) = row.schema.as_ref() {
749 if let Some(index) = schema.iter().position(|column| column == name) {
750 if let Some(slot) = row.columns.get_mut(index) {
751 *slot = value;
752 return;
753 }
754 }
755
756 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
757 for (column, current) in schema.iter().zip(row.columns.iter()) {
758 named.insert(column.clone(), current.clone());
759 }
760 named.insert(name.to_string(), value);
761 row.named = Some(named);
762 return;
763 }
764
765 let mut named = HashMap::with_capacity(1);
766 named.insert(name.to_string(), value);
767 row.named = Some(named);
768}
769
770fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
771 if let Some(named) = row.named.as_ref() {
772 named
773 .iter()
774 .map(|(key, value)| (key.clone(), value.clone()))
775 .collect()
776 } else if let Some(schema) = row.schema.as_ref() {
777 schema
778 .iter()
779 .cloned()
780 .zip(row.columns.iter().cloned())
781 .collect()
782 } else {
783 Vec::new()
784 }
785}
786
787fn apply_row_field_assignments_raw<I>(
788 row: &mut crate::storage::unified::entity::RowData,
789 field_assignments: I,
790) where
791 I: IntoIterator<Item = (String, Value)>,
792{
793 for (column, value) in field_assignments {
794 set_row_field(row, &column, value);
795 }
796}
797
798fn apply_row_field_assignments_incremental<I>(
799 collection: &str,
800 row: &mut crate::storage::unified::entity::RowData,
801 field_assignments: I,
802 row_contract_plan: Option<&RowUpdateContractPlan>,
803) -> RedDBResult<()>
804where
805 I: IntoIterator<Item = (String, Value)>,
806{
807 for (column, value) in field_assignments {
808 let value = normalize_row_update_assignment_with_plan(
809 collection,
810 &column,
811 value,
812 row_contract_plan,
813 )?;
814
815 set_row_field(row, &column, value);
816 }
817
818 Ok(())
819}
820
821fn resolved_uniqueness_rules(
822 contract: &crate::physical::CollectionContract,
823) -> Vec<UniquenessRule> {
824 let mut rules = Vec::new();
825
826 if let Some(table_def) = &contract.table_def {
827 if !table_def.primary_key.is_empty() {
828 rules.push(UniquenessRule {
829 name: "primary_key".to_string(),
830 columns: table_def.primary_key.clone(),
831 primary_key: true,
832 });
833 }
834
835 for constraint in &table_def.constraints {
836 if matches!(
837 constraint.constraint_type,
838 crate::storage::schema::ConstraintType::PrimaryKey
839 ) && !constraint.columns.is_empty()
840 {
841 rules.push(UniquenessRule {
842 name: constraint.name.clone(),
843 columns: constraint.columns.clone(),
844 primary_key: true,
845 });
846 } else if matches!(
847 constraint.constraint_type,
848 crate::storage::schema::ConstraintType::Unique
849 ) && !constraint.columns.is_empty()
850 {
851 rules.push(UniquenessRule {
852 name: constraint.name.clone(),
853 columns: constraint.columns.clone(),
854 primary_key: false,
855 });
856 }
857 }
858 } else {
859 for column in &contract.declared_columns {
860 if column.primary_key {
861 rules.push(UniquenessRule {
862 name: format!("pk_{}", column.name),
863 columns: vec![column.name.clone()],
864 primary_key: true,
865 });
866 } else if column.unique {
867 rules.push(UniquenessRule {
868 name: format!("uniq_{}", column.name),
869 columns: vec![column.name.clone()],
870 primary_key: false,
871 });
872 }
873 }
874 }
875
876 let mut dedup = std::collections::BTreeSet::new();
877 rules
878 .into_iter()
879 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
880 .collect()
881}
882
883fn row_fields_from_entity(
884 entity: &crate::storage::UnifiedEntity,
885) -> Option<std::collections::BTreeMap<String, Value>> {
886 match &entity.data {
887 crate::storage::EntityData::Row(row) => {
888 if let Some(named) = &row.named {
889 Some(
890 named
891 .iter()
892 .map(|(key, value)| (key.clone(), value.clone()))
893 .collect(),
894 )
895 } else {
896 row.schema.as_ref().map(|schema| {
897 schema
898 .iter()
899 .cloned()
900 .zip(row.columns.iter().cloned())
901 .collect()
902 })
903 }
904 }
905 _ => None,
906 }
907}
908
909fn value_signature(value: &Value) -> String {
910 format!("{value:?}")
911}
912
913fn normalize_contract_value(
914 collection: &str,
915 column: &ResolvedColumnRule,
916 value: Value,
917) -> RedDBResult<Value> {
918 if matches!(value, Value::Null) {
919 if column.not_null {
920 return Err(crate::RedDBError::Query(format!(
921 "column '{}' in collection '{}' cannot be null",
922 column.name, collection
923 )));
924 }
925 return Ok(Value::Null);
926 }
927
928 let target = column.data_type;
929 if value_matches_declared_type(&value, target) {
930 return Ok(value);
931 }
932
933 let Some(raw) = value_to_coercion_input(&value) else {
934 return Err(crate::RedDBError::Query(format!(
935 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
936 column.name, collection, column.data_type_name
937 )));
938 };
939
940 coerce_contract_literal(collection, &column.name, column, &raw)
941}
942
943fn coerce_contract_literal(
944 collection: &str,
945 column_name: &str,
946 column: &ResolvedColumnRule,
947 raw: &str,
948) -> RedDBResult<Value> {
949 let target = column.data_type;
950 match target {
951 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
952 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
953 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
954 crate::RedDBError::Query(format!(
955 "failed to coerce column '{}' in collection '{}' to '{}': {}",
956 column_name, collection, column.data_type_name, err
957 ))
958 }),
959 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
960 crate::RedDBError::Query(format!(
961 "failed to coerce column '{}' in collection '{}' to '{}': {}",
962 column_name, collection, column.data_type_name, err
963 ))
964 }),
965 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
966 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
967 column_name, collection, column.data_type_name
968 ))),
969 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
970 |err| {
971 crate::RedDBError::Query(format!(
972 "failed to coerce column '{}' in collection '{}' to '{}': {}",
973 column_name, collection, column.data_type_name, err
974 ))
975 },
976 ),
977 }
978}
979
/// A declared column reduced to the facts needed to validate a value,
/// built either from a table definition or from a contract's declared columns.
struct ResolvedColumnRule {
    /// Column name.
    name: String,
    /// Declared type that values are checked against / coerced to.
    data_type: DataType,
    /// Type name used in error messages.
    data_type_name: String,
    /// When `true`, `Null` values are rejected.
    not_null: bool,
    /// Textual default applied when the column is missing on normalization.
    default: Option<String>,
    /// Variant list passed to the schema-level coercion.
    enum_variants: Vec<String>,
}
988
989fn resolved_contract_columns(
990 contract: &crate::physical::CollectionContract,
991) -> RedDBResult<Vec<ResolvedColumnRule>> {
992 if let Some(table_def) = &contract.table_def {
993 return Ok(table_def
994 .columns
995 .iter()
996 .map(|column| ResolvedColumnRule {
997 name: column.name.clone(),
998 data_type: column.data_type,
999 data_type_name: data_type_name(column.data_type).to_string(),
1000 not_null: !column.nullable,
1001 default: column
1002 .default
1003 .as_ref()
1004 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
1005 enum_variants: column.enum_variants.clone(),
1006 })
1007 .collect());
1008 }
1009
1010 contract
1011 .declared_columns
1012 .iter()
1013 .map(|column| {
1014 let data_type = column
1015 .sql_type
1016 .as_ref()
1017 .map(crate::storage::query::resolve_sql_type_name)
1018 .transpose()
1019 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1020 .unwrap_or(parse_declared_data_type(&column.data_type)?);
1021 Ok(ResolvedColumnRule {
1022 name: column.name.clone(),
1023 data_type,
1024 data_type_name: column.data_type.clone(),
1025 not_null: column.not_null,
1026 default: column.default.clone(),
1027 enum_variants: column.enum_variants.clone(),
1028 })
1029 })
1030 .collect()
1031}
1032
1033fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1034 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1035}
1036
1037fn data_type_name(data_type: DataType) -> &'static str {
1038 match data_type {
1039 DataType::Integer => "integer",
1040 DataType::UnsignedInteger => "unsigned_integer",
1041 DataType::Float => "float",
1042 DataType::Text => "text",
1043 DataType::Blob => "blob",
1044 DataType::Boolean => "boolean",
1045 DataType::Timestamp => "timestamp",
1046 DataType::Duration => "duration",
1047 DataType::IpAddr => "ipaddr",
1048 DataType::MacAddr => "macaddr",
1049 DataType::Vector => "vector",
1050 DataType::Nullable => "nullable",
1051 DataType::Unknown => "unknown",
1052 DataType::Json => "json",
1053 DataType::Uuid => "uuid",
1054 DataType::NodeRef => "noderef",
1055 DataType::EdgeRef => "edgeref",
1056 DataType::VectorRef => "vectorref",
1057 DataType::RowRef => "rowref",
1058 DataType::Color => "color",
1059 DataType::Email => "email",
1060 DataType::Url => "url",
1061 DataType::Phone => "phone",
1062 DataType::Semver => "semver",
1063 DataType::Cidr => "cidr",
1064 DataType::Date => "date",
1065 DataType::Time => "time",
1066 DataType::Decimal => "decimal",
1067 DataType::Enum => "enum",
1068 DataType::Array => "array",
1069 DataType::TimestampMs => "timestamp_ms",
1070 DataType::Ipv4 => "ipv4",
1071 DataType::Ipv6 => "ipv6",
1072 DataType::Subnet => "subnet",
1073 DataType::Port => "port",
1074 DataType::Latitude => "latitude",
1075 DataType::Longitude => "longitude",
1076 DataType::GeoPoint => "geopoint",
1077 DataType::Country2 => "country2",
1078 DataType::Country3 => "country3",
1079 DataType::Lang2 => "lang2",
1080 DataType::Lang5 => "lang5",
1081 DataType::Currency => "currency",
1082 DataType::AssetCode => "asset_code",
1083 DataType::Money => "money",
1084 DataType::ColorAlpha => "color_alpha",
1085 DataType::BigInt => "bigint",
1086 DataType::KeyRef => "keyref",
1087 DataType::DocRef => "docref",
1088 DataType::TableRef => "tableref",
1089 DataType::PageRef => "pageref",
1090 DataType::Secret => "secret",
1091 DataType::Password => "password",
1092 DataType::TextZstd => "text",
1093 DataType::BlobZstd => "blob",
1094 }
1095}
1096
1097fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1098 matches!(
1099 (value, target),
1100 (Value::Null, _)
1101 | (Value::Integer(_), DataType::Integer)
1102 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1103 | (Value::Float(_), DataType::Float)
1104 | (Value::Text(_), DataType::Text)
1105 | (Value::Blob(_), DataType::Blob)
1106 | (Value::Boolean(_), DataType::Boolean)
1107 | (Value::Timestamp(_), DataType::Timestamp)
1108 | (Value::Duration(_), DataType::Duration)
1109 | (Value::IpAddr(_), DataType::IpAddr)
1110 | (Value::MacAddr(_), DataType::MacAddr)
1111 | (Value::Vector(_), DataType::Vector)
1112 | (Value::Json(_), DataType::Json)
1113 | (Value::Uuid(_), DataType::Uuid)
1114 | (Value::NodeRef(_), DataType::NodeRef)
1115 | (Value::EdgeRef(_), DataType::EdgeRef)
1116 | (Value::VectorRef(_, _), DataType::VectorRef)
1117 | (Value::RowRef(_, _), DataType::RowRef)
1118 | (Value::Color(_), DataType::Color)
1119 | (Value::Email(_), DataType::Email)
1120 | (Value::Url(_), DataType::Url)
1121 | (Value::Phone(_), DataType::Phone)
1122 | (Value::Semver(_), DataType::Semver)
1123 | (Value::Cidr(_, _), DataType::Cidr)
1124 | (Value::Date(_), DataType::Date)
1125 | (Value::Time(_), DataType::Time)
1126 | (Value::Decimal(_), DataType::Decimal)
1127 | (Value::EnumValue(_), DataType::Enum)
1128 | (Value::Array(_), DataType::Array)
1129 | (Value::TimestampMs(_), DataType::TimestampMs)
1130 | (Value::Ipv4(_), DataType::Ipv4)
1131 | (Value::Ipv6(_), DataType::Ipv6)
1132 | (Value::Subnet(_, _), DataType::Subnet)
1133 | (Value::Port(_), DataType::Port)
1134 | (Value::Latitude(_), DataType::Latitude)
1135 | (Value::Longitude(_), DataType::Longitude)
1136 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1137 | (Value::Country2(_), DataType::Country2)
1138 | (Value::Country3(_), DataType::Country3)
1139 | (Value::Lang2(_), DataType::Lang2)
1140 | (Value::Lang5(_), DataType::Lang5)
1141 | (Value::Currency(_), DataType::Currency)
1142 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1143 | (Value::BigInt(_), DataType::BigInt)
1144 | (Value::KeyRef(_, _), DataType::KeyRef)
1145 | (Value::DocRef(_, _), DataType::DocRef)
1146 | (Value::TableRef(_), DataType::TableRef)
1147 | (Value::PageRef(_), DataType::PageRef)
1148 | (Value::Secret(_), DataType::Secret)
1149 | (Value::Password(_), DataType::Password)
1150 )
1151}
1152
1153fn value_to_coercion_input(value: &Value) -> Option<String> {
1154 match value {
1155 Value::Null => None,
1156 Value::Integer(value) => Some(value.to_string()),
1157 Value::UnsignedInteger(value) => Some(value.to_string()),
1158 Value::Float(value) => Some(value.to_string()),
1159 Value::Text(value) => Some(value.to_string()),
1160 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1161 Value::Boolean(value) => Some(value.to_string()),
1162 Value::Timestamp(value) => Some(value.to_string()),
1163 Value::Duration(value) => Some(value.to_string()),
1164 Value::IpAddr(value) => Some(value.to_string()),
1165 Value::MacAddr(value) => Some(format!(
1166 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1167 value[0], value[1], value[2], value[3], value[4], value[5]
1168 )),
1169 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1170 Value::Email(value) => Some(value.clone()),
1171 Value::Url(value) => Some(value.clone()),
1172 Value::Phone(value) => Some(value.to_string()),
1173 Value::Semver(value) => Some(format!(
1174 "{}.{}.{}",
1175 value / 1_000_000,
1176 (value / 1_000) % 1_000,
1177 value % 1_000
1178 )),
1179 Value::Date(value) => Some(value.to_string()),
1180 Value::Time(value) => Some(value.to_string()),
1181 Value::Decimal(value) => Some(value.to_string()),
1182 Value::TimestampMs(value) => Some(value.to_string()),
1183 Value::Ipv4(value) => Some(format!(
1184 "{}.{}.{}.{}",
1185 (value >> 24) & 0xFF,
1186 (value >> 16) & 0xFF,
1187 (value >> 8) & 0xFF,
1188 value & 0xFF
1189 )),
1190 Value::Port(value) => Some(value.to_string()),
1191 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1192 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1193 Value::GeoPoint(lat, lon) => Some(format!(
1194 "{},{}",
1195 *lat as f64 / 1_000_000.0,
1196 *lon as f64 / 1_000_000.0
1197 )),
1198 Value::BigInt(value) => Some(value.to_string()),
1199 Value::TableRef(value) => Some(value.clone()),
1200 Value::PageRef(value) => Some(value.to_string()),
1201 Value::Password(value) => Some(value.clone()),
1202 _ => None,
1203 }
1204}
1205
/// Removes duplicate column names, comparing ASCII case-insensitively.
///
/// The first spelling of each column is kept and the relative order of the
/// surviving names is preserved.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(modified_columns.len());
    for candidate in modified_columns {
        let already_kept = kept
            .iter()
            .any(|seen| seen.eq_ignore_ascii_case(&candidate));
        if !already_kept {
            kept.push(candidate);
        }
    }
    kept
}
1222
1223fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1224 if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1225 return Err(crate::RedDBError::Query(
1226 "array positional document patch paths are unsupported; replace the array or full document body instead"
1227 .to_string(),
1228 ));
1229 }
1230 Ok(())
1231}
1232
1233fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1234 match fields.get("body") {
1235 Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1236 crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1237 }),
1238 Some(_) => Err(crate::RedDBError::Query(
1239 "document body field must contain JSON".to_string(),
1240 )),
1241 None => Ok(JsonValue::Object(Default::default())),
1242 }
1243}
1244
1245fn replace_document_row_body(
1246 fields: &mut HashMap<String, Value>,
1247 body: JsonValue,
1248 modified_columns: &mut Vec<String>,
1249) -> RedDBResult<()> {
1250 modified_columns.push("body".to_string());
1251
1252 let old_keys: Vec<String> = fields
1253 .keys()
1254 .filter(|key| key.as_str() != "body")
1255 .cloned()
1256 .collect();
1257 for key in old_keys {
1258 fields.remove(&key);
1259 modified_columns.push(key);
1260 }
1261
1262 let body_bytes = json_to_vec(&body).map_err(|err| {
1263 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1264 })?;
1265 fields.insert("body".to_string(), Value::Json(body_bytes));
1266
1267 if let JsonValue::Object(map) = &body {
1268 for (key, value) in map {
1269 fields.insert(key.clone(), json_to_storage_value(value)?);
1270 modified_columns.push(key.clone());
1271 }
1272 }
1273
1274 Ok(())
1275}
1276
1277impl RedDBRuntime {
    /// Applies a patch to an already-loaded `entity` and returns the resulting
    /// [`AppliedEntityMutation`].
    ///
    /// The patch has two parts: `operations`, a list of path-based operations
    /// whose first path segment selects the target (`fields`, `metadata`, and
    /// kind-specific roots), and `payload`, a JSON value whose `fields`,
    /// `metadata`, `body` (document rows only) and top-level TTL entries are
    /// merged in.
    ///
    /// This function does not call a persist/update method on the store. It
    /// may, however, begin and commit a snapshot xid and allocate a new entity
    /// id when the collection is versioned. `apply_loaded_patch_entity` in this
    /// file follows it with `persist_applied_entity_mutations` and
    /// `flush_applied_entity_mutation`.
    ///
    /// # Errors
    ///
    /// * `RedDBError::NotFound` when `collection` does not exist.
    /// * `RedDBError::Query` for empty paths, unsupported patch targets,
    ///   missing or mistyped values, writes to contract-managed timestamp
    ///   columns, reserved metadata keys, and TimeSeries/QueueMessage entities.
    /// * Whatever the TTL normalisation, JSON conversion and contract helpers
    ///   called below return.
    pub(crate) fn apply_loaded_patch_entity_core(
        &self,
        collection: String,
        mut entity: crate::storage::UnifiedEntity,
        payload: JsonValue,
        operations: Vec<PatchEntityOperation>,
    ) -> RedDBResult<AppliedEntityMutation> {
        let id = entity.id;
        let previous_xmax = entity.xmax;
        let operations = normalize_ttl_patch_operations(operations)?;
        // Row fields as they were before the patch; returned to the caller as
        // `pre_mutation_fields`.
        let pre_mutation_fields = entity_row_fields_snapshot(&entity);

        // Only Row/Node/Edge entities in a collection reported as versioned get
        // a new version; an error from `vcs_is_versioned` counts as "not
        // versioned".
        let patch_versions = matches!(
            entity.data,
            crate::storage::EntityData::Row(_)
                | crate::storage::EntityData::Node(_)
                | crate::storage::EntityData::Edge(_)
        ) && self.vcs_is_versioned(&collection).unwrap_or(false);
        // Use the current xid when there is one; otherwise begin and
        // immediately commit a fresh xid from the snapshot manager.
        let versioned_update_xid = if patch_versions {
            match self.current_xid() {
                Some(xid) => Some(xid),
                None => {
                    let snapshot_manager = self.snapshot_manager();
                    let xid = snapshot_manager.begin();
                    snapshot_manager.commit(xid);
                    Some(xid)
                }
            }
        } else {
            None
        };
        // Copy of the pre-patch entity with `xmax` set to the update xid;
        // returned as `replaced_entity`.
        let mut replaced_entity = versioned_update_xid.map(|xid| {
            let mut old = entity.clone();
            old.set_xmax(xid);
            old
        });

        let db = self.db();
        let store = db.store();
        // NOTE(review): `manager` is not used again in this function; the
        // lookup acts only as an existence check for the collection.
        let Some(manager) = store.get_collection(&collection) else {
            return Err(crate::RedDBError::NotFound(format!(
                "collection not found: {collection}"
            )));
        };

        // Accumulated results of the patch; they end up in the returned
        // `AppliedEntityMutation`. Metadata is loaded from the store lazily,
        // the first time something touches it.
        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
        let mut metadata_changed = false;
        let mut modified_columns: Vec<String> = Vec::new();
        let mut context_index_dirty = false;
        // Values captured in the `entity.data` match and applied to
        // `entity.kind` afterwards, once the mutable borrow of `data` has ended.
        let mut graph_node_type: Option<String> = None;
        let mut graph_edge_weight: Option<f32> = None;

        // Whether the collection contract has timestamps enabled; a missing
        // contract counts as disabled.
        let row_contract_timestamps = db
            .collection_contract(&collection)
            .map(|c| c.timestamps_enabled)
            .unwrap_or(false);

        match &mut entity.data {
            crate::storage::EntityData::Row(row) => {
                let is_document_collection = db
                    .collection_contract(&collection)
                    .map(|contract| {
                        contract.declared_model == crate::catalog::CollectionModel::Document
                    })
                    .unwrap_or(false);
                let mut field_ops = Vec::new();
                let mut metadata_ops = Vec::new();
                let mut document_body_ops = Vec::new();

                // Bucket each operation by its root segment, stripping the root
                // so the remaining path is relative to the target.
                for mut op in operations {
                    let Some(root) = op.path.first().map(String::as_str) else {
                        return Err(crate::RedDBError::Query(
                            "patch path cannot be empty".to_string(),
                        ));
                    };

                    match root {
                        // `body/...` is only a valid root for document collections.
                        "body" if is_document_collection => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "document body patch paths require a nested key; use payload.body for full replacement"
                                        .to_string(),
                                ));
                            }
                            op.path.remove(0);
                            reject_document_array_position_path(&op.path)?;
                            document_body_ops.push(op);
                        }
                        "fields" | "named" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'fields' requires a nested key".to_string(),
                                ));
                            }
                            // Contract-managed timestamp columns cannot be
                            // patched directly.
                            if row_contract_timestamps {
                                let leaf = op.path.get(1).map(String::as_str);
                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
                                    return Err(crate::RedDBError::Query(format!(
                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
                                        collection,
                                        leaf.unwrap_or("")
                                    )));
                                }
                            }
                            op.path.remove(0);
                            field_ops.push(op);
                        }
                        "metadata" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'metadata' requires a nested key".to_string(),
                                ));
                            }
                            op.path.remove(0);
                            metadata_ops.push(op);
                        }
                        // NOTE(review): the hint mentions `weight`, which this
                        // row branch does not accept, and omits `named/*` and
                        // `body/*` — confirm the intended wording.
                        _ => {
                            return Err(crate::RedDBError::Query(format!(
                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
                            )));
                        }
                    }
                }

                // Nested document-body operations: decode the stored body,
                // patch it as JSON, then rewrite the row from the result.
                if !document_body_ops.is_empty() {
                    context_index_dirty = true;
                    let named = row.named.get_or_insert_with(Default::default);
                    let mut body = document_body_from_named(named)?;
                    apply_patch_operations_to_json(&mut body, &document_body_ops)
                        .map_err(crate::RedDBError::Query)?;
                    replace_document_row_body(named, body, &mut modified_columns)?;
                }

                // Full body replacement from `payload.body`; runs after the
                // nested body operations above.
                if is_document_collection {
                    if let Some(body) = payload.get("body") {
                        context_index_dirty = true;
                        let named = row.named.get_or_insert_with(Default::default);
                        replace_document_row_body(named, body.clone(), &mut modified_columns)?;
                    }
                }

                if !field_ops.is_empty() {
                    context_index_dirty = true;
                    // The first remaining path segment is the column name.
                    for op in &field_ops {
                        if let Some(col) = op.path.first() {
                            modified_columns.push(col.clone());
                        }
                    }
                    let named = row.named.get_or_insert_with(Default::default);
                    apply_patch_operations_to_storage_map(named, &field_ops)?;
                }

                // `payload.fields` is merged key by key after the path-based
                // field operations.
                if let Some(fields) = payload
                    .get("fields")
                    .and_then(crate::json::Value::as_object)
                {
                    if row_contract_timestamps {
                        for key in fields.keys() {
                            if key == "created_at" || key == "updated_at" {
                                return Err(crate::RedDBError::Query(format!(
                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
                                    collection, key
                                )));
                            }
                        }
                    }
                    context_index_dirty = true;
                    let named = row.named.get_or_insert_with(Default::default);
                    for (key, value) in fields {
                        modified_columns.push(key.clone());
                        named.insert(key.clone(), json_to_storage_value(value)?);
                    }
                }

                // Metadata operations round-trip through JSON: convert, patch,
                // convert back.
                if !metadata_ops.is_empty() {
                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
                    let metadata = patch_metadata.get_or_insert_with(|| {
                        store.get_metadata(&collection, id).unwrap_or_default()
                    });
                    let mut metadata_json = metadata_to_json(metadata);
                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
                        .map_err(crate::RedDBError::Query)?;
                    *metadata = metadata_from_json(&metadata_json)?;
                    metadata_changed = true;
                }

                // Run the collection contract over the resulting row whenever a
                // column changed or the contract manages timestamps.
                if !modified_columns.is_empty() || row_contract_timestamps {
                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
                    // Prefer the named map; otherwise pair schema names with
                    // positional column values.
                    let current_fields = if let Some(named) = row.named.take() {
                        named.into_iter().collect::<Vec<_>>()
                    } else if let Some(schema) = row.schema.as_ref() {
                        schema
                            .iter()
                            .cloned()
                            .zip(row.columns.iter().cloned())
                            .collect::<Vec<_>>()
                    } else {
                        Vec::new()
                    };
                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
                    if row_contract_timestamps {
                        modified_columns.push("updated_at".to_string());
                        context_index_dirty = true;
                    }
                    if contract.requires_uniqueness_check(&modified_columns) {
                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
                    }
                    // The row always ends up in named form after this step.
                    row.named = Some(normalized_fields.into_iter().collect());
                }
            }
            crate::storage::EntityData::Node(node) => {
                let mut field_ops = Vec::new();
                let mut metadata_ops = Vec::new();
                let mut node_type_ops = Vec::new();

                for mut op in operations {
                    let Some(root) = op.path.first().map(String::as_str) else {
                        return Err(crate::RedDBError::Query(
                            "patch path cannot be empty".to_string(),
                        ));
                    };

                    match root {
                        "fields" | "properties" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'fields' requires a nested key".to_string(),
                                ));
                            }
                            op.path.remove(0);
                            field_ops.push(op);
                        }
                        "metadata" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'metadata' requires a nested key".to_string(),
                                ));
                            }
                            op.path.remove(0);
                            metadata_ops.push(op);
                        }
                        // `node_type` must be addressed exactly, with no
                        // nested segments.
                        "node_type" => {
                            if op.path.len() != 1 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'node_type' does not allow nested keys".to_string(),
                                ));
                            }
                            op.path.clear();
                            node_type_ops.push(op);
                        }
                        _ => {
                            return Err(crate::RedDBError::Query(format!(
                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
                            )));
                        }
                    }
                }

                // A value is required before the operation type is inspected,
                // so an `Unset` without a value reports the missing value.
                for op in node_type_ops {
                    context_index_dirty = true;
                    let value = op.value.ok_or_else(|| {
                        crate::RedDBError::Query("node_type operations require a value".to_string())
                    })?;

                    match op.op {
                        PatchEntityOperationType::Unset => {
                            return Err(crate::RedDBError::Query(
                                "node_type cannot be unset through patch operations".to_string(),
                            ));
                        }
                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
                            let Some(node_type) = value.as_str() else {
                                return Err(crate::RedDBError::Query(
                                    "node_type operation requires a text value".to_string(),
                                ));
                            };
                            // Written to `entity.kind` after the match; when
                            // several operations are given the last one wins.
                            graph_node_type = Some(node_type.to_string());
                            modified_columns.push("node_type".to_string());
                        }
                    }
                }

                if !field_ops.is_empty() {
                    context_index_dirty = true;
                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
                }

                if let Some(fields) = payload
                    .get("fields")
                    .and_then(crate::json::Value::as_object)
                {
                    context_index_dirty = true;
                    for (key, value) in fields {
                        node.properties
                            .insert(key.clone(), json_to_storage_value(value)?);
                    }
                }

                if !metadata_ops.is_empty() {
                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
                    let metadata = patch_metadata.get_or_insert_with(|| {
                        store.get_metadata(&collection, id).unwrap_or_default()
                    });
                    let mut metadata_json = metadata_to_json(metadata);
                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
                        .map_err(crate::RedDBError::Query)?;
                    *metadata = metadata_from_json(&metadata_json)?;
                    metadata_changed = true;
                }
            }
            crate::storage::EntityData::Edge(edge) => {
                let mut field_ops = Vec::new();
                let mut metadata_ops = Vec::new();
                let mut weight_ops = Vec::new();

                for mut op in operations {
                    let Some(root) = op.path.first().map(String::as_str) else {
                        return Err(crate::RedDBError::Query(
                            "patch path cannot be empty".to_string(),
                        ));
                    };

                    match root {
                        "fields" | "properties" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'fields' requires a nested key".to_string(),
                                ));
                            }
                            op.path.remove(0);
                            field_ops.push(op);
                        }
                        // `weight` must be addressed exactly, with no nested
                        // segments.
                        "weight" => {
                            if op.path.len() != 1 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'weight' does not allow nested keys".to_string(),
                                ));
                            }
                            op.path.clear();
                            weight_ops.push(op);
                        }
                        "metadata" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'metadata' requires a nested key".to_string(),
                                ));
                            }
                            op.path.remove(0);
                            metadata_ops.push(op);
                        }
                        _ => {
                            return Err(crate::RedDBError::Query(format!(
                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
                            )));
                        }
                    }
                }

                if !field_ops.is_empty() {
                    context_index_dirty = true;
                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
                }

                for op in weight_ops {
                    context_index_dirty = true;
                    let value = op.value.ok_or_else(|| {
                        crate::RedDBError::Query("weight operations require a value".to_string())
                    })?;

                    match op.op {
                        PatchEntityOperationType::Unset => {
                            return Err(crate::RedDBError::Query(
                                "weight cannot be unset through patch operations".to_string(),
                            ));
                        }
                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
                            let Some(weight) = value.as_f64() else {
                                return Err(crate::RedDBError::Query(
                                    "weight operation requires a numeric value".to_string(),
                                ));
                            };
                            // The f64 is narrowed to f32 for storage; the same
                            // value is mirrored into `entity.kind` after the match.
                            graph_edge_weight = Some(weight as f32);
                            edge.weight = weight as f32;
                            modified_columns.push("weight".to_string());
                        }
                    }
                }

                if let Some(fields) = payload
                    .get("fields")
                    .and_then(crate::json::Value::as_object)
                {
                    context_index_dirty = true;
                    for (key, value) in fields {
                        edge.properties
                            .insert(key.clone(), json_to_storage_value(value)?);
                    }
                }

                if !metadata_ops.is_empty() {
                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
                    let metadata = patch_metadata.get_or_insert_with(|| {
                        store.get_metadata(&collection, id).unwrap_or_default()
                    });
                    let mut metadata_json = metadata_to_json(metadata);
                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
                        .map_err(crate::RedDBError::Query)?;
                    *metadata = metadata_from_json(&metadata_json)?;
                    metadata_changed = true;
                }
            }
            crate::storage::EntityData::Vector(vector) => {
                let mut field_ops = Vec::new();
                let mut metadata_ops = Vec::new();

                for mut op in operations {
                    let Some(root) = op.path.first().map(String::as_str) else {
                        return Err(crate::RedDBError::Query(
                            "patch path cannot be empty".to_string(),
                        ));
                    };

                    match root {
                        "fields" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'fields' requires a nested key".to_string(),
                                ));
                            }
                            op.path.remove(0);
                            // Vectors expose only a fixed set of patchable fields.
                            let Some(target) = op.path.first().map(String::as_str) else {
                                return Err(crate::RedDBError::Query(
                                    "patch path requires a target under fields".to_string(),
                                ));
                            };
                            if !matches!(target, "dense" | "content" | "sparse") {
                                return Err(crate::RedDBError::Query(format!(
                                    "unsupported vector patch target '{target}'"
                                )));
                            }
                            field_ops.push(op);
                        }
                        "metadata" => {
                            if op.path.len() < 2 {
                                return Err(crate::RedDBError::Query(
                                    "patch path 'metadata' requires a nested key".to_string(),
                                ));
                            }
                            op.path.remove(0);
                            metadata_ops.push(op);
                        }
                        _ => {
                            return Err(crate::RedDBError::Query(format!(
                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
                            )));
                        }
                    }
                }

                if !field_ops.is_empty() {
                    context_index_dirty = true;
                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
                }

                // From `payload.fields` only `content` (when it is a string)
                // and `dense` are read; other keys are ignored here.
                if let Some(fields) = payload
                    .get("fields")
                    .and_then(crate::json::Value::as_object)
                {
                    context_index_dirty = true;
                    if let Some(content) =
                        fields.get("content").and_then(crate::json::Value::as_str)
                    {
                        vector.content = Some(content.to_string());
                    }
                    if let Some(dense) = fields.get("dense") {
                        // Must be an array of numbers; each is narrowed to f32.
                        vector.dense = dense
                            .as_array()
                            .ok_or_else(|| {
                                crate::RedDBError::Query(
                                    "field 'dense' must be an array".to_string(),
                                )
                            })?
                            .iter()
                            .map(|value| {
                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
                                    crate::RedDBError::Query(
                                        "field 'dense' must contain only numbers".to_string(),
                                    )
                                })
                            })
                            .collect::<Result<Vec<_>, _>>()?;
                    }
                }

                if !metadata_ops.is_empty() {
                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
                    let metadata = patch_metadata.get_or_insert_with(|| {
                        store.get_metadata(&collection, id).unwrap_or_default()
                    });
                    let mut metadata_json = metadata_to_json(metadata);
                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
                        .map_err(crate::RedDBError::Query)?;
                    *metadata = metadata_from_json(&metadata_json)?;
                    metadata_changed = true;
                }
            }
            crate::storage::EntityData::TimeSeries(_)
            | crate::storage::EntityData::QueueMessage(_) => {
                return Err(crate::RedDBError::Query(
                    "patch operations are not supported for TimeSeries or QueueMessage entities"
                        .to_string(),
                ));
            }
        }

        // Mirror graph attributes captured above into `entity.kind`.
        if let Some(node_type) = graph_node_type {
            if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
                node.node_type = node_type;
            }
        }
        if let Some(weight) = graph_edge_weight {
            if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
                // The kind stores the weight multiplied by 1000 and cast to u32.
                edge.weight = (weight * 1000.0) as u32;
            }
        }

        // `payload.metadata` is merged key by key for every entity kind.
        if let Some(metadata) = payload
            .get("metadata")
            .and_then(crate::json::Value::as_object)
        {
            let patch_metadata = patch_metadata
                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
            for (key, value) in metadata {
                ensure_non_tree_reserved_metadata_key(key)?;
                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
            }
            metadata_changed = true;
        }

        // Top-level TTL entries from the payload: a null value removes the
        // key, anything else sets it.
        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
            let patch_metadata = patch_metadata
                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
            if matches!(value, crate::storage::unified::MetadataValue::Null) {
                patch_metadata.remove(&key);
            } else {
                patch_metadata.set(key, value);
            }
            metadata_changed = true;
        }

        // Whole seconds since the Unix epoch; a clock before the epoch yields 0.
        entity.updated_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Versioned update: the patched entity gets a fresh id while keeping
        // its logical id, and becomes visible from `xid` (xmin = xid, xmax = 0).
        if let Some(xid) = versioned_update_xid {
            let logical_id = entity.logical_id();
            entity.id = store.next_entity_id();
            entity.set_logical_id(logical_id);
            entity.set_xmin(xid);
            entity.set_xmax(0);
            // `xmax` was already set to `xid` when the copy was made above.
            if let Some(old) = replaced_entity.as_mut() {
                old.set_xmax(xid);
            }
        }

        modified_columns = dedupe_modified_columns(modified_columns);

        Ok(AppliedEntityMutation {
            id: entity.id,
            collection,
            entity,
            metadata: patch_metadata,
            modified_columns,
            persist_metadata: metadata_changed,
            context_index_dirty,
            replaced_entity,
            replaced_entity_previous_xmax: previous_xmax,
            pre_mutation_fields,
        })
    }
1877
1878 pub(crate) fn apply_loaded_sql_update_row_core(
1879 &self,
1880 collection: String,
1881 mut entity: crate::storage::UnifiedEntity,
1882 static_field_assignments: &[(String, Value)],
1883 dynamic_field_assignments: Vec<(String, Value)>,
1884 static_metadata_assignments: &[(String, MetadataValue)],
1885 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1886 row_contract_plan: Option<&RowUpdateContractPlan>,
1887 row_modified_columns_template: &[String],
1888 row_touches_unique_columns: bool,
1889 ) -> RedDBResult<AppliedEntityMutation> {
1890 let id = entity.id;
1891 let previous_xmax = entity.xmax;
1892 let db = self.db();
1893 let store = db.store();
1894 let Some(_) = store.get_collection(&collection) else {
1895 return Err(crate::RedDBError::NotFound(format!(
1896 "collection not found: {collection}"
1897 )));
1898 };
1899
1900 let versioned_update_xid = match self.current_xid() {
1901 Some(xid) => Some(xid),
1902 None => {
1903 let snapshot_manager = self.snapshot_manager();
1904 let xid = snapshot_manager.begin();
1905 snapshot_manager.commit(xid);
1906 Some(xid)
1907 }
1908 };
1909 let mut replaced_entity = versioned_update_xid.map(|xid| {
1910 let mut old = entity.clone();
1911 old.set_xmax(xid);
1912 old
1913 });
1914
1915 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1916 let row_contract_timestamps = row_contract_plan
1917 .map(|plan| plan.timestamps_enabled)
1918 .unwrap_or(false);
1919 let mut metadata_changed = false;
1920 let mut modified_columns = row_modified_columns_template.to_vec();
1921 let mut context_index_dirty = !modified_columns.is_empty();
1922
1923 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1927
1928 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1929 return Err(crate::RedDBError::Query(
1930 "SQL row update fast path requires a row entity".to_string(),
1931 ));
1932 };
1933
1934 let _ = row_contract_plan;
1935 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1936 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1937
1938 for (key, value) in static_metadata_assignments
1939 .iter()
1940 .cloned()
1941 .chain(dynamic_metadata_assignments)
1942 {
1943 ensure_non_tree_reserved_metadata_key(&key)?;
1944 patch_metadata
1945 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1946 .set(key, value);
1947 metadata_changed = true;
1948 }
1949
1950 if !modified_columns.is_empty() || row_contract_timestamps {
1951 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1952 if row_contract_timestamps {
1953 context_index_dirty = true;
1954 set_row_field(
1955 row,
1956 "updated_at",
1957 Value::UnsignedInteger(current_unix_ms_u64()),
1958 );
1959 modified_columns.push("updated_at".to_string());
1960 }
1961 if row_touches_unique_columns {
1962 let current_fields = collect_row_fields(row);
1963 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1964 }
1965 }
1966
1967 entity.updated_at = std::time::SystemTime::now()
1968 .duration_since(std::time::UNIX_EPOCH)
1969 .unwrap_or_default()
1970 .as_secs();
1971
1972 if let Some(xid) = versioned_update_xid {
1973 let logical_id = entity.logical_id();
1974 entity.id = store.next_entity_id();
1975 entity.set_logical_id(logical_id);
1976 entity.set_xmin(xid);
1977 entity.set_xmax(0);
1978 if let Some(old) = replaced_entity.as_mut() {
1979 old.set_xmax(xid);
1980 }
1981 }
1982
1983 modified_columns = dedupe_modified_columns(modified_columns);
1984
1985 Ok(AppliedEntityMutation {
1986 id: entity.id,
1987 collection,
1988 entity,
1989 metadata: patch_metadata,
1990 modified_columns,
1991 persist_metadata: metadata_changed,
1992 context_index_dirty,
1993 replaced_entity,
1994 replaced_entity_previous_xmax: previous_xmax,
1995 pre_mutation_fields,
1996 })
1997 }
1998
1999 pub(crate) fn persist_applied_entity_mutations(
2000 &self,
2001 applied: &[AppliedEntityMutation],
2002 ) -> RedDBResult<()> {
2003 if applied.is_empty() {
2004 return Ok(());
2005 }
2006
2007 let store = self.db().store();
2008 let collection = &applied[0].collection;
2009 let Some(manager) = store.get_collection(collection) else {
2010 return Err(crate::RedDBError::NotFound(format!(
2011 "collection not found: {collection}"
2012 )));
2013 };
2014
2015 let mut ordinary = Vec::with_capacity(applied.len());
2016 for item in applied {
2017 if let Some(old_version) = item.replaced_entity.as_ref() {
2018 store
2019 .install_versioned_table_row_update(
2020 collection,
2021 old_version.clone(),
2022 item.entity.clone(),
2023 if item.persist_metadata {
2024 item.metadata.as_ref()
2025 } else {
2026 None
2027 },
2028 )
2029 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2030 if self.current_xid().is_some() {
2031 self.record_pending_versioned_update(
2032 crate::runtime::impl_core::current_connection_id(),
2033 collection,
2034 old_version.id,
2035 item.entity.id,
2036 old_version.xmax,
2037 item.replaced_entity_previous_xmax,
2038 );
2039 }
2040 } else {
2041 ordinary.push(item);
2042 }
2043 }
2044 if ordinary.is_empty() {
2045 return Ok(());
2046 }
2047
2048 manager
2049 .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
2050 (
2051 &item.entity,
2052 item.modified_columns.as_slice(),
2053 if item.persist_metadata {
2054 item.metadata.as_ref()
2055 } else {
2056 None
2057 },
2058 )
2059 }))
2060 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
2061
2062 let indexed_cols = self
2071 .index_store_ref()
2072 .indexed_columns_set(collection.as_str());
2073 let all_hot = !indexed_cols.is_empty()
2074 && ordinary.iter().all(|item| {
2075 !item.persist_metadata
2076 && !item
2077 .modified_columns
2078 .iter()
2079 .any(|c| indexed_cols.contains(c))
2080 })
2081 || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2082
2083 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2087 ordinary.iter().map(|item| &item.entity).collect();
2088 let persist_fn = if all_hot {
2089 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2090 } else {
2091 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2092 };
2093 persist_fn(store.as_ref(), collection, &entity_refs)
2094 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2095 }
2096
2097 pub(crate) fn flush_applied_entity_mutation(
2098 &self,
2099 applied: &AppliedEntityMutation,
2100 ) -> RedDBResult<()> {
2101 let store = self.db().store();
2102 if applied.context_index_dirty {
2103 store
2104 .context_index()
2105 .index_entity(&applied.collection, &applied.entity);
2106 }
2107 let mut changed_columns: Option<Vec<String>> = None;
2115 if !applied.pre_mutation_fields.is_empty() {
2116 let post = entity_row_fields_snapshot(&applied.entity);
2117 if !post.is_empty() {
2118 let damage = crate::application::entity::row_damage_vector(
2119 &applied.pre_mutation_fields,
2120 &post,
2121 );
2122 if !damage.is_empty() {
2123 changed_columns = Some(
2124 damage
2125 .touched_columns()
2126 .into_iter()
2127 .map(str::to_string)
2128 .collect(),
2129 );
2130 }
2131
2132 let indexed_cols: std::collections::HashSet<String> = self
2141 .index_store_ref()
2142 .list_indices(applied.collection.as_str())
2143 .into_iter()
2144 .filter_map(|idx| idx.columns.first().cloned())
2145 .collect();
2146 let modified_cols: std::collections::HashSet<String> = damage
2147 .touched_columns()
2148 .into_iter()
2149 .map(str::to_string)
2150 .collect();
2151 if let Some(old_version) = applied.replaced_entity.as_ref() {
2152 let old_index_fields: Vec<(String, Value)> = applied
2153 .pre_mutation_fields
2154 .iter()
2155 .filter(|(col, _)| indexed_cols.contains(col))
2156 .cloned()
2157 .collect();
2158 let new_index_fields: Vec<(String, Value)> = post
2159 .iter()
2160 .filter(|(col, _)| indexed_cols.contains(col))
2161 .cloned()
2162 .collect();
2163 if !old_index_fields.is_empty() {
2164 self.index_store_ref()
2165 .index_entity_delete(
2166 &applied.collection,
2167 old_version.id,
2168 &old_index_fields,
2169 )
2170 .map_err(crate::RedDBError::Internal)?;
2171 }
2172 if !new_index_fields.is_empty() {
2173 self.index_store_ref()
2174 .index_entity_insert(
2175 &applied.collection,
2176 applied.entity.id,
2177 &new_index_fields,
2178 )
2179 .map_err(crate::RedDBError::Internal)?;
2180 }
2181 } else {
2182 let decision = crate::storage::engine::hot_update::decide(
2183 &crate::storage::engine::hot_update::HotUpdateInputs {
2184 collection: applied.collection.as_str(),
2185 indexed_columns: &indexed_cols,
2186 modified_columns: &modified_cols,
2187 new_tuple_size: 0,
2191 page_free_space: usize::MAX,
2192 },
2193 );
2194 if !decision.can_hot {
2195 self.index_store_ref()
2196 .index_entity_update(
2197 &applied.collection,
2198 applied.id,
2199 &applied.pre_mutation_fields,
2200 &post,
2201 )
2202 .map_err(crate::RedDBError::Internal)?;
2203 } else {
2204 tracing::debug!(
2208 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2209 "hot_update fast-path: skipped index_entity_update"
2210 );
2211 }
2212 }
2213 }
2214 }
2215 self.cdc_emit_prebuilt_with_columns(
2216 crate::replication::cdc::ChangeOperation::Update,
2217 &applied.collection,
2218 &applied.entity,
2219 "entity",
2220 applied.metadata.as_ref(),
2221 true,
2222 changed_columns,
2223 );
2224 Ok(())
2225 }
2226
2227 pub(crate) fn apply_loaded_patch_entity(
2228 &self,
2229 collection: String,
2230 entity: crate::storage::UnifiedEntity,
2231 payload: JsonValue,
2232 operations: Vec<PatchEntityOperation>,
2233 ) -> RedDBResult<CreateEntityOutput> {
2234 let applied =
2235 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2236 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2237 self.flush_applied_entity_mutation(&applied)?;
2238 Ok(CreateEntityOutput {
2239 id: applied.id,
2240 entity: Some(applied.entity),
2241 })
2242 }
2243}
2244
2245fn ensure_non_tree_reserved_metadata_patch_paths(
2246 operations: &[PatchEntityOperation],
2247) -> RedDBResult<()> {
2248 for operation in operations {
2249 let Some(key) = operation.path.first().map(String::as_str) else {
2250 continue;
2251 };
2252 ensure_non_tree_reserved_metadata_key(key)?;
2253 }
2254 Ok(())
2255}
2256
2257fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2258 if key.starts_with(TREE_METADATA_PREFIX) {
2259 return Err(crate::RedDBError::Query(format!(
2260 "metadata key '{}' is reserved for managed trees",
2261 key
2262 )));
2263 }
2264 Ok(())
2265}
2266
2267fn ensure_non_tree_reserved_metadata_entries(
2268 metadata: &[(String, MetadataValue)],
2269) -> RedDBResult<()> {
2270 for (key, _) in metadata {
2271 ensure_non_tree_reserved_metadata_key(key)?;
2272 }
2273 Ok(())
2274}
2275
2276fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2277 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2278 return Err(crate::RedDBError::Query(format!(
2279 "edge label '{}' is reserved for managed trees",
2280 TREE_CHILD_EDGE_LABEL
2281 )));
2282 }
2283 Ok(())
2284}
2285
2286impl RedDBRuntime {
2287 pub(crate) fn create_node_unchecked(
2288 &self,
2289 input: CreateNodeInput,
2290 ) -> RedDBResult<CreateEntityOutput> {
2291 let db = self.db();
2292 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2293 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2294 let mut metadata = input.metadata;
2295 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2296 let mut builder = db.node(&input.collection, &input.label);
2297
2298 if let Some(node_type) = input.node_type {
2299 builder = builder.node_type(node_type);
2300 }
2301
2302 for (key, value) in input.properties {
2303 builder = builder.property(key, value);
2304 }
2305
2306 for (key, value) in metadata {
2307 builder = builder.metadata(key, value);
2308 }
2309
2310 for embedding in input.embeddings {
2311 if let Some(model) = embedding.model {
2312 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2313 } else {
2314 builder = builder.embedding(embedding.name, embedding.vector);
2315 }
2316 }
2317
2318 for link in input.table_links {
2319 builder = builder.link_to_table(link.key, link.table);
2320 }
2321
2322 for link in input.node_links {
2323 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2324 }
2325
2326 let id = builder.save()?;
2327 self.stamp_xmin_if_in_txn(&input.collection, id);
2330 refresh_context_index(&db, &input.collection, id)?;
2331 self.cdc_emit(
2332 crate::replication::cdc::ChangeOperation::Insert,
2333 &input.collection,
2334 id.raw(),
2335 "graph_node",
2336 );
2337 Ok(CreateEntityOutput {
2338 id,
2339 entity: db.store().get(&input.collection, id),
2340 })
2341 }
2342
2343 pub(crate) fn create_edge_unchecked(
2344 &self,
2345 input: CreateEdgeInput,
2346 ) -> RedDBResult<CreateEntityOutput> {
2347 let db = self.db();
2348 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2349 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2350 let mut metadata = input.metadata;
2351 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2352 let mut builder = db
2353 .edge(&input.collection, &input.label)
2354 .from(input.from)
2355 .to(input.to);
2356
2357 if let Some(weight) = input.weight {
2358 builder = builder.weight(weight);
2359 }
2360
2361 for (key, value) in input.properties {
2362 builder = builder.property(key, value);
2363 }
2364
2365 for (key, value) in metadata {
2366 builder = builder.metadata(key, value);
2367 }
2368
2369 let id = builder.save()?;
2370 self.stamp_xmin_if_in_txn(&input.collection, id);
2373 refresh_context_index(&db, &input.collection, id)?;
2374 self.cdc_emit(
2375 crate::replication::cdc::ChangeOperation::Insert,
2376 &input.collection,
2377 id.raw(),
2378 "graph_edge",
2379 );
2380 Ok(CreateEntityOutput {
2381 id,
2382 entity: db.store().get(&input.collection, id),
2383 })
2384 }
2385}
2386
2387fn create_rows_batch_prevalidated_columnar_with_outputs(
2388 runtime: &RedDBRuntime,
2389 collection: String,
2390 column_names: std::sync::Arc<Vec<String>>,
2391 rows: Vec<Vec<crate::storage::schema::Value>>,
2392) -> RedDBResult<Vec<CreateEntityOutput>> {
2393 use crate::storage::{
2394 unified::{EntityData, EntityKind, RowData},
2395 EntityId, UnifiedEntity,
2396 };
2397 use std::sync::Arc;
2398
2399 if rows.is_empty() {
2400 return Ok(Vec::new());
2401 }
2402 runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2403 runtime.check_batch_size(rows.len())?;
2404 runtime.check_db_size()?;
2405
2406 let db = runtime.db();
2407 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2408 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2409
2410 let store = db.store();
2411 let table_arc: Arc<str> = Arc::from(collection.as_str());
2412
2413 let indexed_cols = runtime
2414 .index_store_ref()
2415 .indexed_columns_set(collection.as_str());
2416 let has_secondary_indexes = !indexed_cols.is_empty();
2417 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2418 if has_secondary_indexes {
2419 Vec::with_capacity(rows.len())
2420 } else {
2421 Vec::new()
2422 };
2423
2424 let entities: Vec<UnifiedEntity> = rows
2425 .into_iter()
2426 .map(|values| {
2427 if has_secondary_indexes {
2428 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2429 Vec::with_capacity(indexed_cols.len());
2430 for (name, val) in column_names.iter().zip(values.iter()) {
2431 if indexed_cols.contains(name) {
2432 snap.push((name.clone(), val.clone()));
2433 }
2434 }
2435 field_snapshots.push(snap);
2436 }
2437 let mut row = RowData::new(values);
2438 row.schema = Some(Arc::clone(&column_names));
2439 UnifiedEntity::new(
2440 EntityId::new(0),
2441 EntityKind::TableRow {
2442 table: Arc::clone(&table_arc),
2443 row_id: 0,
2444 },
2445 EntityData::Row(row),
2446 )
2447 })
2448 .collect();
2449
2450 let ids = store
2451 .bulk_insert(&collection, entities)
2452 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2453
2454 if has_secondary_indexes {
2455 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2456 .iter()
2457 .zip(field_snapshots)
2458 .map(|(id, fields)| (*id, fields))
2459 .collect();
2460 runtime
2461 .index_store_ref()
2462 .index_entity_insert_batch(&collection, &index_rows)
2463 .map_err(crate::RedDBError::Internal)?;
2464 }
2465
2466 runtime.invalidate_result_cache();
2467 runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2468
2469 Ok(ids
2470 .into_iter()
2471 .map(|id| CreateEntityOutput { id, entity: None })
2472 .collect())
2473}
2474
2475impl RuntimeEntityPort for RedDBRuntime {
2476 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2477 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2478 let db = self.db();
2479 let CreateRowInput {
2480 collection,
2481 fields,
2482 metadata: input_metadata,
2483 node_links,
2484 vector_links,
2485 } = input;
2486 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2487 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2488 let mut metadata = input_metadata;
2489 apply_collection_default_ttl(&db, &collection, &mut metadata);
2490 let fields = contract.normalize_insert_fields(fields)?;
2491 contract.enforce_row_uniqueness(&fields, None)?;
2492 let engine = self.mutation_engine();
2494 let result = engine.apply(
2495 collection.clone(),
2496 vec![crate::runtime::mutation::MutationRow {
2497 fields,
2498 metadata,
2499 node_links,
2500 vector_links,
2501 }],
2502 )?;
2503 let id = result.ids[0];
2504 Ok(CreateEntityOutput {
2509 id,
2510 entity: db.store().get(&collection, id),
2511 })
2512 }
2513
2514 fn create_rows_batch(
2515 &self,
2516 input: CreateRowsBatchInput,
2517 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2518 if input.rows.is_empty() {
2519 return Ok(Vec::new());
2520 }
2521 self.check_batch_size(input.rows.len())?;
2522 self.check_db_size()?;
2523
2524 let db = self.db();
2525 let collection = input.collection;
2526 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2527 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2528
2529 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2530 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2531 for row in input.rows {
2532 if row.collection != collection {
2533 return Err(crate::RedDBError::Query(format!(
2534 "batch row collection mismatch: expected '{}', got '{}'",
2535 collection, row.collection
2536 )));
2537 }
2538
2539 let mut metadata = row.metadata;
2540 apply_collection_default_ttl(&db, &collection, &mut metadata);
2541 let fields = contract.normalize_insert_fields(row.fields)?;
2542 contract.enforce_row_uniqueness(&fields, None)?;
2543 uniqueness_rows.push(fields.clone());
2544 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2545 }
2546
2547 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2548
2549 let engine = {
2552 let e = self.mutation_engine();
2553 if input.suppress_events {
2554 e.with_suppress_events()
2555 } else {
2556 e
2557 }
2558 };
2559 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2560 .into_iter()
2561 .map(|(fields, metadata, node_links, vector_links)| {
2562 crate::runtime::mutation::MutationRow {
2563 fields,
2564 metadata,
2565 node_links,
2566 vector_links,
2567 }
2568 })
2569 .collect();
2570
2571 let result = engine
2572 .apply(collection.clone(), mutation_rows)
2573 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2574
2575 let store = db.store();
2576 Ok(result
2577 .ids
2578 .into_iter()
2579 .map(|id| CreateEntityOutput {
2580 id,
2581 entity: store.get(&collection, id),
2582 })
2583 .collect())
2584 }
2585
2586 fn create_rows_batch_prevalidated_columnar(
2587 &self,
2588 collection: String,
2589 column_names: std::sync::Arc<Vec<String>>,
2590 rows: Vec<Vec<crate::storage::schema::Value>>,
2591 ) -> RedDBResult<usize> {
2592 create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2593 .map(|outputs| outputs.len())
2594 }
2595
2596 fn create_rows_batch_columnar(
2597 &self,
2598 collection: String,
2599 column_names: std::sync::Arc<Vec<String>>,
2600 rows: Vec<Vec<crate::storage::schema::Value>>,
2601 ) -> RedDBResult<usize> {
2602 self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2603 .map(|outputs| outputs.len())
2604 }
2605
2606 fn create_rows_batch_columnar_with_outputs(
2607 &self,
2608 collection: String,
2609 column_names: std::sync::Arc<Vec<String>>,
2610 rows: Vec<Vec<crate::storage::schema::Value>>,
2611 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2612 if rows.is_empty() {
2613 return Ok(Vec::new());
2614 }
2615 self.check_batch_size(rows.len())?;
2616 self.check_db_size()?;
2617
2618 let db = self.db();
2619 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2620 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2621
2622 let needs_normalisation = match db.collection_contract(&collection) {
2632 Some(c) => {
2633 c.declared_model == crate::catalog::CollectionModel::Table
2634 && (!c.declared_columns.is_empty()
2635 || c.table_def
2636 .as_ref()
2637 .map(|t| !t.columns.is_empty())
2638 .unwrap_or(false))
2639 }
2640 None => false,
2641 };
2642 if !needs_normalisation {
2643 return create_rows_batch_prevalidated_columnar_with_outputs(
2644 self,
2645 collection,
2646 column_names,
2647 rows,
2648 );
2649 }
2650
2651 let ncols = column_names.len();
2658 let tuple_rows: Vec<CreateRowInput> = rows
2659 .into_iter()
2660 .map(|values| {
2661 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2662 Vec::with_capacity(ncols);
2663 for (name, value) in column_names.iter().zip(values) {
2664 fields.push((name.clone(), value));
2665 }
2666 CreateRowInput {
2667 collection: collection.clone(),
2668 fields,
2669 metadata: Vec::new(),
2670 node_links: Vec::new(),
2671 vector_links: Vec::new(),
2672 }
2673 })
2674 .collect();
2675 self.create_rows_batch(CreateRowsBatchInput {
2676 collection,
2677 rows: tuple_rows,
2678 suppress_events: false,
2679 })
2680 }
2681
2682 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2683 if input.rows.is_empty() {
2684 return Ok(0);
2685 }
2686 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2687 self.check_batch_size(input.rows.len())?;
2688 self.check_db_size()?;
2689
2690 let db = self.db();
2691 let collection = input.collection;
2692 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2697 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2698
2699 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2705
2706 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2707 Vec::with_capacity(input.rows.len());
2708 let mut mutation_rows = mutation_rows;
2709 for row in input.rows {
2710 if row.collection != collection {
2711 return Err(crate::RedDBError::Query(format!(
2712 "batch row collection mismatch: expected '{}', got '{}'",
2713 collection, row.collection
2714 )));
2715 }
2716 let mut metadata = row.metadata;
2717 if let Some(ttl) = default_ttl_ms {
2718 if !has_internal_ttl_metadata(&metadata) {
2719 metadata.push((
2720 "_ttl_ms".to_string(),
2721 if ttl <= i64::MAX as u64 {
2722 MetadataValue::Int(ttl as i64)
2723 } else {
2724 MetadataValue::Timestamp(ttl)
2725 },
2726 ));
2727 }
2728 }
2729 mutation_rows.push(crate::runtime::mutation::MutationRow {
2730 fields: row.fields,
2731 metadata,
2732 node_links: row.node_links,
2733 vector_links: row.vector_links,
2734 });
2735 }
2736
2737 let engine = self.mutation_engine();
2738 let result = engine
2739 .apply(collection, mutation_rows)
2740 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2741 Ok(result.ids.len())
2742 }
2743
2744 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2745 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2746 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2747 input.properties.iter().map(|(key, _)| key.as_str()),
2748 &format!("node '{}'", input.collection),
2749 )?;
2750 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2751 self.create_node_unchecked(input)
2752 }
2753
2754 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2755 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2756 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2757 input.properties.iter().map(|(key, _)| key.as_str()),
2758 &format!("edge '{}'", input.collection),
2759 )?;
2760 ensure_non_tree_structural_edge_label(&input.label)?;
2761 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2762 self.create_edge_unchecked(input)
2763 }
2764
2765 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2766 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2767 let db = self.db();
2768 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2769 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2770 ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2771 let mut metadata = input.metadata;
2772 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2773 let mut builder = db.vector(&input.collection).dense(input.dense);
2774
2775 if let Some(content) = input.content {
2776 builder = builder.content(content);
2777 }
2778
2779 for (key, value) in metadata {
2780 builder = builder.metadata(key, value);
2781 }
2782
2783 if let Some(link_row) = input.link_row {
2784 builder = builder.link_to_table(link_row);
2785 }
2786
2787 if let Some(link_node) = input.link_node {
2788 builder = builder.link_to_node(link_node);
2789 }
2790
2791 let id = builder.save()?;
2792 let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
2793 Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
2794 _ => None,
2795 };
2796 self.stamp_xmin_if_in_txn(&input.collection, id);
2799 refresh_context_index(&db, &input.collection, id)?;
2800 if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
2806 state.ensure_populated(&db.store(), &input.collection);
2810 use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
2812 fire(InjectionPoint::BeforeWalFsync);
2813 let _ = db
2814 .store()
2815 .append_vector_insert_record(&input.collection, id.raw(), &dense);
2816 fire(InjectionPoint::BeforeIndexCommit);
2817 let mut index = state.index.lock();
2818 index.insert(id, dense.clone());
2819 fire(InjectionPoint::BeforeExtentFsync);
2826 if let Some(extent) = state.extent.lock().as_mut() {
2827 let packed = index.encode_for_extent(&dense);
2828 let _ = extent.append(&packed);
2829 }
2830 }
2831 self.cdc_emit(
2832 crate::replication::cdc::ChangeOperation::Insert,
2833 &input.collection,
2834 id.raw(),
2835 "vector",
2836 );
2837 Ok(CreateEntityOutput {
2838 id,
2839 entity: db.store().get(&input.collection, id),
2840 })
2841 }
2842
2843 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2844 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2845 let db = self.db();
2846 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2847 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2848
2849 if let JsonValue::Object(ref map) = input.body {
2850 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2851 map.keys().map(String::as_str),
2852 &format!("document '{}'", input.collection),
2853 )?;
2854 }
2855
2856 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2858 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2859 })?;
2860 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2861 "body".to_string(),
2862 crate::storage::schema::Value::Json(body_bytes),
2863 )];
2864
2865 if let JsonValue::Object(ref map) = input.body {
2867 for (key, value) in map {
2868 let storage_value = json_to_storage_value(value)?;
2869 fields.push((key.clone(), storage_value));
2870 }
2871 }
2872
2873 let mut metadata = input.metadata;
2874 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2875 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2876 .iter()
2877 .map(|(key, value)| (key.as_str(), value.clone()))
2878 .collect();
2879 let mut builder = db.row(&input.collection, columns);
2880
2881 for (key, value) in metadata {
2882 builder = builder.metadata(key, value);
2883 }
2884
2885 for node in input.node_links {
2886 builder = builder.link_to_node(node);
2887 }
2888
2889 for vector in input.vector_links {
2890 builder = builder.link_to_vector(vector);
2891 }
2892
2893 let id = builder.save()?;
2894 self.stamp_xmin_if_in_txn(&input.collection, id);
2896 refresh_context_index(&db, &input.collection, id)?;
2897 self.cdc_emit(
2898 crate::replication::cdc::ChangeOperation::Insert,
2899 &input.collection,
2900 id.raw(),
2901 "document",
2902 );
2903 Ok(CreateEntityOutput {
2904 id,
2905 entity: db.store().get(&input.collection, id),
2906 })
2907 }
2908
2909 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2910 let db = self.db();
2911 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2912 let declared_model = db
2913 .collection_contract(&input.collection)
2914 .map(|contract| contract.declared_model);
2915 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2916 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2917 self.seal_vault_value(&input.collection, input.value)?
2918 } else {
2919 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2920 input.value
2921 };
2922 let fields = vec![
2923 (
2924 "key".to_string(),
2925 crate::storage::schema::Value::text(input.key),
2926 ),
2927 ("value".to_string(), value),
2928 ];
2929 let collection = input.collection;
2930 let result = self.mutation_engine().apply(
2931 collection.clone(),
2932 vec![crate::runtime::mutation::MutationRow {
2933 fields,
2934 metadata: input.metadata,
2935 node_links: Vec::new(),
2936 vector_links: Vec::new(),
2937 }],
2938 )?;
2939 let id = result.ids[0];
2940 Ok(CreateEntityOutput {
2941 id,
2942 entity: db.store().get(&collection, id),
2943 })
2944 }
2945
2946 fn create_timeseries_point(
2947 &self,
2948 input: CreateTimeSeriesPointInput,
2949 ) -> RedDBResult<CreateEntityOutput> {
2950 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2951 let db = self.db();
2952 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2953 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2954
2955 let mut fields = vec![
2956 (
2957 "metric".to_string(),
2958 crate::storage::schema::Value::text(input.metric),
2959 ),
2960 (
2961 "value".to_string(),
2962 crate::storage::schema::Value::Float(input.value),
2963 ),
2964 ];
2965
2966 if let Some(timestamp_ns) = input.timestamp_ns {
2967 fields.push((
2968 "timestamp".to_string(),
2969 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2970 ));
2971 }
2972
2973 if !input.tags.is_empty() {
2974 let tags_json = JsonValue::Object(
2975 input
2976 .tags
2977 .into_iter()
2978 .map(|(key, value)| (key, JsonValue::String(value)))
2979 .collect(),
2980 );
2981 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2982 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2983 })?;
2984 fields.push((
2985 "tags".to_string(),
2986 crate::storage::schema::Value::Json(tags_bytes),
2987 ));
2988 }
2989
2990 let collection = input.collection;
2991 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2992 self.stamp_xmin_if_in_txn(&collection, id);
2995 refresh_context_index(&db, &collection, id)?;
2996
2997 Ok(CreateEntityOutput {
2998 id,
2999 entity: db.store().get(&collection, id),
3000 })
3001 }
3002
3003 fn get_kv(
3004 &self,
3005 collection: &str,
3006 key: &str,
3007 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
3008 let db = self.db();
3009 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
3010 let store = db.store();
3011 let Some(manager) = store.get_collection(collection) else {
3012 return Ok(None);
3013 };
3014 let entities = manager.query_all(|_| true);
3015 for entity in entities {
3016 if let crate::storage::EntityData::Row(ref row) = entity.data {
3017 if let Some(ref named) = row.named {
3018 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
3019 if &**k == key {
3020 let value = named
3021 .get("value")
3022 .cloned()
3023 .unwrap_or(crate::storage::schema::Value::Null);
3024 return Ok(Some((value, entity.id)));
3025 }
3026 }
3027 }
3028 }
3029 }
3030 Ok(None)
3031 }
3032
3033 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
3034 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3035 let found = self.get_kv(collection, key)?;
3036 if let Some((_, id)) = found {
3037 let db = self.db();
3038 let store = db.store();
3039 let deleted = store
3040 .delete(collection, id)
3041 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3042 if deleted {
3043 store.context_index().remove_entity(id);
3044 }
3045 Ok(deleted)
3046 } else {
3047 Ok(false)
3048 }
3049 }
3050
3051 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
3052 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3053 let PatchEntityInput {
3054 collection,
3055 id,
3056 payload,
3057 operations,
3058 } = input;
3059 let db = self.db();
3060 let store = db.store();
3061 let Some(manager) = store.get_collection(&collection) else {
3062 return Err(crate::RedDBError::NotFound(format!(
3063 "collection not found: {collection}"
3064 )));
3065 };
3066 let Some(entity) = manager.get(id) else {
3067 return Err(crate::RedDBError::NotFound(format!(
3068 "entity not found: {}",
3069 id.raw()
3070 )));
3071 };
3072 self.apply_loaded_patch_entity(collection, entity, payload, operations)
3073 }
3074
3075 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3076 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3077 let store = self.db().store();
3078 let pre_delete_fields = store
3082 .get(&input.collection, input.id)
3083 .as_ref()
3084 .map(entity_row_fields_snapshot)
3085 .unwrap_or_default();
3086 let deleted = store
3090 .delete(&input.collection, input.id)
3091 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3092 if deleted {
3093 store.context_index().remove_entity(input.id);
3094 if !pre_delete_fields.is_empty() {
3097 self.index_store_ref()
3098 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3099 .map_err(crate::RedDBError::Internal)?;
3100 }
3101 self.cdc_emit(
3102 crate::replication::cdc::ChangeOperation::Delete,
3103 &input.collection,
3104 input.id.raw(),
3105 "entity",
3106 );
3107 }
3108 Ok(DeleteEntityOutput {
3109 deleted,
3110 id: input.id,
3111 })
3112 }
3113}