1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11 CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12 CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13 RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16 build_row_update_contract_plan, entity_row_fields_snapshot,
17 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18 RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27 sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_rid, sys_key_tenant,
28 sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
/// Batch size for applying update mutations.
/// NOTE(review): the consumer is outside this excerpt — presumably the number
/// of entities applied per chunk; confirm against the UPDATE executor.
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
/// Edge label string `"TREE_CHILD"`.
/// NOTE(review): presumably reserved for tree-structure edges and rejected on
/// user inserts; the check itself is not visible here — confirm.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Metadata key prefix `"red.tree."`.
/// NOTE(review): presumably marks engine-owned tree metadata keys — confirm
/// against the reserved-metadata validation helper.
const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// One `SET`-style assignment of an update statement in compiled form.
///
/// NOTE(review): the code that builds and evaluates this struct is outside
/// this excerpt; the per-field notes below are read off the field names and
/// types and should be confirmed against that code.
#[derive(Clone)]
struct CompiledUpdateAssignment {
    // Name of the column being assigned.
    column: String,
    // Expression that produces the assigned value.
    expr: Expr,
    // Binary operator attached to the assignment, if any
    // (presumably for compound forms such as `col += expr` — TODO confirm).
    compound_op: Option<BinOp>,
    // Static metadata key, if any (presumably set when the assignment targets
    // entity metadata rather than a field — TODO confirm).
    metadata_key: Option<&'static str>,
    // Row-contract rule for this column, when one applies.
    row_rule: Option<RowUpdateColumnRule>,
}
49
/// Compiled form of an update statement's assignments.
///
/// NOTE(review): the builder and consumer of this plan are outside this
/// excerpt; the field notes are inferred from names and types — confirm.
struct CompiledUpdatePlan {
    // Field assignments paired with an already-computed `Value`
    // (presumably those that do not depend on the target row — TODO confirm).
    static_field_assignments: Vec<(String, Value)>,
    // Metadata assignments paired with an already-computed `MetadataValue`.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    // Assignments kept in expression form
    // (presumably evaluated per target row — TODO confirm).
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    // Row update contract plan, when one exists for the target.
    row_contract_plan: Option<RowUpdateContractPlan>,
    // Names of the row columns the statement modifies.
    row_modified_columns: Vec<String>,
    // Whether any modified column is a unique column (per the field name).
    row_touches_unique_columns: bool,
}
58
/// Concrete values produced for the dynamic assignments of an update.
///
/// NOTE(review): the producer is outside this excerpt — presumably one
/// instance is materialized per target row; confirm.
#[derive(Default)]
struct MaterializedUpdateAssignments {
    // (column, value) pairs for field assignments.
    dynamic_field_assignments: Vec<(String, Value)>,
    // (key, value) pairs for metadata assignments.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
64
65impl RedDBRuntime {
66 pub fn chain_tip_for_collection(
72 &self,
73 collection: &str,
74 ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75 let store = self.inner.db.store();
76 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77 return None;
78 }
79 let mut cache = self.inner.chain_tip_cache.lock();
80 if let Some(existing) = cache.get(collection) {
81 return Some(existing.clone());
82 }
83 let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84 cache.insert(collection.to_string(), scanned.clone());
85 Some(scanned)
86 }
87
88 pub fn verify_chain_for_collection(
95 &self,
96 collection: &str,
97 ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98 let store = self.inner.db.store();
99 let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100 if !outcome.ok {
101 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102 self.inner
103 .chain_integrity_broken
104 .lock()
105 .insert(collection.to_string(), true);
106 }
107 Some(outcome)
108 }
109
110 pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114 let store = self.inner.db.store();
115 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116 return false;
117 }
118 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119 self.inner
120 .chain_integrity_broken
121 .lock()
122 .insert(collection.to_string(), false);
123 true
124 }
125
126 fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130 {
131 let cache = self.inner.chain_integrity_broken.lock();
132 if let Some(v) = cache.get(collection) {
133 return *v;
134 }
135 }
136 let store = self.inner.db.store();
137 let persisted =
138 crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139 .unwrap_or(false);
140 self.inner
141 .chain_integrity_broken
142 .lock()
143 .insert(collection.to_string(), persisted);
144 persisted
145 }
146
147 fn ensure_integrity_tombstones_loaded(&self) -> bool {
152 use std::sync::atomic::Ordering;
153 match self
154 .inner
155 .integrity_tombstones_state
156 .load(Ordering::Relaxed)
157 {
158 1 => return false,
159 2 => return true,
160 _ => {}
161 }
162 let mut guard = self.inner.integrity_tombstones.lock();
165 if self
166 .inner
167 .integrity_tombstones_state
168 .load(Ordering::Relaxed)
169 == 0
170 {
171 let ranges = crate::runtime::integrity_tombstone::load_ranges(&self.inner.db.store());
172 let present = !ranges.is_empty();
173 *guard = ranges;
174 self.inner
175 .integrity_tombstones_state
176 .store(if present { 2 } else { 1 }, Ordering::Relaxed);
177 }
178 self.inner
179 .integrity_tombstones_state
180 .load(Ordering::Relaxed)
181 == 2
182 }
183
184 pub fn record_integrity_tombstone(&self, table: &str, lo: u64, hi: u64) {
190 use std::sync::atomic::Ordering;
191 self.ensure_integrity_tombstones_loaded();
192 let mut guard = self.inner.integrity_tombstones.lock();
193 guard.push(crate::runtime::integrity_tombstone::TombstoneRange::new(
194 table.to_string(),
195 lo,
196 hi,
197 ));
198 crate::runtime::integrity_tombstone::persist_ranges(&self.inner.db.store(), &guard);
199 self.inner
200 .integrity_tombstones_state
201 .store(2, Ordering::Relaxed);
202 }
203
204 pub fn integrity_tombstone_ranges(
208 &self,
209 ) -> Vec<crate::runtime::integrity_tombstone::TombstoneRange> {
210 self.ensure_integrity_tombstones_loaded();
211 self.inner.integrity_tombstones.lock().clone()
212 }
213
214 pub fn filter_integrity_tombstoned(&self, result: &mut UnifiedResult) {
219 if !self.ensure_integrity_tombstones_loaded() {
220 return;
221 }
222 let guard = self.inner.integrity_tombstones.lock();
223 if guard.is_empty() {
224 return;
225 }
226 let before = result.records.len();
227 result.records.retain(|record| {
228 !crate::runtime::integrity_tombstone::record_tombstoned(&guard, record)
229 });
230 if result.records.len() != before {
231 result.pre_serialized_json = None;
232 }
233 }
234
235 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
249 let Some(tenant_col) = self.tenant_column(&query.table) else {
250 return Ok(None);
251 };
252 if query
254 .columns
255 .iter()
256 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
257 {
258 return Ok(None);
259 }
260
261 if let Some(dot_pos) = tenant_col.find('.') {
267 let (root, tail) = tenant_col.split_at(dot_pos);
268 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
270 }
271
272 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
273 return Err(RedDBError::Query(format!(
274 "INSERT into tenant-scoped table '{}' requires an active tenant — \
275 run SET TENANT '<id>' first or name column '{}' explicitly",
276 query.table, tenant_col
277 )));
278 };
279
280 let mut augmented = query.clone();
281 augmented.columns.push(tenant_col);
282 let lit = Value::text(tenant_id.clone());
283 for row in augmented.values.iter_mut() {
284 row.push(lit.clone());
285 }
286 for row in augmented.value_exprs.iter_mut() {
287 row.push(crate::storage::query::ast::Expr::Literal {
288 value: lit.clone(),
289 span: crate::storage::query::ast::Span::synthetic(),
290 });
291 }
292 Ok(Some(augmented))
293 }
294
295 fn inject_dotted_tenant(
305 &self,
306 query: &InsertQuery,
307 root: &str,
308 tail: &str,
309 ) -> RedDBResult<Option<InsertQuery>> {
310 let active_tenant = crate::runtime::impl_core::current_tenant();
311 let mut augmented = query.clone();
312 let root_idx = augmented
313 .columns
314 .iter()
315 .position(|c| c.eq_ignore_ascii_case(root));
316
317 if let Some(idx) = root_idx {
318 for row in augmented.values.iter_mut() {
324 let Some(slot) = row.get_mut(idx) else {
325 continue;
326 };
327 if dotted_tail_already_set(slot, tail) {
328 continue;
329 }
330 let Some(tenant_id) = &active_tenant else {
331 return Err(RedDBError::Query(format!(
332 "INSERT into tenant-scoped table '{}' requires an active tenant — \
333 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
334 query.table, root, tail
335 )));
336 };
337 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
338 }
339 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
343 if let Some(slot) = row.get_mut(idx) {
344 let new_value = augmented
345 .values
346 .get(row_idx)
347 .and_then(|v| v.get(idx))
348 .cloned()
349 .unwrap_or(Value::Null);
350 *slot = crate::storage::query::ast::Expr::Literal {
351 value: new_value,
352 span: crate::storage::query::ast::Span::synthetic(),
353 };
354 }
355 }
356 } else {
357 let Some(tenant_id) = &active_tenant else {
361 return Err(RedDBError::Query(format!(
362 "INSERT into tenant-scoped table '{}' requires an active tenant — \
363 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
364 query.table, root, tail
365 )));
366 };
367 augmented.columns.push(root.to_string());
369 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
370 for row in augmented.values.iter_mut() {
371 row.push(fresh.clone());
372 }
373 for row in augmented.value_exprs.iter_mut() {
374 row.push(crate::storage::query::ast::Expr::Literal {
375 value: fresh.clone(),
376 span: crate::storage::query::ast::Span::synthetic(),
377 });
378 }
379 }
380
381 Ok(Some(augmented))
382 }
383
384 fn delete_entities_batch(
387 &self,
388 collection: &str,
389 ids: &[EntityId],
390 ) -> RedDBResult<(u64, Vec<u64>)> {
391 if ids.is_empty() {
392 return Ok((0, vec![]));
393 }
394
395 let store = self.db().store();
396 let Some(manager) = store.get_collection(collection) else {
397 return Ok((0, vec![]));
398 };
399
400 let active_xid = self.current_xid();
401 let conn_id = crate::runtime::impl_core::current_connection_id();
402 let mut autocommit_xid = None;
403 let mut tombstoned_ids = Vec::new();
404 let mut tombstoned_entities = Vec::new();
405 let mut physical_delete_ids = Vec::new();
406 let table_row_resolver =
407 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
408 let versioned_collection = self.vcs_is_versioned(collection).unwrap_or(false);
423
424 for &id in ids {
425 let Some(mut entity) = manager.get(id) else {
426 continue;
427 };
428 let is_versioned_graph = versioned_collection
429 && matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_));
430 let is_versioned_vector =
431 versioned_collection && matches!(entity.data, EntityData::Vector(_));
432 if matches!(entity.data, EntityData::Row(_))
433 || is_versioned_graph
434 || is_versioned_vector
435 {
436 let previous_xmax = entity.xmax;
437 if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
438 if table_row_resolver.resolve_candidate(&entity).is_none() {
439 continue;
440 }
441 } else if is_versioned_vector {
442 if table_row_resolver.resolve_read_candidate(&entity).is_none() {
449 continue;
450 }
451 } else if entity.xmax != 0 {
452 continue;
453 }
454
455 let xid = match active_xid {
456 Some(xid) => xid,
457 None => match autocommit_xid {
458 Some(xid) => xid,
459 None => {
460 let mgr = self.snapshot_manager();
461 let xid = mgr.begin();
462 autocommit_xid = Some(xid);
463 xid
464 }
465 },
466 };
467 entity.set_xmax(xid);
468 if manager.update(entity.clone()).is_ok() {
469 if active_xid.is_some() {
470 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
471 }
472 tombstoned_entities.push(entity);
473 tombstoned_ids.push(id);
474 }
475 } else {
476 physical_delete_ids.push(id);
477 }
478 }
479
480 if let Some(xid) = autocommit_xid {
481 self.snapshot_manager().commit(xid);
482 }
483
484 let mut affected = tombstoned_ids.len() as u64;
485 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
486 if active_xid.is_some() {
487 store
488 .persist_entities_to_pager(collection, &tombstoned_entities)
489 .map_err(|err| RedDBError::Internal(err.to_string()))?;
490 } else {
491 store
492 .persist_entities_to_pager(collection, &tombstoned_entities)
493 .map_err(|err| RedDBError::Internal(err.to_string()))?;
494 for id in &tombstoned_ids {
495 store.context_index().remove_entity(*id);
496 let lsn = self.cdc_emit(
497 crate::replication::cdc::ChangeOperation::Delete,
498 collection,
499 id.raw(),
500 "entity",
501 );
502 lsns.push(lsn);
503 }
504 }
505
506 let deleted_ids = store
507 .delete_batch(collection, &physical_delete_ids)
508 .map_err(|err| RedDBError::Internal(err.to_string()))?;
509 affected += deleted_ids.len() as u64;
510 for id in &deleted_ids {
511 store.context_index().remove_entity(*id);
512 let lsn = self.cdc_emit(
513 crate::replication::cdc::ChangeOperation::Delete,
514 collection,
515 id.raw(),
516 "entity",
517 );
518 lsns.push(lsn);
519 }
520
521 Ok((affected, lsns))
522 }
523
524 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
527 if applied.is_empty() {
528 return Ok(Vec::new());
529 }
530
531 let store = self.db().store();
532 if applied.iter().any(|item| item.context_index_dirty) {
533 store.context_index().index_entities(
534 &applied[0].collection,
535 applied
536 .iter()
537 .filter(|item| item.context_index_dirty)
538 .map(|item| &item.entity),
539 );
540 }
541
542 for item in applied {
543 self.refresh_update_secondary_indexes(item)?;
544 }
545
546 let mut lsns = Vec::with_capacity(applied.len());
547 for item in applied {
548 let lsn = self.cdc_emit_prebuilt(
549 crate::replication::cdc::ChangeOperation::Update,
550 &item.collection,
551 &item.entity,
552 update_cdc_item_kind(self, &item.collection, &item.entity),
553 item.metadata.as_ref(),
554 false,
555 );
556 lsns.push(lsn);
557 }
558 Ok(lsns)
559 }
560
    /// Persists a chunk of applied update mutations by delegating to
    /// `persist_applied_entity_mutations`, returning its result unchanged.
    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
        self.persist_applied_entity_mutations(applied)
    }
564
565 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
566 if applied.pre_mutation_fields.is_empty() {
567 return Ok(());
568 }
569 let post = entity_row_fields_snapshot(&applied.entity);
570 if post.is_empty() {
571 return Ok(());
572 }
573
574 let indexed_cols = self
575 .index_store_ref()
576 .indexed_columns_set(&applied.collection);
577 if indexed_cols.is_empty() {
578 return Ok(());
579 }
580
581 if let Some(old_version) = applied.replaced_entity.as_ref() {
582 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
583 .pre_mutation_fields
584 .iter()
585 .filter(|(col, _)| indexed_cols.contains(col))
586 .cloned()
587 .collect();
588 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
589 .iter()
590 .filter(|(col, _)| indexed_cols.contains(col))
591 .cloned()
592 .collect();
593 if !old_index_fields.is_empty() {
594 self.index_store_ref()
595 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
596 .map_err(crate::RedDBError::Internal)?;
597 }
598 if !new_index_fields.is_empty() {
599 self.index_store_ref()
600 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
601 .map_err(crate::RedDBError::Internal)?;
602 }
603 return Ok(());
604 }
605
606 let damage =
607 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
608 if damage
609 .touched_columns()
610 .into_iter()
611 .any(|col| indexed_cols.contains(col))
612 {
613 self.index_store_ref()
614 .index_entity_update(
615 &applied.collection,
616 applied.id,
617 &applied.pre_mutation_fields,
618 &post,
619 )
620 .map_err(crate::RedDBError::Internal)?;
621 }
622 Ok(())
623 }
624
625 pub fn execute_insert(
630 &self,
631 raw_query: &str,
632 query: &InsertQuery,
633 ) -> RedDBResult<RuntimeQueryResult> {
634 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
635 crate::runtime::collection_contract::CollectionContractGate::check(
641 self,
642 &query.table,
643 crate::runtime::collection_contract::MutationKind::Insert,
644 )?;
645 let augmented_owned;
654 let query = match self.maybe_inject_tenant_column(query)? {
655 Some(new_q) => {
656 augmented_owned = new_q;
657 &augmented_owned
658 }
659 None => query,
660 };
661 self.check_insert_column_policy(query)?;
662 if let Some(ref embed_config) = query.auto_embed {
663 let provider = crate::ai::parse_provider(&embed_config.provider)?;
664 crate::runtime::ai::provider_gate::enforce(self, &provider)?;
668 if matches!(provider, crate::ai::AiProvider::Local) {
669 crate::runtime::ai::local_embedding::ensure_local_embedding_available()?;
670 let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
678 if model_name.is_empty() {
679 return Err(RedDBError::Query(
680 "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
681 the local provider does not have an implicit default model"
682 .to_string(),
683 ));
684 }
685 crate::runtime::ai::local_embedding::preflight_local_embedding(
686 &self.inner.db,
687 model_name,
688 )?;
689 }
690 }
691
692 let mut inserted_count: u64 = 0;
693 let effective_rows =
694 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
695
696 let store = self.inner.db.store();
698 let _ = store.get_or_create_collection(&query.table);
699 let declared_model = self
700 .db()
701 .collection_contract_arc(&query.table)
702 .map(|contract| contract.declared_model);
703
704 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
705 if query.returning.is_some() {
706 Some(Vec::with_capacity(effective_rows.len()))
707 } else {
708 None
709 };
710 let mut returning_result: Option<UnifiedResult> = None;
711
712 if matches!(query.entity_type, InsertEntityType::Row)
713 && !matches!(
714 declared_model,
715 Some(crate::catalog::CollectionModel::TimeSeries)
716 )
717 {
718 let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
729 let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
730 Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
731 } else {
732 None
733 };
734 let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
735
736 if chain_mode && self.is_chain_integrity_broken(&query.table) {
739 return Err(RedDBError::InvalidOperation(format!(
740 "ChainIntegrityBroken: collection '{}' is locked until \
741 POST /collections/{}/clear-integrity-flag is called by an admin",
742 query.table, query.table
743 )));
744 }
745
746 let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
750 if chain_mode {
751 let mut cache = self.inner.chain_tip_cache.lock();
752 if let Some(existing) = cache.get(&query.table) {
753 Some(existing.clone())
754 } else if let Some(scanned) =
755 crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
756 {
757 cache.insert(query.table.clone(), scanned.clone());
758 Some(scanned)
759 } else {
760 None
761 }
762 } else {
763 None
764 };
765
766 let mut rows = Vec::with_capacity(effective_rows.len());
767 for row_values in &effective_rows {
768 if row_values.len() != query.columns.len() {
769 return Err(RedDBError::Query(format!(
770 "INSERT column count ({}) does not match value count ({})",
771 query.columns.len(),
772 row_values.len()
773 )));
774 }
775 let (mut fields, mut metadata) =
776 split_insert_metadata(self, &query.columns, row_values)?;
777 if chain_mode {
778 use crate::runtime::blockchain_kind::{
779 chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
780 COL_TIMESTAMP, RESERVED_COLUMNS,
781 };
782 let supplied_height = fields
783 .iter()
784 .find(|(k, _)| k == COL_BLOCK_HEIGHT)
785 .map(|(_, v)| v.clone());
786 let supplied_prev = fields
787 .iter()
788 .find(|(k, _)| k == COL_PREV_HASH)
789 .map(|(_, v)| v.clone());
790 let supplied_ts = fields
791 .iter()
792 .find(|(k, _)| k == COL_TIMESTAMP)
793 .map(|(_, v)| v.clone());
794 let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
795 let user_supplied_any = supplied_height.is_some()
796 || supplied_prev.is_some()
797 || supplied_ts.is_some()
798 || supplied_hash;
799
800 fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
801 let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
802
803 let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
804 Some(t) => (t.hash, t.height + 1),
805 None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
806 };
807 let server_now = crate::runtime::blockchain_kind::now_ms();
808
809 let (use_prev, use_height, use_ts) = if user_supplied_any {
810 if supplied_hash {
813 return Err(chain_conflict_error(
814 tip_next_height.saturating_sub(1),
815 tip_prev_hash,
816 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
817 server_now,
818 "hash column is engine-computed and cannot be supplied",
819 ));
820 }
821 let caller_prev = match &supplied_prev {
822 Some(Value::Blob(b)) if b.len() == 32 => {
823 let mut a = [0u8; 32];
824 a.copy_from_slice(b);
825 a
826 }
827 Some(Value::Text(s)) if s.len() == 64 => {
828 let mut a = [0u8; 32];
832 let mut ok = true;
833 for (i, slot) in a.iter_mut().enumerate() {
834 let pair = &s.as_ref()[i * 2..i * 2 + 2];
835 match u8::from_str_radix(pair, 16) {
836 Ok(byte) => *slot = byte,
837 Err(_) => {
838 ok = false;
839 break;
840 }
841 }
842 }
843 if !ok {
844 return Err(chain_conflict_error(
845 tip_next_height.saturating_sub(1),
846 tip_prev_hash,
847 chain_tip_full
848 .as_ref()
849 .map(|t| t.timestamp_ms)
850 .unwrap_or(0),
851 server_now,
852 "prev_hash is not valid hex",
853 ));
854 }
855 a
856 }
857 _ => {
858 return Err(chain_conflict_error(
859 tip_next_height.saturating_sub(1),
860 tip_prev_hash,
861 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
862 server_now,
863 "prev_hash missing or not a 32-byte Blob",
864 ));
865 }
866 };
867 if caller_prev != tip_prev_hash {
868 return Err(chain_conflict_error(
869 tip_next_height.saturating_sub(1),
870 tip_prev_hash,
871 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
872 server_now,
873 "prev_hash does not match current tip",
874 ));
875 }
876 let caller_height = match &supplied_height {
877 Some(Value::UnsignedInteger(v)) => *v,
878 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
879 _ => {
880 return Err(chain_conflict_error(
881 tip_next_height.saturating_sub(1),
882 tip_prev_hash,
883 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
884 server_now,
885 "block_height missing or not an unsigned integer",
886 ));
887 }
888 };
889 if caller_height != tip_next_height {
890 return Err(chain_conflict_error(
891 tip_next_height.saturating_sub(1),
892 tip_prev_hash,
893 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
894 server_now,
895 "block_height does not match tip+1",
896 ));
897 }
898 let caller_ts = match &supplied_ts {
899 Some(Value::UnsignedInteger(v)) => *v,
900 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
901 _ => {
902 return Err(chain_conflict_error(
903 tip_next_height.saturating_sub(1),
904 tip_prev_hash,
905 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
906 server_now,
907 "timestamp missing or not an unsigned integer",
908 ));
909 }
910 };
911 let drift = (caller_ts as i128) - (server_now as i128);
912 if drift.abs() > 60_000 {
913 return Err(chain_conflict_error(
914 tip_next_height.saturating_sub(1),
915 tip_prev_hash,
916 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
917 server_now,
918 "timestamp outside ±60s of server_time",
919 ));
920 }
921 (caller_prev, caller_height, caller_ts)
922 } else {
923 (tip_prev_hash, tip_next_height, server_now)
924 };
925
926 let (reserved, new_hash) =
927 crate::runtime::blockchain_kind::make_block_reserved_fields(
928 use_prev, use_height, use_ts, &payload,
929 );
930 fields.extend(reserved);
931 chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
932 height: use_height,
933 hash: new_hash,
934 timestamp_ms: use_ts,
935 });
936 }
937 if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
949 let (pk_col, sig_col, residual) =
950 crate::runtime::signed_writes_kind::split_signature_fields(fields);
951 let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
952 let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
953 crate::runtime::signed_writes_kind::verify_row(
954 ®,
955 pk_col.as_ref().map(|c| c.bytes.as_slice()),
956 sig_col.as_ref().map(|c| c.bytes.as_slice()),
957 &payload,
958 )
959 .map_err(crate::runtime::signed_writes_kind::map_error)?;
960 fields = residual;
961 if let Some(col) = pk_col {
966 fields.push((
967 crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
968 col.raw_value,
969 ));
970 }
971 if let Some(col) = sig_col {
972 fields.push((
973 crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
974 col.raw_value,
975 ));
976 }
977 }
978 merge_with_clauses(
979 &mut metadata,
980 query.ttl_ms,
981 query.expires_at_ms,
982 &query.with_metadata,
983 );
984 if let Some(snaps) = returning_snapshots.as_mut() {
985 snaps.push(fields.clone());
986 }
987 rows.push(CreateRowInput {
988 collection: query.table.clone(),
989 fields,
990 metadata,
991 node_links: Vec::new(),
992 vector_links: Vec::new(),
993 });
994 }
995 let outputs = self.create_rows_batch(CreateRowsBatchInput {
996 collection: query.table.clone(),
997 rows,
998 suppress_events: query.suppress_events,
999 })?;
1000 inserted_count = outputs.len() as u64;
1001
1002 if chain_mode {
1006 if let Some(new_tip) = chain_tip_full.as_ref() {
1007 self.inner
1008 .chain_tip_cache
1009 .lock()
1010 .insert(query.table.clone(), new_tip.clone());
1011 }
1012 }
1013
1014 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
1022 let time_col = &spec.time_column;
1023 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
1025 for row in &effective_rows {
1026 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
1027 if *n >= 0 {
1028 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
1029 }
1030 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
1031 let _ = self.inner.db.hypertables().route(&query.table, *n);
1032 }
1033 }
1034 }
1035 }
1036
1037 if let (Some(items), Some(snaps)) =
1038 (query.returning.as_ref(), returning_snapshots.take())
1039 {
1040 let snaps = row_insert_returning_snapshots(&outputs, snaps);
1041 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
1042 }
1043 } else {
1044 let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
1051 Vec::with_capacity(effective_rows.len());
1052 let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
1053 {
1054 Vec::with_capacity(effective_rows.len())
1055 } else {
1056 Vec::new()
1057 };
1058 if matches!(
1059 query.entity_type,
1060 InsertEntityType::Node | InsertEntityType::Edge
1061 ) {
1062 enum PreparedGraphInsert {
1063 Node {
1064 fields: Vec<(String, Value)>,
1065 input: CreateNodeInput,
1066 },
1067 Edge {
1068 fields: Vec<(String, Value)>,
1069 input: CreateEdgeInput,
1070 },
1071 }
1072
1073 let mut prepared = Vec::with_capacity(effective_rows.len());
1074 for row_values in &effective_rows {
1075 if row_values.len() != query.columns.len() {
1076 return Err(RedDBError::Query(format!(
1077 "INSERT column count ({}) does not match value count ({})",
1078 query.columns.len(),
1079 row_values.len()
1080 )));
1081 }
1082
1083 match query.entity_type {
1084 InsertEntityType::Node => {
1085 let (node_values, mut metadata) =
1086 split_insert_metadata(self, &query.columns, row_values)?;
1087 merge_with_clauses(
1088 &mut metadata,
1089 query.ttl_ms,
1090 query.expires_at_ms,
1091 &query.with_metadata,
1092 );
1093 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1094 apply_collection_default_ttl_metadata(
1095 self,
1096 &query.table,
1097 &mut metadata,
1098 );
1099 let (columns, values) = pairwise_columns_values(&node_values);
1100 let label = find_column_value_string(&columns, &values, "label")?;
1101 let node_type =
1102 find_column_value_opt_string(&columns, &values, "node_type");
1103 let properties = extract_remaining_properties(
1104 &columns,
1105 &values,
1106 &["label", "node_type"],
1107 );
1108 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1109 properties.iter().map(|(key, _)| key.as_str()),
1110 &format!("node '{}'", query.table),
1111 )?;
1112 prepared.push(PreparedGraphInsert::Node {
1113 fields: node_values,
1114 input: CreateNodeInput {
1115 collection: query.table.clone(),
1116 label,
1117 node_type,
1118 properties,
1119 metadata,
1120 embeddings: Vec::new(),
1121 table_links: Vec::new(),
1122 node_links: Vec::new(),
1123 },
1124 });
1125 }
1126 InsertEntityType::Edge => {
1127 let (edge_values, mut metadata) =
1128 split_insert_metadata(self, &query.columns, row_values)?;
1129 merge_with_clauses(
1130 &mut metadata,
1131 query.ttl_ms,
1132 query.expires_at_ms,
1133 &query.with_metadata,
1134 );
1135 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1136 apply_collection_default_ttl_metadata(
1137 self,
1138 &query.table,
1139 &mut metadata,
1140 );
1141 let (columns, values) = pairwise_columns_values(&edge_values);
1142 let label = find_column_value_string(&columns, &values, "label")?;
1143 ensure_non_tree_structural_edge_label(&label)?;
1144 let from_id = resolve_edge_endpoint_any(
1145 self.inner.db.store().as_ref(),
1146 &query.table,
1147 &columns,
1148 &values,
1149 &["from_rid", "from"],
1150 )?;
1151 let to_id = resolve_edge_endpoint_any(
1152 self.inner.db.store().as_ref(),
1153 &query.table,
1154 &columns,
1155 &values,
1156 &["to_rid", "to"],
1157 )?;
1158 let weight = find_column_value_f32_opt(&columns, &values, "weight");
1159 let properties = extract_remaining_properties(
1160 &columns,
1161 &values,
1162 &["label", "from_rid", "to_rid", "from", "to", "weight"],
1163 );
1164 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1165 properties.iter().map(|(key, _)| key.as_str()),
1166 &format!("edge '{}'", query.table),
1167 )?;
1168 prepared.push(PreparedGraphInsert::Edge {
1169 fields: edge_values,
1170 input: CreateEdgeInput {
1171 collection: query.table.clone(),
1172 label,
1173 from: EntityId::new(from_id),
1174 to: EntityId::new(to_id),
1175 weight,
1176 properties,
1177 metadata,
1178 },
1179 });
1180 }
1181 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1182 }
1183 }
1184
1185 ensure_graph_insert_contract(self, &query.table)?;
1186 let mut batch = self.inner.db.batch();
1187 for item in prepared {
1188 match item {
1189 PreparedGraphInsert::Node { fields, input } => {
1190 if query.returning.is_some() {
1191 returning_field_snaps.push(fields);
1192 }
1193 let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1194 batch = batch.add_node_with_type(
1195 input.collection,
1196 input.label,
1197 node_type,
1198 input.properties.into_iter().collect(),
1199 input.metadata.into_iter().collect(),
1200 );
1201 }
1202 PreparedGraphInsert::Edge { fields, input } => {
1203 if query.returning.is_some() {
1204 returning_field_snaps.push(fields);
1205 }
1206 batch = batch.add_edge(
1207 input.collection,
1208 input.label,
1209 input.from,
1210 input.to,
1211 input.weight.unwrap_or(1.0),
1212 input.properties.into_iter().collect(),
1213 input.metadata.into_iter().collect(),
1214 );
1215 }
1216 }
1217 }
1218 let batch_result = batch
1219 .execute()
1220 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1221 let (ids, entity_kind) = match query.entity_type {
1222 InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1223 InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1224 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1225 };
1226 for id in &ids {
1227 self.stamp_xmin_if_in_txn(&query.table, *id);
1228 }
1229 if query.returning.is_some() {
1230 returning_field_snaps = graph_insert_returning_snapshots(
1231 self.inner.db.store().as_ref(),
1232 &query.table,
1233 &ids,
1234 );
1235 }
1236 self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1237 let store = self.inner.db.store();
1238 entity_outputs.extend(ids.iter().map(|id| {
1239 crate::application::entity::CreateEntityOutput {
1240 id: *id,
1241 entity: store.get(&query.table, *id),
1242 }
1243 }));
1244 inserted_count = ids.len() as u64;
1245 } else {
1246 for row_values in &effective_rows {
1247 if row_values.len() != query.columns.len() {
1248 return Err(RedDBError::Query(format!(
1249 "INSERT column count ({}) does not match value count ({})",
1250 query.columns.len(),
1251 row_values.len()
1252 )));
1253 }
1254
1255 match query.entity_type {
1256 InsertEntityType::Row => {
1257 if query.returning.is_some() {
1258 return Err(RedDBError::Query(
1259 "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1260 .to_string(),
1261 ));
1262 }
1263 let (fields, mut metadata) =
1264 split_insert_metadata(self, &query.columns, row_values)?;
1265 merge_with_clauses(
1266 &mut metadata,
1267 query.ttl_ms,
1268 query.expires_at_ms,
1269 &query.with_metadata,
1270 );
1271 self.insert_timeseries_point(&query.table, fields, metadata)?;
1272 }
1273 InsertEntityType::Node | InsertEntityType::Edge => {
1274 unreachable!("NODE and EDGE are handled by the prepared graph path")
1275 }
1276 InsertEntityType::Vector => {
1277 let (vector_values, mut metadata) =
1278 split_insert_metadata(self, &query.columns, row_values)?;
1279 merge_with_clauses(
1280 &mut metadata,
1281 query.ttl_ms,
1282 query.expires_at_ms,
1283 &query.with_metadata,
1284 );
1285 let (columns, values) = pairwise_columns_values(&vector_values);
1286 let dense = find_column_value_vec_f32_any(
1287 &columns,
1288 &values,
1289 &["dense", "embedding"],
1290 )?;
1291 merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1292 let content =
1293 find_column_value_opt_string(&columns, &values, "content");
1294 if query.returning.is_some() {
1295 returning_field_snaps.push(vector_values.clone());
1296 }
1297 let input = CreateVectorInput {
1298 collection: query.table.clone(),
1299 dense,
1300 content,
1301 metadata,
1302 link_row: None,
1303 link_node: None,
1304 };
1305 entity_outputs.push(self.create_vector(input)?);
1306 }
1307 InsertEntityType::Document => {
1308 let (document_values, mut metadata) =
1309 split_insert_metadata(self, &query.columns, row_values)?;
1310 merge_with_clauses(
1311 &mut metadata,
1312 query.ttl_ms,
1313 query.expires_at_ms,
1314 &query.with_metadata,
1315 );
1316 let (columns, values) = pairwise_columns_values(&document_values);
1317 let body = find_document_body_json(&columns, &values)?;
1318 let input = CreateDocumentInput {
1319 collection: query.table.clone(),
1320 body,
1321 metadata,
1322 node_links: Vec::new(),
1323 vector_links: Vec::new(),
1324 };
1325 let output = self.create_document(input)?;
1326 if query.returning.is_some() {
1327 let fields = output
1328 .entity
1329 .as_ref()
1330 .map(entity_row_fields_snapshot)
1331 .filter(|fields| !fields.is_empty())
1332 .unwrap_or(document_values);
1333 returning_field_snaps.push(fields);
1334 }
1335 entity_outputs.push(output);
1336 }
1337 InsertEntityType::Kv => {
1338 let (kv_values, mut metadata) =
1339 split_insert_metadata(self, &query.columns, row_values)?;
1340 merge_with_clauses(
1341 &mut metadata,
1342 query.ttl_ms,
1343 query.expires_at_ms,
1344 &query.with_metadata,
1345 );
1346 let (columns, values) = pairwise_columns_values(&kv_values);
1347 let key = find_column_value_string(&columns, &values, "key")?;
1348 let value = find_column_value(&columns, &values, "value")?;
1349 if query.returning.is_some() {
1350 returning_field_snaps.push(kv_values.clone());
1351 }
1352 let input = CreateKvInput {
1353 collection: query.table.clone(),
1354 key,
1355 value,
1356 metadata,
1357 };
1358 entity_outputs.push(self.create_kv(input)?);
1359 }
1360 }
1361
1362 inserted_count += 1;
1363 }
1364 }
1365
1366 if let Some(items) = query.returning.as_ref() {
1367 if !entity_outputs.is_empty() {
1368 returning_result = Some(build_returning_result(
1369 items,
1370 &returning_field_snaps,
1371 Some(&entity_outputs),
1372 ));
1373 }
1374 }
1375 }
1376
1377 if let Some(ref embed_config) = query.auto_embed {
1379 let store = self.inner.db.store();
1380 let provider = crate::ai::parse_provider(&embed_config.provider)?;
1381 let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1382 let api_key = if is_local_provider {
1387 String::new()
1388 } else {
1389 crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1390 };
1391 let model = embed_config.model.clone().unwrap_or_else(|| {
1392 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1393 .ok()
1394 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1395 });
1396
1397 let manager = store
1399 .get_collection(&query.table)
1400 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1401 let entities = manager.query_all(|_| true);
1402 let recent: Vec<_> = entities
1403 .into_iter()
1404 .rev()
1405 .take(effective_rows.len())
1406 .collect();
1407
1408 let entity_combos: Vec<(usize, String)> = recent
1410 .iter()
1411 .enumerate()
1412 .filter_map(|(i, entity)| {
1413 if let EntityData::Row(ref row) = entity.data {
1414 if let Some(ref named) = row.named {
1415 let texts: Vec<String> = embed_config
1416 .fields
1417 .iter()
1418 .filter_map(|field| match named.get(field) {
1419 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1420 _ => None,
1421 })
1422 .collect();
1423 if !texts.is_empty() {
1424 return Some((i, texts.join(" ")));
1425 }
1426 }
1427 }
1428 None
1429 })
1430 .collect();
1431
1432 if !entity_combos.is_empty() {
1433 let batch_texts: Vec<String> =
1435 entity_combos.iter().map(|(_, t)| t.clone()).collect();
1436
1437 let embeddings = if is_local_provider {
1447 let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1448 &self.inner.db,
1449 &model,
1450 batch_texts,
1451 )?;
1452 response.embeddings
1453 } else {
1454 let batch_client =
1455 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1456
1457 match tokio::runtime::Handle::try_current() {
1458 Ok(handle) => tokio::task::block_in_place(|| {
1459 handle.block_on(batch_client.embed_batch(
1460 &provider,
1461 &model,
1462 &api_key,
1463 batch_texts,
1464 ))
1465 }),
1466 Err(_) => {
1467 return Err(RedDBError::Query(
1468 "AUTO EMBED requires a Tokio runtime context".to_string(),
1469 ));
1470 }
1471 }
1472 .map_err(|e| RedDBError::Query(e.to_string()))?
1473 };
1474
1475 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1477 if dense.is_empty() {
1478 continue;
1479 }
1480 self.create_vector(CreateVectorInput {
1481 collection: query.table.clone(),
1482 dense,
1483 content: Some(combined.clone()),
1484 metadata: Vec::new(),
1485 link_row: None,
1486 link_node: None,
1487 })?;
1488 }
1489 }
1490 }
1491
1492 if inserted_count > 0 {
1493 self.note_table_write(&query.table);
1494 }
1495
1496 let mut result = RuntimeQueryResult::dml_result(
1497 raw_query.to_string(),
1498 inserted_count,
1499 "insert",
1500 "runtime-dml",
1501 );
1502 if let Some(returning) = returning_result {
1503 result.result = returning;
1504 }
1505 Ok(result)
1506 }
1507
1508 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1509 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1510 return Ok(());
1511 };
1512 if !auth_store.iam_authorization_enabled() {
1513 return Ok(());
1514 }
1515 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1516 return Ok(());
1517 };
1518
1519 let tenant = crate::runtime::impl_core::current_tenant();
1520 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1521 let request = crate::auth::ColumnAccessRequest {
1522 action: "insert".to_string(),
1523 schema: None,
1524 table: query.table.clone(),
1525 columns: query.columns.clone(),
1526 };
1527 let ctx = crate::auth::policies::EvalContext {
1528 principal_tenant: tenant.clone(),
1529 current_tenant: tenant,
1530 peer_ip: None,
1531 mfa_present: false,
1532 now_ms: crate::auth::now_ms(),
1533 principal_is_admin_role: role == crate::auth::Role::Admin,
1534 principal_is_platform_scoped: principal.tenant.is_none(),
1535 };
1536
1537 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1538 let table_allowed = matches!(
1539 outcome.table_decision,
1540 crate::auth::policies::Decision::Allow { .. }
1541 | crate::auth::policies::Decision::AdminBypass
1542 );
1543 if !table_allowed {
1544 return Err(RedDBError::Query(format!(
1545 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1546 outcome.table_resource.kind, outcome.table_resource.name
1547 )));
1548 }
1549 if let Some(denied) = outcome.first_denied_column() {
1550 return Err(RedDBError::Query(format!(
1551 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1552 denied.resource.kind, denied.resource.name
1553 )));
1554 }
1555
1556 Ok(())
1557 }
1558
/// Inserts a single time-series point into `collection` and returns the id
/// assigned by the store.
///
/// `fields` are the column/value pairs of the INSERT; `metadata` is the
/// entity metadata, which `apply_collection_default_ttl_metadata` may adjust
/// before anything else happens.
///
/// # Errors
/// - whatever `validate_timeseries_insert_columns`,
///   `find_timeseries_timestamp_ns` or `find_timeseries_tags` report;
/// - the mapped analytics-schema validation error when a schema exists for
///   `event_name` and the payload does not validate;
/// - `RedDBError::Query` when neither `metric` nor `event_name` is supplied;
/// - `RedDBError::Internal` when the store rejects the insert.
pub(crate) fn insert_timeseries_point(
    &self,
    collection: &str,
    fields: Vec<(String, Value)>,
    mut metadata: Vec<(String, MetadataValue)>,
) -> RedDBResult<EntityId> {
    apply_collection_default_ttl_metadata(self, collection, &mut metadata);

    let (columns, values) = pairwise_columns_values(&fields);
    validate_timeseries_insert_columns(&columns)?;

    let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
    let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
    if let Some(event_name) = event_name_opt.as_deref() {
        let store_for_schema = self.inner.db.store();
        // Validation only runs when the registry returns an entry for this
        // event name; events without one are accepted as-is.
        if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
            .is_some()
        {
            // A missing payload is validated as an empty JSON object.
            let payload_json = payload_opt.as_deref().unwrap_or("{}");
            super::analytics_schema_registry::validate(
                store_for_schema.as_ref(),
                event_name,
                payload_json,
            )
            .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
        }
    }

    // `metric` wins; `event_name` is the fallback; having neither is an error.
    let metric = match find_column_value_opt_string(&columns, &values, "metric") {
        Some(m) => m,
        None => event_name_opt.clone().ok_or_else(|| {
            RedDBError::Query(
                "timeseries INSERT requires either `metric` or `event_name`".to_string(),
            )
        })?,
    };
    // When the helper yields a string for `value` it is parsed as f64, and an
    // unparseable string becomes 1.0. Otherwise a numeric `value` column is
    // converted, and anything else (including no `value` column) is 1.0.
    let value = match find_column_value_opt_string(&columns, &values, "value") {
        Some(s) => s.parse::<f64>().unwrap_or(1.0),
        None => columns
            .iter()
            .position(|c| c.eq_ignore_ascii_case("value"))
            .and_then(|i| match &values[i] {
                Value::Float(f) => Some(*f),
                Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
                Value::UnsignedInteger(n) => Some(*n as f64),
                _ => None,
            })
            .unwrap_or(1.0),
    };
    // No timestamp from the helper means "now" according to `current_unix_ns`.
    let timestamp_ns =
        find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
    let mut tags = find_timeseries_tags(&columns, &values)?;
    // `event_name` and `payload` are mirrored into the tags, but never
    // overwrite a tag of the same name that is already present.
    if let Some(ref name) = event_name_opt {
        tags.entry("event_name".to_string())
            .or_insert_with(|| name.clone());
    }
    if let Some(ref payload) = payload_opt {
        tags.entry("payload".to_string())
            .or_insert_with(|| payload.clone());
    }

    // The entity is built with id 0; the id returned to the caller is the one
    // produced by `insert_auto` below.
    let mut entity = UnifiedEntity::new(
        EntityId::new(0),
        EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
            series: collection.to_string(),
            metric: metric.clone(),
        })),
        EntityData::TimeSeries(crate::storage::TimeSeriesData {
            metric,
            timestamp_ns,
            value,
            tags,
        }),
    );
    // Stamp xmin with the current xid if there is one; otherwise begin a
    // fresh xid on the snapshot manager and commit it immediately.
    let writer_xid = match self.current_xid() {
        Some(xid) => xid,
        None => {
            let mgr = self.snapshot_manager();
            let xid = mgr.begin();
            mgr.commit(xid);
            xid
        }
    };
    entity.set_xmin(writer_xid);

    let store = self.inner.db.store();
    let id = store
        .insert_auto(collection, entity)
        .map_err(|err| RedDBError::Internal(err.to_string()))?;

    if !metadata.is_empty() {
        // NOTE(review): the result of `set_metadata` is discarded, so a
        // failed metadata write (TTL included) goes unreported — confirm
        // this best-effort behaviour is intended.
        let _ = store.set_metadata(
            collection,
            id,
            Metadata::with_fields(metadata.into_iter().collect()),
        );
    }

    self.cdc_emit(
        crate::replication::cdc::ChangeOperation::Insert,
        collection,
        id.raw(),
        "timeseries",
    );

    Ok(id)
}
1684
1685 pub fn execute_update(
1690 &self,
1691 raw_query: &str,
1692 query: &UpdateQuery,
1693 ) -> RedDBResult<RuntimeQueryResult> {
1694 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1695 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1699 return Err(RedDBError::InvalidOperation(format!(
1700 "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1701 query.table
1702 )));
1703 }
1704 crate::runtime::collection_contract::CollectionContractGate::check(
1710 self,
1711 &query.table,
1712 crate::runtime::collection_contract::MutationKind::Update,
1713 )?;
1714 ensure_update_target_contract(self, &query.table, query.target)?;
1715 ensure_graph_identity_update_target_allowed(query)?;
1716
1717 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1723 let augmented_query: UpdateQuery;
1724 let effective_query: &UpdateQuery = if rls_gated {
1725 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1726 self,
1727 &query.table,
1728 crate::storage::query::ast::PolicyAction::Update,
1729 );
1730 let Some(policy) = rls_filter else {
1731 let mut response = RuntimeQueryResult::dml_result(
1734 raw_query.to_string(),
1735 0,
1736 "update",
1737 "runtime-dml-rls",
1738 );
1739 if let Some(items) = query.returning.clone() {
1740 response.result = build_returning_result(&items, &[], None);
1741 }
1742 return Ok(response);
1743 };
1744 let mut augmented = query.clone();
1745 augmented.filter = Some(match augmented.filter.take() {
1746 Some(existing) => {
1747 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1748 }
1749 None => policy,
1750 });
1751 augmented_query = augmented;
1752 &augmented_query
1753 } else {
1754 query
1755 };
1756
1757 if let Some(items) = effective_query.returning.clone() {
1762 let mut inner_query = effective_query.clone();
1763 inner_query.returning = None;
1764 let (mut response, touched_ids) =
1765 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1766
1767 let snapshots = if matches!(
1768 effective_query.target,
1769 UpdateTarget::Nodes | UpdateTarget::Edges
1770 ) {
1771 graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1772 } else {
1773 super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1774 .row_snapshots(&touched_ids)
1775 };
1776
1777 response.result = build_returning_result(&items, &snapshots, None);
1778 response.engine = "runtime-dml-returning";
1779 return Ok(response);
1780 }
1781
1782 self.execute_update_inner(raw_query, effective_query)
1783 }
1784
1785 fn execute_update_inner(
1787 &self,
1788 raw_query: &str,
1789 query: &UpdateQuery,
1790 ) -> RedDBResult<RuntimeQueryResult> {
1791 self.execute_update_inner_tracked(raw_query, query)
1792 .map(|(res, _)| res)
1793 }
1794
1795 fn execute_update_inner_tracked(
1796 &self,
1797 raw_query: &str,
1798 query: &UpdateQuery,
1799 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1800 let store = self.inner.db.store();
1801 let effective_filter = effective_update_filter(query);
1802 let compiled_plan = self.compile_update_plan(query)?;
1803 let needs_rmw_lock = update_needs_rmw_lock(query);
1804 let table_rmw_lock = if needs_rmw_lock {
1805 Some(
1806 self.inner
1807 .rmw_locks
1808 .lock_for(&query.table, "__table_rmw_update__"),
1809 )
1810 } else {
1811 None
1812 };
1813 let _table_rmw_guard = table_rmw_lock.as_ref().map(|lock| lock.lock());
1814 let mut touched_ids: Vec<EntityId> = Vec::new();
1815 let limit_cap = query.limit.map(|l| l as usize);
1816 let manager = store
1817 .get_collection(&query.table)
1818 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1819 let scan_limit = if query.order_by.is_empty() {
1820 limit_cap
1821 } else {
1822 None
1823 };
1824 let mut target_scan = super::dml_target_scan::DmlTargetScan::with_update_target(
1825 self,
1826 &query.table,
1827 effective_filter.as_ref(),
1828 scan_limit,
1829 query.target,
1830 );
1831 if needs_rmw_lock {
1832 target_scan = target_scan.with_live_table_rows();
1833 }
1834 let ids_to_update = target_scan.find_target_ids()?;
1835 let ids_to_update = if query.order_by.is_empty() {
1836 ids_to_update
1837 } else {
1838 ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1839 };
1840
1841 if needs_rmw_lock {
1842 return self.execute_update_inner_tracked_locked(
1843 raw_query,
1844 query,
1845 &compiled_plan,
1846 &ids_to_update,
1847 effective_filter.as_ref(),
1848 );
1849 }
1850
1851 let mut affected: u64 = 0;
1852 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1853 let mut applied_chunk = Vec::with_capacity(chunk.len());
1854 for entity in manager.get_many(chunk).into_iter().flatten() {
1855 let assignments =
1856 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1857 let applied = self.apply_materialized_update_for_entity(
1858 query.table.clone(),
1859 entity,
1860 &compiled_plan,
1861 assignments,
1862 )?;
1863 touched_ids.push(applied.id);
1864 applied_chunk.push(applied);
1865 }
1866 self.persist_update_chunk(&applied_chunk)?;
1867 affected += applied_chunk.len() as u64;
1868 let lsns = self.flush_update_chunk(&applied_chunk)?;
1869 if !query.suppress_events {
1870 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1871 }
1872 }
1873
1874 if affected > 0 {
1875 self.note_table_write(&query.table);
1876 }
1877
1878 Ok((
1879 RuntimeQueryResult::dml_result(
1880 raw_query.to_string(),
1881 affected,
1882 "update",
1883 "runtime-dml",
1884 ),
1885 touched_ids,
1886 ))
1887 }
1888
/// Applies an UPDATE to `ids_to_update` while holding one lock per target
/// row, and returns the query result together with the touched entity ids.
///
/// All row locks are acquired up front. Each entity is then re-resolved by
/// its logical id and re-checked against `effective_filter` before the
/// compiled assignments are applied. Everything that was applied is
/// persisted, flushed and (unless `query.suppress_events`) announced as a
/// single chunk.
fn execute_update_inner_tracked_locked(
    &self,
    raw_query: &str,
    query: &UpdateQuery,
    compiled_plan: &CompiledUpdatePlan,
    ids_to_update: &[EntityId],
    effective_filter: Option<&Filter>,
) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
    let store = self.inner.db.store();
    let mut touched_ids = Vec::new();
    let mut lock_entries = Vec::new();

    // Collect one lock handle per candidate; ids the store no longer returns
    // an entity for are skipped.
    for id in ids_to_update {
        let Some(candidate) = store.get(&query.table, *id) else {
            continue;
        };
        let logical_id = candidate.logical_id();
        let lock_key = format!("row:{}", logical_id.raw());
        let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
        lock_entries.push((lock_key, logical_id, rmw_lock));
    }

    // Sort by lock key (string order, so "row:10" sorts before "row:2") and
    // drop duplicate keys, then take every lock in that order — presumably
    // so concurrent statements acquire overlapping locks in the same order.
    lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
    lock_entries.dedup_by(|left, right| left.0 == right.0);
    let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();

    // NOTE(review): entities are processed in lock-key order, not in the
    // order of `ids_to_update` — confirm callers do not rely on the latter.
    let mut applied_chunk = Vec::new();
    for (_, logical_id, _) in &lock_entries {
        let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
        else {
            continue;
        };
        // Re-evaluate the filter on the entity resolved under the lock;
        // entities that no longer match are left untouched.
        if let Some(filter) = effective_filter {
            if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
                Some(self.inner.db.as_ref()),
                &entity,
                filter,
                &query.table,
                &query.table,
            ) {
                continue;
            }
        }

        let assignments =
            self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
        let applied = self.apply_materialized_update_for_entity(
            query.table.clone(),
            entity,
            compiled_plan,
            assignments,
        )?;
        touched_ids.push(applied.id);
        applied_chunk.push(applied);
    }

    let affected = applied_chunk.len() as u64;
    if !applied_chunk.is_empty() {
        self.persist_update_chunk(&applied_chunk)?;
        let lsns = self.flush_update_chunk(&applied_chunk)?;
        if !query.suppress_events {
            self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
        }
    }

    if affected > 0 {
        self.note_table_write(&query.table);
    }

    Ok((
        RuntimeQueryResult::dml_result(
            raw_query.to_string(),
            affected,
            "update",
            "runtime-dml",
        ),
        touched_ids,
    ))
}
1968
/// Pre-compiles the `SET` assignments of an UPDATE into a reusable plan.
///
/// Each assignment is classified once:
/// - *static*: not a compound assignment and the expression folds to a value
///   via `fold_expr_to_value`; stored either as a metadata assignment (when
///   `resolve_sql_ttl_metadata_key` maps the column to a key) or as a
///   normalized field assignment;
/// - *dynamic*: everything else; the expression is kept for per-entity
///   evaluation together with the declared row rule for the column, if any.
///
/// `query.ttl_ms`, `query.expires_at_ms` and `query.with_metadata` are
/// appended as static metadata assignments.
///
/// # Errors
/// `RedDBError::Query` for a compound assignment on a metadata column, for a
/// dynamic assignment to `created_at`/`updated_at` when the contract plan has
/// timestamps enabled, and for a dynamic assignment to an undeclared column
/// when the plan is strict; plus any error from the helpers it calls.
fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
    let mut static_field_assignments = Vec::new();
    let mut static_metadata_assignments = Vec::new();
    let mut dynamic_assignments = Vec::new();
    let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
    let mut row_modified_columns = Vec::new();

    for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
        // A missing entry in `compound_assignment_ops` is treated as "plain
        // assignment".
        let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
        let metadata_key = resolve_sql_ttl_metadata_key(column);
        if compound_op.is_some() && metadata_key.is_some() {
            return Err(RedDBError::Query(format!(
                "compound assignment is only supported for row fields: {column}"
            )));
        }
        // Static path: plain assignment whose expression folds to a value.
        if compound_op.is_none() {
            if let Ok(value) = fold_expr_to_value(expr.clone()) {
                if let Some(metadata_key) = metadata_key {
                    let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
                    let (canonical_key, canonical_value) =
                        canonicalize_sql_ttl_metadata(metadata_key, raw_value);
                    static_metadata_assignments
                        .push((canonical_key.to_string(), canonical_value));
                } else {
                    let value = self.resolve_crypto_sentinel(value)?;
                    static_field_assignments.push((
                        column.clone(),
                        normalize_row_update_assignment_with_plan(
                            &query.table,
                            column,
                            value,
                            row_contract_plan.as_ref(),
                        )?,
                    ));
                    row_modified_columns.push(column.clone());
                }
                continue;
            }
        }

        // Dynamic path: keep the expression and resolve the row rule now so
        // contract violations are reported before any entity is touched.
        dynamic_assignments.push(CompiledUpdateAssignment {
            column: column.clone(),
            expr: expr.clone(),
            compound_op,
            metadata_key,
            row_rule: if metadata_key.is_none() {
                if let Some(plan) = row_contract_plan.as_ref() {
                    if plan.timestamps_enabled
                        && (column == "created_at" || column == "updated_at")
                    {
                        return Err(RedDBError::Query(format!(
                            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
                            query.table, column
                        )));
                    }
                    if let Some(rule) = plan.declared_rules.get(column) {
                        Some(rule.clone())
                    } else if plan.strict_schema {
                        return Err(RedDBError::Query(format!(
                            "collection '{}' is strict and does not allow undeclared fields: {}",
                            query.table, column
                        )));
                    } else {
                        None
                    }
                } else {
                    None
                }
            } else {
                None
            },
        });
        if metadata_key.is_none() {
            row_modified_columns.push(column.clone());
        }
    }

    let row_modified_columns = dedupe_update_columns(row_modified_columns);
    // Unique-column detection compares names ASCII case-insensitively.
    let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
        row_modified_columns.iter().any(|column| {
            plan.unique_columns
                .keys()
                .any(|unique| unique.eq_ignore_ascii_case(column))
        })
    });

    // Statement-level TTL / expiry / metadata clauses are appended after the
    // per-column metadata assignments.
    if let Some(ttl_ms) = query.ttl_ms {
        static_metadata_assignments
            .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
    }
    if let Some(expires_at_ms) = query.expires_at_ms {
        static_metadata_assignments.push((
            "_expires_at".to_string(),
            metadata_u64_to_value(expires_at_ms),
        ));
    }
    for (key, val) in &query.with_metadata {
        static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
    }

    Ok(CompiledUpdatePlan {
        static_field_assignments,
        static_metadata_assignments,
        dynamic_assignments,
        row_contract_plan,
        row_modified_columns,
        row_touches_unique_columns,
    })
}
2078
2079 fn materialize_update_assignments_for_entity(
2080 &self,
2081 query: &UpdateQuery,
2082 entity: &UnifiedEntity,
2083 compiled_plan: &CompiledUpdatePlan,
2084 ) -> RedDBResult<MaterializedUpdateAssignments> {
2085 let mut assignments = MaterializedUpdateAssignments::default();
2086 let mut record: Option<UnifiedRecord> = None;
2087
2088 for assignment in &compiled_plan.dynamic_assignments {
2089 if assignment.compound_op.is_some()
2090 && !matches!(
2091 entity.data,
2092 EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
2093 )
2094 {
2095 return Err(RedDBError::Query(format!(
2096 "compound assignment is only supported for row or graph UPDATE column '{}'",
2097 assignment.column
2098 )));
2099 }
2100 if record.is_none() {
2101 record = runtime_any_record_from_entity_ref(entity);
2102 }
2103 let Some(record) = record.as_ref() else {
2104 return Err(RedDBError::Query(format!(
2105 "UPDATE could not materialize runtime record for entity {} in '{}'",
2106 entity.id.raw(),
2107 query.table
2108 )));
2109 };
2110 let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
2111 Some(self.inner.db.as_ref()),
2112 &assignment.expr,
2113 record,
2114 Some(query.table.as_str()),
2115 Some(query.table.as_str()),
2116 )
2117 .ok_or_else(|| {
2118 RedDBError::Query(format!(
2119 "failed to evaluate UPDATE expression for column '{}'",
2120 assignment.column
2121 ))
2122 })?;
2123 let value = if let Some(op) = assignment.compound_op {
2124 evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
2125 } else {
2126 rhs
2127 };
2128
2129 if let Some(metadata_key) = assignment.metadata_key {
2130 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
2131 let (canonical_key, canonical_value) =
2132 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2133 assignments
2134 .dynamic_metadata_assignments
2135 .push((canonical_key.to_string(), canonical_value));
2136 } else {
2137 assignments.dynamic_field_assignments.push((
2138 assignment.column.clone(),
2139 normalize_row_update_value_for_rule(
2140 &query.table,
2141 self.resolve_crypto_sentinel(value)?,
2142 assignment.row_rule.as_ref(),
2143 )?,
2144 ));
2145 }
2146 }
2147
2148 Ok(assignments)
2149 }
2150
2151 fn apply_materialized_update_for_entity(
2152 &self,
2153 collection: String,
2154 entity: UnifiedEntity,
2155 compiled_plan: &CompiledUpdatePlan,
2156 assignments: MaterializedUpdateAssignments,
2157 ) -> RedDBResult<AppliedEntityMutation> {
2158 if matches!(entity.data, EntityData::Row(_)) {
2159 return self.apply_loaded_sql_update_row_core(
2160 collection,
2161 entity,
2162 &compiled_plan.static_field_assignments,
2163 assignments.dynamic_field_assignments,
2164 &compiled_plan.static_metadata_assignments,
2165 assignments.dynamic_metadata_assignments,
2166 compiled_plan.row_contract_plan.as_ref(),
2167 &compiled_plan.row_modified_columns,
2168 compiled_plan.row_touches_unique_columns,
2169 );
2170 }
2171
2172 ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2173
2174 let operations = build_patch_operations_from_materialized_assignments(
2175 &entity,
2176 compiled_plan,
2177 assignments,
2178 );
2179 self.apply_loaded_patch_entity_core(
2180 collection,
2181 entity,
2182 crate::json::Value::Null,
2183 operations,
2184 )
2185 }
2186
/// Executes a `DELETE` statement against `query.table`.
///
/// Checks run in this order: write gate, blockchain immutability, collection
/// contract. A `RETURNING` clause is served by capturing the rows with a
/// rewritten SELECT before deleting. When RLS is enabled for the table, the
/// delete policy filter is AND-ed into the statement filter; if no policy
/// filter is available the statement reports zero affected rows.
///
/// # Errors
/// `RedDBError::InvalidOperation` for blockchain collections,
/// `RedDBError::Query` when a `RETURNING` statement cannot be rewritten into
/// a SELECT, plus any error from the gates, the pre-image query or the
/// delete itself.
pub fn execute_delete(
    &self,
    raw_query: &str,
    query: &DeleteQuery,
) -> RedDBResult<RuntimeQueryResult> {
    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
    if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
        return Err(RedDBError::InvalidOperation(format!(
            "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
            query.table
        )));
    }
    crate::runtime::collection_contract::CollectionContractGate::check(
        self,
        &query.table,
        crate::runtime::collection_contract::MutationKind::Delete,
    )?;

    if let Some(items) = query.returning.clone() {
        // Capture the rows through a SELECT derived from the raw statement
        // before anything is deleted.
        let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
            RedDBError::Query(
                "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
            )
        })?;
        let captured = self.execute_query(&select_sql)?;

        // Re-enter this function without RETURNING so the gates and the RLS
        // handling below apply to the actual delete.
        // NOTE(review): the inner result is discarded and `affected` below is
        // the number of captured rows; if the SELECT and the DELETE can see
        // different row sets (e.g. differing RLS policies, concurrent
        // writers) the reported rows may not match what was deleted —
        // confirm.
        let mut inner_query = query.clone();
        inner_query.returning = None;
        let _ = self.execute_delete(raw_query, &inner_query)?;

        let snapshots: Vec<Vec<(String, Value)>> = captured
            .result
            .records
            .iter()
            .map(|rec| {
                rec.iter_fields()
                    .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
                    .collect()
            })
            .collect();
        let affected = snapshots.len() as u64;
        let result = build_returning_result(&items, &snapshots, None);

        let mut response = RuntimeQueryResult::dml_result(
            raw_query.to_string(),
            affected,
            "delete",
            "runtime-dml-returning",
        );
        response.result = result;
        return Ok(response);
    }
    if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
        let rls_filter = crate::runtime::impl_core::rls_policy_filter(
            self,
            &query.table,
            crate::storage::query::ast::PolicyAction::Delete,
        );
        // RLS is on but yields no filter: nothing may be deleted.
        let Some(policy) = rls_filter else {
            return Ok(RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                0,
                "delete",
                "runtime-dml-rls",
            ));
        };
        // Combine the statement's own filter (if any) with the policy.
        let mut augmented = query.clone();
        augmented.filter = Some(match augmented.filter.take() {
            Some(existing) => {
                crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
            }
            None => policy,
        });
        return self.execute_delete_inner(raw_query, &augmented);
    }
    self.execute_delete_inner(raw_query, query)
}
2286
2287 fn execute_delete_inner(
2288 &self,
2289 raw_query: &str,
2290 query: &DeleteQuery,
2291 ) -> RedDBResult<RuntimeQueryResult> {
2292 let effective_filter = effective_delete_filter(query);
2293
2294 let scan = super::dml_target_scan::DmlTargetScan::new(
2298 self,
2299 &query.table,
2300 effective_filter.as_ref(),
2301 None,
2302 );
2303 let ids_to_delete = scan.find_target_ids()?;
2304
2305 let needs_delete_events =
2308 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2309 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2310 scan.row_json_pre_images(&ids_to_delete)
2311 } else {
2312 HashMap::new()
2313 };
2314
2315 let mut affected: u64 = 0;
2316 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2317 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2318 affected += count;
2319 if needs_delete_events && !lsns.is_empty() {
2320 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2324 self.emit_delete_events_for_collection(
2325 &query.table,
2326 deleted_chunk,
2327 &lsns,
2328 &pre_images,
2329 )?;
2330 }
2331 }
2332 pre_images.clear();
2333
2334 if affected > 0 {
2335 self.note_table_write(&query.table);
2336 }
2337
2338 Ok(RuntimeQueryResult::dml_result(
2339 raw_query.to_string(),
2340 affected,
2341 "delete",
2342 "runtime-dml",
2343 ))
2344 }
2345}
2346
2347fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2353 if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2354 return Ok(());
2355 }
2356 for (column, _) in &query.assignment_exprs {
2357 if is_immutable_graph_identity_field(column) {
2358 return Err(RedDBError::Query(format!(
2359 "immutable graph field '{column}' cannot be updated"
2360 )));
2361 }
2362 }
2363 Ok(())
2364}
2365
2366fn ensure_graph_identity_update_allowed(
2367 entity: &UnifiedEntity,
2368 compiled_plan: &CompiledUpdatePlan,
2369 assignments: &MaterializedUpdateAssignments,
2370) -> RedDBResult<()> {
2371 if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2372 return Ok(());
2373 }
2374
2375 for (column, _) in compiled_plan
2376 .static_field_assignments
2377 .iter()
2378 .chain(assignments.dynamic_field_assignments.iter())
2379 {
2380 if is_immutable_graph_identity_field(column) {
2381 return Err(RedDBError::Query(format!(
2382 "immutable graph field '{column}' cannot be updated"
2383 )));
2384 }
2385 }
2386
2387 Ok(())
2388}
2389
/// Returns `true` when `column` names one of the graph identity fields that
/// updates may not modify. The comparison ignores ASCII case.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_FIELDS: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];

    for reserved in IMMUTABLE_FIELDS {
        if column.eq_ignore_ascii_case(reserved) {
            return true;
        }
    }
    false
}
2395
2396fn build_patch_operations_from_materialized_assignments(
2397 entity: &UnifiedEntity,
2398 compiled_plan: &CompiledUpdatePlan,
2399 assignments: MaterializedUpdateAssignments,
2400) -> Vec<PatchEntityOperation> {
2401 let mut operations = Vec::with_capacity(
2402 compiled_plan.static_field_assignments.len()
2403 + compiled_plan.static_metadata_assignments.len()
2404 + assignments.dynamic_field_assignments.len()
2405 + assignments.dynamic_metadata_assignments.len(),
2406 );
2407
2408 for (column, value) in &compiled_plan.static_field_assignments {
2409 operations.push(PatchEntityOperation {
2410 op: PatchEntityOperationType::Set,
2411 path: update_patch_path_for_entity(entity, column),
2412 value: Some(storage_value_to_json(value)),
2413 });
2414 }
2415
2416 for (column, value) in assignments.dynamic_field_assignments {
2417 operations.push(PatchEntityOperation {
2418 op: PatchEntityOperationType::Set,
2419 path: update_patch_path_for_entity(entity, &column),
2420 value: Some(storage_value_to_json(&value)),
2421 });
2422 }
2423
2424 for (key, value) in &compiled_plan.static_metadata_assignments {
2425 operations.push(PatchEntityOperation {
2426 op: PatchEntityOperationType::Set,
2427 path: vec!["metadata".to_string(), key.clone()],
2428 value: Some(metadata_value_to_json(value)),
2429 });
2430 }
2431
2432 for (key, value) in assignments.dynamic_metadata_assignments {
2433 operations.push(PatchEntityOperation {
2434 op: PatchEntityOperationType::Set,
2435 path: vec!["metadata".to_string(), key],
2436 value: Some(metadata_value_to_json(&value)),
2437 });
2438 }
2439
2440 operations
2441}
2442
2443fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2444 if matches!(
2445 (&entity.kind, &entity.data),
2446 (
2447 crate::storage::EntityKind::GraphNode(_),
2448 EntityData::Node(_)
2449 )
2450 ) && column.eq_ignore_ascii_case("node_type")
2451 {
2452 return vec!["node_type".to_string()];
2453 }
2454 if matches!(
2455 (&entity.kind, &entity.data),
2456 (
2457 crate::storage::EntityKind::GraphEdge(_),
2458 EntityData::Edge(_)
2459 )
2460 ) && column.eq_ignore_ascii_case("weight")
2461 {
2462 return vec!["weight".to_string()];
2463 }
2464 vec!["fields".to_string(), column.to_string()]
2465}
2466
/// Rewrites a `DELETE FROM <body> [RETURNING ...]` statement into
/// `SELECT * FROM <body>`, dropping any top-level `RETURNING` clause.
///
/// Keywords are matched ignoring ASCII case and may be separated by any ASCII
/// whitespace (spaces, tabs, newlines), so multi-line statements are accepted.
///
/// Returns `None` when the text does not start with a whitespace-delimited
/// `DELETE`, or when no `FROM` keyword followed by whitespace is found outside
/// single-quoted strings.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    let trimmed = sql.trim_start();
    // ASCII lower-casing never changes byte lengths, so offsets computed on
    // `lowered` are valid for `trimmed` as well.
    let lowered = trimmed.to_ascii_lowercase();

    let after_delete = lowered.strip_prefix("delete")?;
    let delete_is_delimited = after_delete
        .as_bytes()
        .first()
        .is_some_and(u8::is_ascii_whitespace);
    if !delete_is_delimited {
        return None;
    }

    let from_idx = find_top_level_keyword(&lowered, "from")?;
    let keyword_end = from_idx + "from".len();
    if keyword_end >= lowered.len() {
        // `FROM` is the last token: there is no target to select from.
        return None;
    }
    // The scanner guarantees the byte after `from` is ASCII whitespace; skip
    // exactly that one byte so the remainder matches the historical output.
    let body_start = keyword_end + 1;

    let after_from = &trimmed[body_start..];
    let after_from_lc = &lowered[body_start..];
    let body = match find_top_level_keyword(after_from_lc, "returning") {
        Some(pos) => &after_from[..pos],
        None => after_from,
    };
    Some(format!("SELECT * FROM {}", body.trim_end()))
}

/// Finds the byte offset of the first occurrence of `needle` in `haystack`
/// that lies outside single-quoted strings and is delimited by ASCII
/// whitespace (or the start/end of `haystack`) on both sides.
///
/// Matching is byte-exact, so callers pass both arguments in the same case.
/// Every `'` toggles the in-string state, which also covers the doubled-quote
/// escape (`''`) because it toggles twice.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let word = needle.as_bytes();
    let mut in_string = false;

    for (start, &byte) in bytes.iter().enumerate() {
        if byte == b'\'' {
            in_string = !in_string;
            continue;
        }
        if in_string {
            continue;
        }

        let end = start + word.len();
        let matches_word = bytes.get(start..end) == Some(word);
        let delimited_before = start == 0 || bytes[start - 1].is_ascii_whitespace();
        let delimited_after = end == bytes.len()
            || bytes.get(end).is_some_and(|next| next.is_ascii_whitespace());
        if matches_word && delimited_before && delimited_after {
            return Some(start);
        }
    }
    None
}
2526
2527fn build_returning_result(
2534 items: &[ReturningItem],
2535 snapshots: &[Vec<(String, Value)>],
2536 outputs: Option<&[CreateEntityOutput]>,
2537) -> UnifiedResult {
2538 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2539 let public_item_outputs = outputs.is_some_and(|outs| {
2540 outs.first()
2541 .and_then(|out| out.entity.as_ref())
2542 .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2543 });
2544
2545 let mut columns: Vec<String> = if project_all {
2546 let mut cols: Vec<String> = Vec::new();
2547 if public_item_outputs {
2548 cols.extend(
2549 [
2550 "rid",
2551 "collection",
2552 "kind",
2553 "tenant",
2554 "created_at",
2555 "updated_at",
2556 ]
2557 .into_iter()
2558 .map(str::to_string),
2559 );
2560 } else if outputs.is_some() {
2561 cols.push("rid".to_string());
2562 }
2563 if let Some(first) = snapshots.first() {
2564 for (name, _) in first {
2565 cols.push(name.clone());
2566 }
2567 }
2568 cols
2569 } else {
2570 items
2571 .iter()
2572 .filter_map(|it| match it {
2573 ReturningItem::Column(c) => Some(c.clone()),
2574 ReturningItem::All => None,
2575 })
2576 .collect()
2577 };
2578 {
2580 let mut seen = std::collections::HashSet::new();
2581 columns.retain(|c| seen.insert(c.clone()));
2582 }
2583
2584 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2585 for (idx, snap) in snapshots.iter().enumerate() {
2586 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2587 if let Some(outs) = outputs {
2588 if let Some(out) = outs.get(idx) {
2589 if let Some(entity) = out.entity.as_ref() {
2590 if let Some(kind) = public_returning_item_kind(entity) {
2591 values.insert(
2592 Arc::clone(&sys_key_rid()),
2593 Value::UnsignedInteger(out.id.raw()),
2594 );
2595 values.insert(
2596 Arc::clone(&sys_key_collection()),
2597 Value::text(entity.kind.collection().to_string()),
2598 );
2599 values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2600 values.insert(
2601 Arc::clone(&sys_key_created_at()),
2602 Value::UnsignedInteger(entity.created_at),
2603 );
2604 values.insert(
2605 Arc::clone(&sys_key_updated_at()),
2606 Value::UnsignedInteger(entity.updated_at),
2607 );
2608 } else {
2609 values.insert(
2610 Arc::clone(&sys_key_rid()),
2611 Value::Integer(out.id.raw() as i64),
2612 );
2613 }
2614 } else {
2615 values.insert(
2616 Arc::clone(&sys_key_rid()),
2617 Value::Integer(out.id.raw() as i64),
2618 );
2619 }
2620 }
2621 }
2622 for (name, val) in snap {
2623 values.insert(Arc::from(name.as_str()), val.clone());
2624 }
2625 if !values.contains_key("tenant") {
2626 let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2627 values.insert(Arc::clone(&sys_key_tenant()), tenant);
2628 }
2629 let mut rec = UnifiedRecord::default();
2630 for col in &columns {
2632 if let Some(v) = values.get(col.as_str()) {
2633 rec.set_arc(Arc::from(col.as_str()), v.clone());
2634 }
2635 }
2636 records.push(rec);
2637 }
2638
2639 UnifiedResult {
2640 columns,
2641 records,
2642 stats: Default::default(),
2643 pre_serialized_json: None,
2644 }
2645}
2646
2647fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2648 match (&entity.kind, &entity.data) {
2649 (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2650 Some("node")
2651 }
2652 (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2653 Some("edge")
2654 }
2655 (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2656 (_, crate::storage::EntityData::Vector(_)) => Some("vector"),
2661 _ => None,
2662 }
2663}
2664
2665fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2666 let Some(row) = entity.data.as_row() else {
2667 return "row";
2668 };
2669
2670 let is_kv = row.named.as_ref().is_some_and(|named| {
2671 (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2672 || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2673 });
2674 if is_kv {
2675 return "kv";
2676 }
2677
2678 let is_document = row
2679 .named
2680 .as_ref()
2681 .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2682 || row.columns.iter().any(runtime_returning_documentish_value);
2683 if is_document {
2684 "document"
2685 } else {
2686 "row"
2687 }
2688}
2689
2690fn runtime_returning_documentish_value(value: &Value) -> bool {
2691 matches!(value, Value::Json(_) | Value::Blob(_))
2692}
2693
2694fn row_insert_returning_snapshots(
2695 outputs: &[CreateEntityOutput],
2696 fallback: Vec<Vec<(String, Value)>>,
2697) -> Vec<Vec<(String, Value)>> {
2698 outputs
2699 .iter()
2700 .enumerate()
2701 .map(|(idx, out)| {
2702 out.entity
2703 .as_ref()
2704 .map(entity_row_fields_snapshot)
2705 .filter(|snap| !snap.is_empty())
2706 .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2707 })
2708 .collect()
2709}
2710
2711fn graph_insert_returning_snapshots(
2712 store: &crate::storage::unified::UnifiedStore,
2713 collection: &str,
2714 ids: &[EntityId],
2715) -> Vec<Vec<(String, Value)>> {
2716 let Some(manager) = store.get_collection(collection) else {
2717 return Vec::new();
2718 };
2719
2720 ids.iter()
2721 .filter_map(|id| manager.get(*id))
2722 .filter_map(|entity| {
2723 let mut record = runtime_any_record_from_entity_ref(&entity)?;
2724 record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2725 Some(record)
2726 })
2727 .map(|record| {
2728 record
2729 .iter_fields()
2730 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2731 .collect()
2732 })
2733 .collect()
2734}
2735
2736fn graph_update_returning_snapshots(
2737 runtime: &RedDBRuntime,
2738 collection: &str,
2739 ids: &[EntityId],
2740) -> Vec<Vec<(String, Value)>> {
2741 let store = runtime.db().store();
2742 let Some(manager) = store.get_collection(collection) else {
2743 return Vec::new();
2744 };
2745
2746 manager
2747 .get_many(ids)
2748 .into_iter()
2749 .flatten()
2750 .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2751 .map(|record| {
2752 record
2753 .iter_fields()
2754 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2755 .collect()
2756 })
2757 .collect()
2758}
2759
2760fn ensure_update_target_contract(
2761 runtime: &RedDBRuntime,
2762 collection: &str,
2763 target: UpdateTarget,
2764) -> RedDBResult<()> {
2765 let Some(contract) = runtime.db().collection_contract(collection) else {
2766 return Ok(());
2767 };
2768 if update_target_contract_is_advisory(&contract)
2769 || update_target_allows_model(contract.declared_model, update_target_model(target))
2770 {
2771 return Ok(());
2772 }
2773 Err(RedDBError::InvalidOperation(format!(
2774 "collection '{}' is declared as '{}' and does not allow '{}' updates",
2775 collection,
2776 update_model_name(contract.declared_model),
2777 update_model_name(update_target_model(target))
2778 )))
2779}
2780
2781fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2782 matches!(
2783 (&contract.origin, &contract.schema_mode),
2784 (
2785 crate::physical::ContractOrigin::Implicit,
2786 crate::catalog::SchemaMode::Dynamic,
2787 )
2788 )
2789}
2790
2791fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2792 match target {
2793 UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2794 UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2795 UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2796 UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2797 }
2798}
2799
2800fn update_target_allows_model(
2801 declared_model: crate::catalog::CollectionModel,
2802 requested_model: crate::catalog::CollectionModel,
2803) -> bool {
2804 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2805}
2806
2807fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2808 match model {
2809 crate::catalog::CollectionModel::Table => "table",
2810 crate::catalog::CollectionModel::Document => "document",
2811 crate::catalog::CollectionModel::Graph => "graph",
2812 crate::catalog::CollectionModel::Vector => "vector",
2813 crate::catalog::CollectionModel::Hll => "hll",
2814 crate::catalog::CollectionModel::Sketch => "sketch",
2815 crate::catalog::CollectionModel::Filter => "filter",
2816 crate::catalog::CollectionModel::Kv => "kv",
2817 crate::catalog::CollectionModel::Config => "config",
2818 crate::catalog::CollectionModel::Vault => "vault",
2819 crate::catalog::CollectionModel::Mixed => "mixed",
2820 crate::catalog::CollectionModel::TimeSeries => "timeseries",
2821 crate::catalog::CollectionModel::Queue => "queue",
2822 crate::catalog::CollectionModel::Metrics => "metrics",
2823 }
2824}
2825
2826fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2827 let db = runtime.db();
2828 if let Some(contract) = db.collection_contract(collection) {
2829 let advisory_implicit_dynamic = matches!(
2830 (&contract.origin, &contract.schema_mode),
2831 (
2832 crate::physical::ContractOrigin::Implicit,
2833 crate::catalog::SchemaMode::Dynamic,
2834 )
2835 );
2836 if advisory_implicit_dynamic
2837 || matches!(
2838 contract.declared_model,
2839 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2840 )
2841 {
2842 return Ok(());
2843 }
2844 return Err(RedDBError::InvalidOperation(format!(
2845 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2846 collection, contract.declared_model
2847 )));
2848 }
2849
2850 let now = std::time::SystemTime::now()
2851 .duration_since(std::time::UNIX_EPOCH)
2852 .unwrap_or_default()
2853 .as_millis();
2854 db.save_collection_contract(crate::physical::CollectionContract {
2855 name: collection.to_string(),
2856 declared_model: crate::catalog::CollectionModel::Graph,
2857 schema_mode: crate::catalog::SchemaMode::Dynamic,
2858 origin: crate::physical::ContractOrigin::Implicit,
2859 version: 1,
2860 created_at_unix_ms: now,
2861 updated_at_unix_ms: now,
2862 default_ttl_ms: db.collection_default_ttl_ms(collection),
2863 vector_dimension: None,
2864 vector_metric: None,
2865 context_index_fields: Vec::new(),
2866 declared_columns: Vec::new(),
2867 table_def: None,
2868 timestamps_enabled: false,
2869 context_index_enabled: false,
2870 metrics_raw_retention_ms: None,
2871 metrics_rollup_policies: Vec::new(),
2872 metrics_tenant_identity: None,
2873 metrics_namespace: None,
2874 append_only: false,
2875 subscriptions: Vec::new(),
2876 analytics_config: Vec::new(),
2877 session_key: None,
2878 session_gap_ms: None,
2879 retention_duration_ms: None,
2880 analytical_storage: None,
2881
2882 ai_policy: None,
2883 })
2884 .map(|_| ())
2885 .map_err(|err| RedDBError::Internal(err.to_string()))
2886}
2887
2888fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2889 query
2890 .assignment_exprs
2891 .iter()
2892 .enumerate()
2893 .any(|(idx, (column, expr))| {
2894 query
2895 .compound_assignment_ops
2896 .get(idx)
2897 .is_some_and(|op| op.is_some())
2898 || expr_references_update_column(expr, &query.table, column)
2899 })
2900}
2901
2902fn evaluate_compound_update_assignment(
2903 column: &str,
2904 record: &UnifiedRecord,
2905 op: BinOp,
2906 rhs: Value,
2907) -> RedDBResult<Value> {
2908 let lhs = record.get(column).ok_or_else(|| {
2909 RedDBError::Query(format!(
2910 "compound assignment requires existing numeric field '{column}'"
2911 ))
2912 })?;
2913 if matches!(lhs, Value::Null) {
2914 return Err(RedDBError::Query(format!(
2915 "compound assignment requires non-null numeric field '{column}'"
2916 )));
2917 }
2918 apply_compound_numeric_op(column, op, lhs, &rhs)
2919}
2920
2921fn apply_compound_numeric_op(
2922 column: &str,
2923 op: BinOp,
2924 lhs: &Value,
2925 rhs: &Value,
2926) -> RedDBResult<Value> {
2927 let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2928 return Err(RedDBError::Query(format!(
2929 "compound assignment requires numeric field '{column}'"
2930 )));
2931 };
2932 let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2933 return Err(RedDBError::Query(format!(
2934 "compound assignment requires numeric right-hand value for field '{column}'"
2935 )));
2936 };
2937
2938 if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2939 let a = lhs_number.as_f64();
2940 let b = rhs_number.as_f64();
2941 let out = match op {
2942 BinOp::Add => a + b,
2943 BinOp::Sub => a - b,
2944 BinOp::Mul => a * b,
2945 BinOp::Div => {
2946 if b == 0.0 {
2947 return Err(RedDBError::Query(format!(
2948 "division by zero in compound assignment for field '{column}'"
2949 )));
2950 }
2951 a / b
2952 }
2953 BinOp::Mod => {
2954 if b == 0.0 {
2955 return Err(RedDBError::Query(format!(
2956 "modulo by zero in compound assignment for field '{column}'"
2957 )));
2958 }
2959 a % b
2960 }
2961 _ => {
2962 return Err(RedDBError::Query(format!(
2963 "unsupported compound assignment operator for field '{column}'"
2964 )));
2965 }
2966 };
2967 if !out.is_finite() {
2968 return Err(RedDBError::Query(format!(
2969 "numeric overflow in compound assignment for field '{column}'"
2970 )));
2971 }
2972 return Ok(Value::Float(out));
2973 }
2974
2975 let a = lhs_number.as_i128();
2976 let b = rhs_number.as_i128();
2977 let out = match op {
2978 BinOp::Add => a.checked_add(b),
2979 BinOp::Sub => a.checked_sub(b),
2980 BinOp::Mul => a.checked_mul(b),
2981 BinOp::Mod => {
2982 if b == 0 {
2983 return Err(RedDBError::Query(format!(
2984 "modulo by zero in compound assignment for field '{column}'"
2985 )));
2986 }
2987 a.checked_rem(b)
2988 }
2989 BinOp::Div => unreachable!("integer division is handled by the float branch"),
2990 _ => None,
2991 }
2992 .ok_or_else(|| {
2993 RedDBError::Query(format!(
2994 "numeric overflow in compound assignment for field '{column}'"
2995 ))
2996 })?;
2997
2998 if matches!(lhs, Value::UnsignedInteger(_)) {
2999 let value = u64::try_from(out).map_err(|_| {
3000 RedDBError::Query(format!(
3001 "numeric overflow in compound assignment for field '{column}'"
3002 ))
3003 })?;
3004 Ok(Value::UnsignedInteger(value))
3005 } else {
3006 let value = i64::try_from(out).map_err(|_| {
3007 RedDBError::Query(format!(
3008 "numeric overflow in compound assignment for field '{column}'"
3009 ))
3010 })?;
3011 Ok(Value::Integer(value))
3012 }
3013}
3014
3015#[derive(Clone, Copy)]
3016enum CompoundNumber {
3017 Integer(i128),
3018 Float(f64),
3019}
3020
3021impl CompoundNumber {
3022 fn from_value(value: &Value) -> Option<Self> {
3023 match value {
3024 Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
3025 Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
3026 Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
3027 Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
3028 _ => None,
3029 }
3030 }
3031
3032 fn is_float(self) -> bool {
3033 matches!(self, Self::Float(_))
3034 }
3035
3036 fn as_f64(self) -> f64 {
3037 match self {
3038 Self::Integer(value) => value as f64,
3039 Self::Float(value) => value,
3040 }
3041 }
3042
3043 fn as_i128(self) -> i128 {
3044 match self {
3045 Self::Integer(value) => value,
3046 Self::Float(_) => unreachable!("float compound number used as integer"),
3047 }
3048 }
3049}
3050
3051fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
3052 match expr {
3053 Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
3054 Expr::Column { field, .. } => {
3055 field_ref_matches_update_column(field, table_name, target_column)
3056 }
3057 Expr::BinaryOp { lhs, rhs, .. } => {
3058 expr_references_update_column(lhs, table_name, target_column)
3059 || expr_references_update_column(rhs, table_name, target_column)
3060 }
3061 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
3062 expr_references_update_column(operand, table_name, target_column)
3063 }
3064 Expr::FunctionCall { args, .. } => args
3065 .iter()
3066 .any(|arg| expr_references_update_column(arg, table_name, target_column)),
3067 Expr::Case {
3068 branches, else_, ..
3069 } => {
3070 branches.iter().any(|(cond, value)| {
3071 expr_references_update_column(cond, table_name, target_column)
3072 || expr_references_update_column(value, table_name, target_column)
3073 }) || else_
3074 .as_deref()
3075 .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
3076 }
3077 Expr::IsNull { operand, .. } => {
3078 expr_references_update_column(operand, table_name, target_column)
3079 }
3080 Expr::InList { target, values, .. } => {
3081 expr_references_update_column(target, table_name, target_column)
3082 || values
3083 .iter()
3084 .any(|value| expr_references_update_column(value, table_name, target_column))
3085 }
3086 Expr::Between {
3087 target, low, high, ..
3088 } => {
3089 expr_references_update_column(target, table_name, target_column)
3090 || expr_references_update_column(low, table_name, target_column)
3091 || expr_references_update_column(high, table_name, target_column)
3092 }
3093 Expr::WindowFunctionCall { args, window, .. } => {
3094 args.iter()
3095 .any(|arg| expr_references_update_column(arg, table_name, target_column))
3096 || window
3097 .partition_by
3098 .iter()
3099 .any(|e| expr_references_update_column(e, table_name, target_column))
3100 || window
3101 .order_by
3102 .iter()
3103 .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
3104 }
3105 }
3106}
3107
3108fn field_ref_matches_update_column(
3109 field: &FieldRef,
3110 table_name: &str,
3111 target_column: &str,
3112) -> bool {
3113 match field {
3114 FieldRef::TableColumn { table, column } => {
3115 column.eq_ignore_ascii_case(target_column)
3116 && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
3117 }
3118 FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
3119 false
3120 }
3121 }
3122}
3123
3124fn resolve_update_entity_by_logical_id(
3125 runtime: &RedDBRuntime,
3126 table: &str,
3127 logical_id: EntityId,
3128) -> Option<UnifiedEntity> {
3129 let store = runtime.inner.db.store();
3130 if let Some(entity) = store.get_table_row_by_logical_id(table, logical_id) {
3131 return Some(entity);
3132 }
3133 store.get(table, logical_id)
3136}
3137
3138fn update_cdc_item_kind(
3139 runtime: &RedDBRuntime,
3140 collection: &str,
3141 entity: &UnifiedEntity,
3142) -> &'static str {
3143 match &entity.data {
3144 EntityData::Node(_) => return "node",
3145 EntityData::Edge(_) => return "edge",
3146 _ => {}
3147 }
3148
3149 match runtime
3150 .db()
3151 .collection_contract(collection)
3152 .map(|contract| contract.declared_model)
3153 {
3154 Some(crate::catalog::CollectionModel::Document) => "document",
3155 Some(crate::catalog::CollectionModel::Kv)
3156 | Some(crate::catalog::CollectionModel::Vault) => "kv",
3157 _ => "row",
3158 }
3159}
3160
3161fn ordered_update_target_ids(
3162 manager: &Arc<crate::storage::SegmentManager>,
3163 entity_ids: &[EntityId],
3164 order_by: &[OrderByClause],
3165 limit: Option<usize>,
3166) -> Vec<EntityId> {
3167 let mut entities: Vec<UnifiedEntity> =
3168 manager.get_many(entity_ids).into_iter().flatten().collect();
3169 entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3170 if let Some(limit) = limit {
3171 entities.truncate(limit);
3172 }
3173 entities.into_iter().map(|entity| entity.id).collect()
3174}
3175
3176fn compare_update_order(
3177 left: &UnifiedEntity,
3178 right: &UnifiedEntity,
3179 order_by: &[OrderByClause],
3180) -> Ordering {
3181 for clause in order_by {
3182 let left_value = update_order_value(left, &clause.field);
3183 let right_value = update_order_value(right, &clause.field);
3184 let ordering = compare_update_order_values(
3185 left_value.as_ref(),
3186 right_value.as_ref(),
3187 clause.nulls_first,
3188 );
3189 if ordering != Ordering::Equal {
3190 return if clause.ascending {
3191 ordering
3192 } else {
3193 ordering.reverse()
3194 };
3195 }
3196 }
3197 left.logical_id().raw().cmp(&right.logical_id().raw())
3198}
3199
3200fn compare_update_order_values(
3201 left: Option<&Value>,
3202 right: Option<&Value>,
3203 nulls_first: bool,
3204) -> Ordering {
3205 match (left, right) {
3206 (None, None) => Ordering::Equal,
3207 (None, Some(_)) => {
3208 if nulls_first {
3209 Ordering::Less
3210 } else {
3211 Ordering::Greater
3212 }
3213 }
3214 (Some(_), None) => {
3215 if nulls_first {
3216 Ordering::Greater
3217 } else {
3218 Ordering::Less
3219 }
3220 }
3221 (Some(left), Some(right)) => {
3222 crate::storage::query::value_compare::total_compare_values(left, right)
3223 }
3224 }
3225}
3226
3227fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3228 let FieldRef::TableColumn { table, column } = field else {
3229 return None;
3230 };
3231 if !table.is_empty() {
3232 return None;
3233 }
3234 if column.eq_ignore_ascii_case("rid") {
3235 return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3236 }
3237 match &entity.data {
3238 EntityData::Row(row) => row.get_field(column).cloned(),
3239 EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3240 .and_then(|record| record.get(column).cloned()),
3241 _ => None,
3242 }
3243}
3244
/// Removes duplicate column names, comparing names ignoring ASCII case.
///
/// The first spelling of each name is kept and the relative order of the
/// surviving names is preserved.
fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
    // `columns[..kept]` always holds the unique names found so far, in order.
    let mut kept = 0usize;
    for idx in 0..columns.len() {
        let duplicate = columns[..kept]
            .iter()
            .any(|existing| existing.eq_ignore_ascii_case(&columns[idx]));
        if !duplicate {
            // Slots in `kept..idx` only hold discarded duplicates, so swapping
            // one of them out never disturbs a surviving name.
            columns.swap(kept, idx);
            kept += 1;
        }
    }
    columns.truncate(kept);
    columns
}
3261
/// SQL column names that are routed to TTL metadata instead of row fields.
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Returns the canonical TTL metadata key that `column` refers to, comparing
/// ignoring ASCII case, or `None` when `column` is not a TTL column.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|key| column.eq_ignore_ascii_case(key))
}
3279
3280fn canonicalize_sql_ttl_metadata(
3285 key: &'static str,
3286 value: MetadataValue,
3287) -> (&'static str, MetadataValue) {
3288 if key != "_ttl" {
3289 return (key, value);
3290 }
3291 let scaled = match value {
3292 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3293 MetadataValue::Timestamp(ms_or_s) => {
3294 MetadataValue::Timestamp(ms_or_s)
3297 }
3298 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3299 other => other,
3300 };
3301 ("_ttl_ms", scaled)
3302}
3303
3304pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3308
3309impl RedDBRuntime {
3310 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3316 match value {
3317 Value::Password(marked) => {
3318 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3319 Ok(Value::Password(crate::auth::store::hash_password(plain)))
3320 } else {
3321 Ok(Value::Password(marked))
3322 }
3323 }
3324 Value::Secret(bytes) => {
3325 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3326 if !self.secret_auto_encrypt() {
3327 return Err(RedDBError::Query(
3328 "SECRET() literal rejected: red.config.secret.auto_encrypt \
3329 is false. Insert pre-encrypted bytes directly instead."
3330 .to_string(),
3331 ));
3332 }
3333 let key = self.secret_aes_key().ok_or_else(|| {
3334 RedDBError::Query(
3335 "SECRET() column encryption requires a bootstrapped \
3336 vault (red.secret.aes_key is missing). Start the server \
3337 with --vault to enable."
3338 .to_string(),
3339 )
3340 })?;
3341 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3342 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3343 } else {
3344 Ok(Value::Secret(bytes))
3345 }
3346 }
3347 other => Ok(other),
3348 }
3349 }
3350}
3351
3352fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3355 let nonce_bytes = crate::auth::store::random_bytes(12);
3356 let mut nonce = [0u8; 12];
3357 nonce.copy_from_slice(&nonce_bytes[..12]);
3358 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3359 let mut out = Vec::with_capacity(12 + ct.len());
3360 out.extend_from_slice(&nonce);
3361 out.extend_from_slice(&ct);
3362 out
3363}
3364
3365pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3369 if payload.len() < 12 {
3370 return None;
3371 }
3372 let mut nonce = [0u8; 12];
3373 nonce.copy_from_slice(&payload[..12]);
3374 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3375}
3376
3377fn split_insert_metadata(
3378 runtime: &RedDBRuntime,
3379 columns: &[String],
3380 values: &[Value],
3381) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3382 let mut fields = Vec::new();
3383 let mut metadata = Vec::new();
3384
3385 for (column, value) in columns.iter().zip(values.iter()) {
3386 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3388 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3389 let (canonical_key, canonical_value) =
3390 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3391 metadata.push((canonical_key.to_string(), canonical_value));
3392 continue;
3393 }
3394 fields.push((
3395 column.clone(),
3396 runtime.resolve_crypto_sentinel(value.clone())?,
3397 ));
3398 }
3399
3400 Ok((fields, metadata))
3401}
3402
3403fn merge_with_clauses(
3405 metadata: &mut Vec<(String, MetadataValue)>,
3406 ttl_ms: Option<u64>,
3407 expires_at_ms: Option<u64>,
3408 with_metadata: &[(String, Value)],
3409) {
3410 if let Some(ms) = ttl_ms {
3411 metadata.push((
3412 "_ttl_ms".to_string(),
3413 if ms <= i64::MAX as u64 {
3414 MetadataValue::Int(ms as i64)
3415 } else {
3416 MetadataValue::Timestamp(ms)
3417 },
3418 ));
3419 }
3420 if let Some(ms) = expires_at_ms {
3421 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3422 }
3423 for (key, value) in with_metadata {
3424 let meta_value = match value {
3425 Value::Text(s) => MetadataValue::String(s.to_string()),
3426 Value::Integer(n) => MetadataValue::Int(*n),
3427 Value::Float(n) => MetadataValue::Float(*n),
3428 Value::Boolean(b) => MetadataValue::Bool(*b),
3429 _ => MetadataValue::String(value.to_string()),
3430 };
3431 metadata.push((key.clone(), meta_value));
3432 }
3433}
3434
3435fn merge_vector_metadata_column(
3436 metadata: &mut Vec<(String, MetadataValue)>,
3437 columns: &[String],
3438 values: &[Value],
3439) -> RedDBResult<()> {
3440 let Some(value) = columns
3441 .iter()
3442 .position(|column| column.eq_ignore_ascii_case("metadata"))
3443 .map(|index| &values[index])
3444 else {
3445 return Ok(());
3446 };
3447 let json = match value {
3448 Value::Null => return Ok(()),
3449 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3450 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3451 })?,
3452 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3453 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3454 })?,
3455 other => {
3456 return Err(RedDBError::Query(format!(
3457 "column 'metadata' expected JSON object, got {other:?}"
3458 )))
3459 }
3460 };
3461 let parsed = metadata_from_json(&json)?;
3462 for (key, value) in parsed.iter() {
3463 metadata.push((key.clone(), value.clone()));
3464 }
3465 Ok(())
3466}
3467
3468fn apply_collection_default_ttl_metadata(
3469 runtime: &RedDBRuntime,
3470 collection: &str,
3471 metadata: &mut Vec<(String, MetadataValue)>,
3472) {
3473 if has_internal_ttl_metadata(metadata) {
3474 return;
3475 }
3476
3477 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3478 return;
3479 };
3480
3481 metadata.push((
3482 "_ttl_ms".to_string(),
3483 if default_ttl_ms <= i64::MAX as u64 {
3484 MetadataValue::Int(default_ttl_ms as i64)
3485 } else {
3486 MetadataValue::Timestamp(default_ttl_ms)
3487 },
3488 ));
3489}
3490
3491fn ensure_non_tree_reserved_metadata_entries(
3492 metadata: &[(String, MetadataValue)],
3493) -> RedDBResult<()> {
3494 for (key, _) in metadata {
3495 ensure_non_tree_reserved_metadata_key(key)?;
3496 }
3497 Ok(())
3498}
3499
3500fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3501 if key.starts_with(TREE_METADATA_PREFIX) {
3502 return Err(RedDBError::Query(format!(
3503 "metadata key '{}' is reserved for managed trees",
3504 key
3505 )));
3506 }
3507 Ok(())
3508}
3509
3510fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3511 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3512 return Err(RedDBError::Query(format!(
3513 "edge label '{}' is reserved for managed trees",
3514 TREE_CHILD_EDGE_LABEL
3515 )));
3516 }
3517 Ok(())
3518}
3519
3520fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3521 let mut columns = Vec::with_capacity(pairs.len());
3522 let mut values = Vec::with_capacity(pairs.len());
3523
3524 for (column, value) in pairs {
3525 columns.push(column.clone());
3526 values.push(value.clone());
3527 }
3528
3529 (columns, values)
3530}
3531
3532fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3534 for (i, col) in columns.iter().enumerate() {
3535 if col.eq_ignore_ascii_case(name) {
3536 return Ok(values[i].clone());
3537 }
3538 }
3539 Err(RedDBError::Query(format!(
3540 "required column '{name}' not found in INSERT"
3541 )))
3542}
3543
3544fn find_column_value_string(
3546 columns: &[String],
3547 values: &[Value],
3548 name: &str,
3549) -> RedDBResult<String> {
3550 let val = find_column_value(columns, values, name)?;
3551 match val {
3552 Value::Text(s) => Ok(s.to_string()),
3553 Value::Integer(n) => Ok(n.to_string()),
3554 Value::Float(n) => Ok(n.to_string()),
3555 other => Err(RedDBError::Query(format!(
3556 "column '{name}' expected text, got {other:?}"
3557 ))),
3558 }
3559}
3560
3561fn find_document_body_json(
3562 columns: &[String],
3563 values: &[Value],
3564) -> RedDBResult<crate::json::Value> {
3565 let val = find_column_value(columns, values, "body")?;
3566 match val {
3567 Value::Json(bytes) | Value::Blob(bytes) => crate::json::from_slice(&bytes)
3568 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3569 Value::Text(text) => crate::json::from_str(text.as_ref())
3570 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3571 Value::Integer(value) => crate::json::from_str(&value.to_string())
3572 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3573 Value::UnsignedInteger(value) => crate::json::from_str(&value.to_string())
3574 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3575 Value::Float(value) => crate::json::from_str(&value.to_string())
3576 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3577 other => Err(RedDBError::Query(format!(
3578 "column 'body' expected JSON body, got {other:?}"
3579 ))),
3580 }
3581}
3582
3583fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3584 let val = find_column_value(columns, values, name)?;
3585 match val {
3586 Value::Float(n) => Ok(n),
3587 Value::Integer(n) => Ok(n as f64),
3588 Value::UnsignedInteger(n) => Ok(n as f64),
3589 Value::Text(s) => s
3590 .parse::<f64>()
3591 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3592 other => Err(RedDBError::Query(format!(
3593 "column '{name}' expected number, got {other:?}"
3594 ))),
3595 }
3596}
3597
3598fn find_column_value_opt_string(
3600 columns: &[String],
3601 values: &[Value],
3602 name: &str,
3603) -> Option<String> {
3604 for (i, col) in columns.iter().enumerate() {
3605 if col.eq_ignore_ascii_case(name) {
3606 return match &values[i] {
3607 Value::Null => None,
3608 Value::Text(s) => Some(s.to_string()),
3609 Value::Integer(n) => Some(n.to_string()),
3610 Value::Float(n) => Some(n.to_string()),
3611 _ => None,
3612 };
3613 }
3614 }
3615 None
3616}
3617
3618fn resolve_edge_endpoint(
3625 store: &crate::storage::unified::UnifiedStore,
3626 collection: &str,
3627 columns: &[String],
3628 values: &[Value],
3629 name: &str,
3630) -> RedDBResult<u64> {
3631 let val = find_column_value(columns, values, name)?;
3632 match val {
3633 Value::Integer(n) => Ok(n as u64),
3634 Value::UnsignedInteger(n) => Ok(n),
3635 Value::Text(s) => {
3636 if let Ok(n) = s.parse::<u64>() {
3637 return Ok(n);
3638 }
3639 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3640 match matches.len() {
3641 0 => Err(RedDBError::Query(format!(
3642 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3643 ))),
3644 1 => Ok(matches[0].raw()),
3645 n => Err(RedDBError::Query(format!(
3646 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3647 ))),
3648 }
3649 }
3650 other => Err(RedDBError::Query(format!(
3651 "column '{name}' expected integer or node label, got {other:?}"
3652 ))),
3653 }
3654}
3655
3656fn resolve_edge_endpoint_any(
3657 store: &crate::storage::unified::UnifiedStore,
3658 collection: &str,
3659 columns: &[String],
3660 values: &[Value],
3661 names: &[&str],
3662) -> RedDBResult<u64> {
3663 for name in names {
3664 if columns
3665 .iter()
3666 .any(|column| column.eq_ignore_ascii_case(name))
3667 {
3668 return resolve_edge_endpoint(store, collection, columns, values, name);
3669 }
3670 }
3671
3672 Err(RedDBError::Query(format!(
3673 "required column '{}' not found in INSERT",
3674 names.first().copied().unwrap_or("from_rid")
3675 )))
3676}
3677
3678fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3680 let val = find_column_value(columns, values, name)?;
3681 match val {
3682 Value::Integer(n) => Ok(n as u64),
3683 Value::UnsignedInteger(n) => Ok(n),
3684 Value::Text(s) => s
3685 .parse::<u64>()
3686 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3687 other => Err(RedDBError::Query(format!(
3688 "column '{name}' expected integer, got {other:?}"
3689 ))),
3690 }
3691}
3692
3693fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3695 for (i, col) in columns.iter().enumerate() {
3696 if col.eq_ignore_ascii_case(name) {
3697 return match &values[i] {
3698 Value::Float(n) => Some(*n as f32),
3699 Value::Integer(n) => Some(*n as f32),
3700 Value::Null => None,
3701 _ => None,
3702 };
3703 }
3704 }
3705 None
3706}
3707
3708fn find_column_value_vec_f32(
3710 columns: &[String],
3711 values: &[Value],
3712 name: &str,
3713) -> RedDBResult<Vec<f32>> {
3714 let val = find_column_value(columns, values, name)?;
3715 match val {
3716 Value::Vector(v) => Ok(v),
3717 Value::Json(bytes) => {
3718 let s = std::str::from_utf8(&bytes).map_err(|_| {
3720 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3721 })?;
3722 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3723 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3724 })?;
3725 Ok(arr)
3726 }
3727 other => Err(RedDBError::Query(format!(
3728 "column '{name}' expected vector, got {other:?}"
3729 ))),
3730 }
3731}
3732
3733fn find_column_value_vec_f32_any(
3734 columns: &[String],
3735 values: &[Value],
3736 names: &[&str],
3737) -> RedDBResult<Vec<f32>> {
3738 for name in names {
3739 if columns
3740 .iter()
3741 .any(|column| column.eq_ignore_ascii_case(name))
3742 {
3743 return find_column_value_vec_f32(columns, values, name);
3744 }
3745 }
3746 Err(RedDBError::Query(format!(
3747 "required vector column '{}' not found in INSERT",
3748 names.join("' or '")
3749 )))
3750}
3751
3752fn extract_remaining_properties(
3754 columns: &[String],
3755 values: &[Value],
3756 exclude: &[&str],
3757) -> Vec<(String, Value)> {
3758 columns
3759 .iter()
3760 .zip(values.iter())
3761 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3762 .map(|(col, val)| (col.clone(), val.clone()))
3763 .collect()
3764}
3765
3766fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3767 let mut invalid = Vec::new();
3768 for column in columns {
3769 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3770 invalid.push(column.clone());
3771 }
3772 }
3773
3774 if invalid.is_empty() {
3775 Ok(())
3776 } else {
3777 Err(RedDBError::Query(format!(
3778 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3779 invalid.join(", ")
3780 )))
3781 }
3782}
3783
/// Reports whether `column` is one of the column names a timeseries INSERT
/// recognises.
///
/// The comparison ignores ASCII case.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        "event_name",
        "payload",
    ];

    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3801
3802fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3803 let mut found = None;
3804
3805 for alias in ["timestamp_ns", "timestamp", "time"] {
3806 for (index, column) in columns.iter().enumerate() {
3807 if !column.eq_ignore_ascii_case(alias) {
3808 continue;
3809 }
3810
3811 if found.is_some() {
3812 return Err(RedDBError::Query(
3813 "timeseries INSERT accepts only one timestamp column".to_string(),
3814 ));
3815 }
3816
3817 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3818 }
3819 }
3820
3821 Ok(found)
3822}
3823
3824fn find_timeseries_tags(
3825 columns: &[String],
3826 values: &[Value],
3827) -> RedDBResult<std::collections::HashMap<String, String>> {
3828 for (index, column) in columns.iter().enumerate() {
3829 if column.eq_ignore_ascii_case("tags") {
3830 return parse_timeseries_tags(&values[index]);
3831 }
3832 }
3833 Ok(std::collections::HashMap::new())
3834}
3835
3836fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3837 match value {
3838 Value::Null => Ok(std::collections::HashMap::new()),
3839 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3840 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3841 other => Err(RedDBError::Query(format!(
3842 "timeseries tags must be a JSON object or JSON text, got {other:?}"
3843 ))),
3844 }
3845}
3846
3847fn parse_timeseries_tags_json(
3848 bytes: &[u8],
3849) -> RedDBResult<std::collections::HashMap<String, String>> {
3850 let json: crate::json::Value = crate::json::from_slice(bytes)
3851 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3852
3853 let object = match json {
3854 crate::json::Value::Object(object) => object,
3855 other => {
3856 return Err(RedDBError::Query(format!(
3857 "timeseries tags must be a JSON object, got {other:?}"
3858 )))
3859 }
3860 };
3861
3862 let mut tags = std::collections::HashMap::with_capacity(object.len());
3863 for (key, value) in object {
3864 tags.insert(key, json_tag_value_to_string(&value));
3865 }
3866 Ok(tags)
3867}
3868
3869fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3885 let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3886 buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3887 buf.push_str(&value.to_string_compact());
3888 buf
3889}
3890
3891fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3892 match value {
3893 Value::UnsignedInteger(value) => Ok(*value),
3894 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3895 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3896 Value::Text(value) => value.parse::<u64>().map_err(|_| {
3897 RedDBError::Query(format!(
3898 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3899 ))
3900 }),
3901 other => Err(RedDBError::Query(format!(
3902 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3903 ))),
3904 }
3905}
3906
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`, and a value that does not fit in
/// `u64` is clamped to `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();

    let nanos = since_epoch.as_nanos();
    if nanos > u128::from(u64::MAX) {
        u64::MAX
    } else {
        nanos as u64
    }
}
3914
3915fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3916 use crate::json::{Map, Value as JV};
3917 match value {
3918 MetadataValue::Null => JV::Null,
3919 MetadataValue::Bool(value) => JV::Bool(*value),
3920 MetadataValue::Int(value) => JV::Number(*value as f64),
3921 MetadataValue::Float(value) => JV::Number(*value),
3922 MetadataValue::String(value) => JV::String(value.clone()),
3923 MetadataValue::Bytes(value) => JV::Array(
3924 value
3925 .iter()
3926 .map(|value| JV::Number(*value as f64))
3927 .collect(),
3928 ),
3929 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3930 MetadataValue::Array(values) => {
3931 JV::Array(values.iter().map(metadata_value_to_json).collect())
3932 }
3933 MetadataValue::Object(object) => {
3934 let entries = object
3935 .iter()
3936 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3937 .collect();
3938 JV::Object(entries)
3939 }
3940 MetadataValue::Geo { lat, lon } => {
3941 let mut object = Map::new();
3942 object.insert("lat".to_string(), JV::Number(*lat));
3943 object.insert("lon".to_string(), JV::Number(*lon));
3944 JV::Object(object)
3945 }
3946 MetadataValue::Reference(target) => {
3947 let mut object = Map::new();
3948 object.insert(
3949 "collection".to_string(),
3950 JV::String(target.collection().to_string()),
3951 );
3952 object.insert(
3953 "entity_id".to_string(),
3954 JV::Number(target.entity_id().raw() as f64),
3955 );
3956 JV::Object(object)
3957 }
3958 MetadataValue::References(values) => {
3959 let refs = values
3960 .iter()
3961 .map(|target| {
3962 let mut object = Map::new();
3963 object.insert(
3964 "collection".to_string(),
3965 JV::String(target.collection().to_string()),
3966 );
3967 object.insert(
3968 "entity_id".to_string(),
3969 JV::Number(target.entity_id().raw() as f64),
3970 );
3971 JV::Object(object)
3972 })
3973 .collect();
3974 JV::Array(refs)
3975 }
3976 }
3977}
3978
3979fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3980 match value {
3981 Value::Null => MetadataValue::Null,
3982 Value::Boolean(value) => MetadataValue::Bool(*value),
3983 Value::Integer(value) => MetadataValue::Int(*value),
3984 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3985 Value::Float(value) => MetadataValue::Float(*value),
3986 Value::Text(value) => MetadataValue::String(value.to_string()),
3987 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3988 Value::Timestamp(value) => {
3989 if *value >= 0 {
3990 metadata_u64_to_value(*value as u64)
3991 } else {
3992 MetadataValue::Int(*value)
3993 }
3994 }
3995 Value::TimestampMs(value) => {
3996 if *value >= 0 {
3997 metadata_u64_to_value(*value as u64)
3998 } else {
3999 MetadataValue::Int(*value)
4000 }
4001 }
4002 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
4003 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
4004 Value::Date(value) => MetadataValue::String(value.to_string()),
4005 Value::Time(value) => MetadataValue::String(value.to_string()),
4006 Value::Decimal(value) => MetadataValue::String(value.to_string()),
4007 Value::Ipv4(value) => MetadataValue::String(format!(
4008 "{}.{}.{}.{}",
4009 (value >> 24) & 0xFF,
4010 (value >> 16) & 0xFF,
4011 (value >> 8) & 0xFF,
4012 value & 0xFF
4013 )),
4014 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
4015 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
4016 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
4017 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
4018 lat: *lat as f64 / 1_000_000.0,
4019 lon: *lon as f64 / 1_000_000.0,
4020 },
4021 Value::BigInt(value) => MetadataValue::String(value.to_string()),
4022 Value::TableRef(value) => MetadataValue::String(value.clone()),
4023 Value::PageRef(value) => MetadataValue::Int(*value as i64),
4024 Value::Password(value) => MetadataValue::String(value.clone()),
4025 Value::Array(values) => {
4026 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
4027 }
4028 _ => MetadataValue::String(value.to_string()),
4029 }
4030}
4031
4032fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
4033 match value {
4034 Value::Null => Ok(MetadataValue::Null),
4035 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
4036 Value::Integer(_) => Err(RedDBError::Query(format!(
4037 "column '{field}' must be non-negative for TTL metadata"
4038 ))),
4039 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
4040 Value::Float(value) if value.is_finite() => {
4041 if value.fract().abs() >= f64::EPSILON {
4042 return Err(RedDBError::Query(format!(
4043 "column '{field}' must be an integer (TTL metadata must be an integer)"
4044 )));
4045 }
4046 if *value < 0.0 {
4047 return Err(RedDBError::Query(format!(
4048 "column '{field}' must be non-negative for TTL metadata"
4049 )));
4050 }
4051 if *value > u64::MAX as f64 {
4052 return Err(RedDBError::Query(format!(
4053 "column '{field}' value is too large"
4054 )));
4055 }
4056 Ok(metadata_u64_to_value(*value as u64))
4057 }
4058 Value::Float(_) => Err(RedDBError::Query(format!(
4059 "column '{field}' must be a finite number"
4060 ))),
4061 Value::Text(value) => {
4062 let value = value.trim();
4063 if let Ok(value) = value.parse::<u64>() {
4064 Ok(metadata_u64_to_value(value))
4065 } else if let Ok(value) = value.parse::<i64>() {
4066 if value < 0 {
4067 return Err(RedDBError::Query(format!(
4068 "column '{field}' must be non-negative for TTL metadata"
4069 )));
4070 }
4071 Ok(metadata_u64_to_value(value as u64))
4072 } else if let Ok(value) = value.parse::<f64>() {
4073 if !value.is_finite() {
4074 return Err(RedDBError::Query(format!(
4075 "column '{field}' must be a finite number"
4076 )));
4077 }
4078 if value.fract().abs() >= f64::EPSILON {
4079 return Err(RedDBError::Query(format!(
4080 "column '{field}' must be an integer (TTL metadata must be an integer)"
4081 )));
4082 }
4083 if value < 0.0 {
4084 return Err(RedDBError::Query(format!(
4085 "column '{field}' must be non-negative for TTL metadata"
4086 )));
4087 }
4088 if value > u64::MAX as f64 {
4089 return Err(RedDBError::Query(format!(
4090 "column '{field}' value is too large"
4091 )));
4092 }
4093 Ok(metadata_u64_to_value(value as u64))
4094 } else {
4095 Err(RedDBError::Query(format!(
4096 "column '{field}' expects a numeric value for TTL metadata"
4097 )))
4098 }
4099 }
4100 _ => Err(RedDBError::Query(format!(
4101 "column '{field}' expects a numeric value for TTL metadata"
4102 ))),
4103 }
4104}
4105
4106fn metadata_u64_to_value(value: u64) -> MetadataValue {
4107 if value <= i64::MAX as u64 {
4108 MetadataValue::Int(value as i64)
4109 } else {
4110 MetadataValue::Timestamp(value)
4111 }
4112}
4113
4114fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
4120 let json = match value {
4121 Value::Null => return false,
4122 Value::Json(bytes) | Value::Blob(bytes) => {
4123 match crate::json::from_slice::<crate::json::Value>(bytes) {
4124 Ok(v) => v,
4125 Err(_) => return false,
4126 }
4127 }
4128 Value::Text(s) => {
4129 let trimmed = s.trim_start();
4130 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
4131 return false;
4132 }
4133 match crate::json::from_str::<crate::json::Value>(s) {
4134 Ok(v) => v,
4135 Err(_) => return false,
4136 }
4137 }
4138 _ => return false,
4139 };
4140 let mut cursor = &json;
4141 for seg in tail.split('.') {
4142 match cursor {
4143 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
4144 Some((_, v)) => cursor = v,
4145 None => return false,
4146 },
4147 _ => return false,
4148 }
4149 }
4150 !matches!(cursor, crate::json::Value::Null)
4151}
4152
4153fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
4164 let mut root = match current {
4165 Value::Null => crate::json::Value::Object(Default::default()),
4166 Value::Json(bytes) | Value::Blob(bytes) => {
4167 crate::json::from_slice(&bytes).map_err(|err| {
4168 RedDBError::Query(format!(
4169 "tenant auto-fill: root column is not valid JSON ({err})"
4170 ))
4171 })?
4172 }
4173 Value::Text(s) => {
4174 if s.trim().is_empty() {
4175 crate::json::Value::Object(Default::default())
4176 } else {
4177 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
4178 RedDBError::Query(format!(
4179 "tenant auto-fill: text root is not valid JSON ({err})"
4180 ))
4181 })?
4182 }
4183 }
4184 other => {
4185 return Err(RedDBError::Query(format!(
4186 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4187 )));
4188 }
4189 };
4190
4191 let segments: Vec<&str> = tail.split('.').collect();
4193 let mut cursor: &mut crate::json::Value = &mut root;
4194 for (i, seg) in segments.iter().enumerate() {
4195 let is_last = i + 1 == segments.len();
4196 let map = match cursor {
4197 crate::json::Value::Object(m) => m,
4198 _ => {
4199 return Err(RedDBError::Query(format!(
4200 "tenant auto-fill: segment '{seg}' is not inside an object"
4201 )));
4202 }
4203 };
4204 if is_last {
4205 map.insert(
4206 seg.to_string(),
4207 crate::json::Value::String(tenant_id.to_string()),
4208 );
4209 break;
4210 }
4211 cursor = map
4212 .entry(seg.to_string())
4213 .or_insert_with(|| crate::json::Value::Object(Default::default()));
4214 }
4215
4216 let bytes = crate::json::to_vec(&root).map_err(|err| {
4217 RedDBError::Query(format!(
4218 "tenant auto-fill: failed to re-serialize JSON ({err})"
4219 ))
4220 })?;
4221 Ok(Value::Json(bytes))
4222}
4223
4224#[cfg(test)]
4225mod tests {
4226 use crate::storage::schema::Value;
4227 use crate::storage::wal::{WalReader, WalRecord};
4228 use crate::storage::{DeployProfile, StoragePackaging, StorageProfileSelection};
4229 use crate::{RedDBOptions, RedDBRuntime};
4230 use std::path::Path;
4231
4232 fn persistent_operational_options(path: &Path) -> RedDBOptions {
4233 RedDBOptions::persistent(path)
4234 .with_storage_profile(StorageProfileSelection {
4235 deploy_profile: DeployProfile::Embedded,
4236 packaging: StoragePackaging::OperationalDirectory,
4237 replica_count: 0,
4238 managed_backup: false,
4239 wal_retention: false,
4240 })
4241 .unwrap()
4242 }
4243
4244 fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4245 WalReader::open(wal_path)
4246 .expect("wal opens")
4247 .iter()
4248 .map(|record| record.expect("wal record decodes").1)
4249 .filter_map(|record| match record {
4250 WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4251 _ => None,
4252 })
4253 .collect()
4254 }
4255
4256 fn action_contains_text(action: &[u8], needle: &str) -> bool {
4257 action
4258 .windows(needle.len())
4259 .any(|window| window == needle.as_bytes())
4260 }
4261
4262 fn assert_statement_writes_collections_in_one_new_wal_batch(
4263 rt: &RedDBRuntime,
4264 wal_path: &Path,
4265 statement: &str,
4266 source: &str,
4267 event_queue: &str,
4268 ) {
4269 let before_batches = store_commit_batches(wal_path).len();
4270
4271 rt.execute_query(statement).unwrap();
4272
4273 let batches = store_commit_batches(wal_path);
4274 let statement_batches = &batches[before_batches..];
4275 let source_batch = statement_batches
4276 .iter()
4277 .position(|actions| {
4278 actions.iter().any(|action| {
4279 action_contains_text(action, source)
4280 && !action_contains_text(action, event_queue)
4281 })
4282 })
4283 .expect("source collection write batch is present");
4284 let event_batch = statement_batches
4285 .iter()
4286 .position(|actions| {
4287 actions
4288 .iter()
4289 .any(|action| action_contains_text(action, event_queue))
4290 })
4291 .expect("event queue write batch is present");
4292
4293 assert_eq!(
4294 source_batch, event_batch,
4295 "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4296 );
4297 }
4298
4299 #[test]
4300 fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4301 let dir = tempfile::tempdir().unwrap();
4302 let db_path = dir.path().join("events_dual_write.rdb");
4303 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4304 let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4305
4306 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4307 .unwrap();
4308 assert_statement_writes_collections_in_one_new_wal_batch(
4309 &rt,
4310 &wal_path,
4311 "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4312 "users",
4313 "users_events",
4314 );
4315 }
4316
4317 #[test]
4318 fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4319 let dir = tempfile::tempdir().unwrap();
4320 let db_path = dir.path().join("events_update_atomic.rdb");
4321 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4322 let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4323
4324 rt.execute_query(
4325 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4326 )
4327 .unwrap();
4328 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4329 .unwrap();
4330
4331 assert_statement_writes_collections_in_one_new_wal_batch(
4332 &rt,
4333 &wal_path,
4334 "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4335 "users",
4336 "user_updates",
4337 );
4338 }
4339
4340 #[test]
4341 fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4342 let dir = tempfile::tempdir().unwrap();
4343 let db_path = dir.path().join("events_delete_atomic.rdb");
4344 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4345 let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4346
4347 rt.execute_query(
4348 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4349 )
4350 .unwrap();
4351 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4352 .unwrap();
4353
4354 assert_statement_writes_collections_in_one_new_wal_batch(
4355 &rt,
4356 &wal_path,
4357 "DELETE FROM users WHERE id = 1",
4358 "users",
4359 "user_deletes",
4360 );
4361 }
4362
4363 #[test]
4364 fn update_where_id_in_with_hash_index_updates_expected_rows() {
4365 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4366 rt.execute_query("CREATE TABLE users (id INT, score INT)")
4367 .unwrap();
4368 for id in 0..5 {
4369 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4370 .unwrap();
4371 }
4372 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4373 .unwrap();
4374
4375 let updated = rt
4376 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4377 .unwrap();
4378 assert_eq!(updated.affected_rows, 3);
4379
4380 let selected = rt
4381 .execute_query("SELECT id, score FROM users ORDER BY id")
4382 .unwrap();
4383 let scores: Vec<(i64, i64)> = selected
4384 .result
4385 .records
4386 .iter()
4387 .map(|record| {
4388 let id = match record.get("id").unwrap() {
4389 Value::Integer(value) => *value,
4390 other => panic!("expected integer id, got {other:?}"),
4391 };
4392 let score = match record.get("score").unwrap() {
4393 Value::Integer(value) => *value,
4394 other => panic!("expected integer score, got {other:?}"),
4395 };
4396 (id, score)
4397 })
4398 .collect();
4399 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4400 }
4401
4402 #[test]
4409 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4410 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4411 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4412 .unwrap();
4413 for id in 0..5 {
4414 rt.execute_query(&format!(
4415 "INSERT INTO items (id, score) VALUES ({id}, {})",
4416 id * 10
4417 ))
4418 .unwrap();
4419 }
4420 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4421 .unwrap();
4422
4423 let updated_one = rt
4427 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4428 .unwrap();
4429 assert_eq!(updated_one.affected_rows, 1);
4430
4431 let updated_many = rt
4435 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4436 .unwrap();
4437 assert_eq!(updated_many.affected_rows, 2);
4438
4439 let snapshot = rt
4440 .execute_query("SELECT id, score FROM items ORDER BY id")
4441 .unwrap();
4442 let pairs: Vec<(i64, i64)> = snapshot
4443 .result
4444 .records
4445 .iter()
4446 .map(|record| {
4447 let id = match record.get("id").unwrap() {
4448 Value::Integer(value) => *value,
4449 other => panic!("expected integer id, got {other:?}"),
4450 };
4451 let score = match record.get("score").unwrap() {
4452 Value::Integer(value) => *value,
4453 other => panic!("expected integer score, got {other:?}"),
4454 };
4455 (id, score)
4456 })
4457 .collect();
4458 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4459
4460 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4462 assert_eq!(updated_all.affected_rows, 5);
4463 let after = rt
4464 .execute_query("SELECT score FROM items ORDER BY id")
4465 .unwrap();
4466 let scores: Vec<i64> = after
4467 .result
4468 .records
4469 .iter()
4470 .map(|record| match record.get("score").unwrap() {
4471 Value::Integer(value) => *value,
4472 other => panic!("expected integer score, got {other:?}"),
4473 })
4474 .collect();
4475 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4476 }
4477
4478 #[test]
4484 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4485 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4486 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4487 .unwrap();
4488 for id in 0..5 {
4489 rt.execute_query(&format!(
4490 "INSERT INTO items (id, score) VALUES ({id}, {})",
4491 id * 10
4492 ))
4493 .unwrap();
4494 }
4495 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4496 .unwrap();
4497
4498 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4501 assert_eq!(deleted_one.affected_rows, 1);
4502
4503 let deleted_many = rt
4507 .execute_query("DELETE FROM items WHERE score > 25")
4508 .unwrap();
4509 assert_eq!(deleted_many.affected_rows, 2);
4510
4511 let surviving = rt
4512 .execute_query("SELECT id FROM items ORDER BY id")
4513 .unwrap();
4514 let ids: Vec<i64> = surviving
4515 .result
4516 .records
4517 .iter()
4518 .map(|record| match record.get("id").unwrap() {
4519 Value::Integer(value) => *value,
4520 other => panic!("expected integer id, got {other:?}"),
4521 })
4522 .collect();
4523 assert_eq!(ids, vec![0, 1]);
4524
4525 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4527 assert_eq!(deleted_rest.affected_rows, 2);
4528 let empty = rt.execute_query("SELECT id FROM items").unwrap();
4529 assert!(empty.result.records.is_empty());
4530 }
4531
#[test]
// An APPEND ONLY table accepts the INSERT but has to refuse UPDATE and DELETE
// with an error that names both the contract and the rejected verb. The
// inserted row must still be readable afterwards.
fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
        .unwrap();

    let inserted_rows = runtime
        .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
        .unwrap()
        .affected_rows;
    assert_eq!(inserted_rows, 1);

    {
        let msg = runtime
            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
            .unwrap_err()
            .to_string();
        let names_contract = msg.contains("APPEND ONLY");
        let names_verb = msg.contains("UPDATE is rejected");
        assert!(
            names_contract && names_verb,
            "expected UPDATE rejection message, got: {msg}"
        );
    }

    {
        let msg = runtime
            .execute_query("DELETE FROM events WHERE id = 1")
            .unwrap_err()
            .to_string();
        let names_contract = msg.contains("APPEND ONLY");
        let names_verb = msg.contains("DELETE is rejected");
        assert!(
            names_contract && names_verb,
            "expected DELETE rejection message, got: {msg}"
        );
    }

    // Neither rejected statement may have touched the stored row.
    let row_count = runtime
        .execute_query("SELECT id FROM events")
        .unwrap()
        .result
        .records
        .len();
    assert_eq!(row_count, 1);
}
4574
#[test]
// Control case for the contract gate: a table created without any contract
// clause lets INSERT, UPDATE and DELETE through, each affecting one row.
fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup in [
        "CREATE TABLE notes (id INT, body TEXT)",
        "INSERT INTO notes (id, body) VALUES (1, 'a')",
    ] {
        runtime.execute_query(setup).unwrap();
    }

    let updated_rows = runtime
        .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
        .unwrap()
        .affected_rows;
    assert_eq!(updated_rows, 1);

    let deleted_rows = runtime
        .execute_query("DELETE FROM notes WHERE id = 1")
        .unwrap()
        .affected_rows;
    assert_eq!(deleted_rows, 1);
}
4593
#[test]
// A table declared `WITH EVENTS (INSERT) TO audit_log` must put exactly one
// event on `audit_log` for a single-row INSERT. The test checks the envelope
// fields (`event_id`, `op`, `collection`, `id`, `ts`, `lsn`, `tenant`,
// `before`) and the `after` image of the inserted row.
fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query(
            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
        )
        .unwrap();

    let inserted_rows = runtime
        .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
        .unwrap()
        .affected_rows;
    assert_eq!(inserted_rows, 1);

    let events = queue_payloads(&runtime, "audit_log");
    assert_eq!(events.len(), 1);
    let payload = &events[0];
    let event = payload.as_object().expect("event payload object");

    // Envelope: identity and routing fields.
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|value| !value.is_empty()));
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));
    let collection = event.get("collection").and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));
    let entity_id = event.get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(entity_id, Some(7));

    // Envelope: `ts` and `lsn` only need to be present as unsigned integers.
    let ts = event.get("ts").and_then(crate::json::Value::as_u64);
    assert!(ts.is_some());
    let lsn = event.get("lsn").and_then(crate::json::Value::as_u64);
    assert!(lsn.is_some());

    // `tenant` and `before` must be present and explicitly null.
    let tenant = event.get("tenant");
    assert!(matches!(tenant, Some(crate::json::Value::Null)));
    let before = event.get("before");
    assert!(matches!(before, Some(crate::json::Value::Null)));

    // `after` carries the inserted column values.
    let after = event
        .get("after")
        .and_then(crate::json::Value::as_object)
        .expect("after object");
    let after_id = after.get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(after_id, Some(7));
    let after_email = after.get("email").and_then(crate::json::Value::as_str);
    assert_eq!(after_email, Some("a@example.com"));
}
4655
#[test]
// A two-row INSERT into a `WITH EVENTS` table (no explicit target; the test
// reads the `users_events` queue) must yield two insert events in row order,
// with strictly increasing `lsn` values.
fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
        .unwrap();

    runtime
        .execute_query(
            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
        )
        .unwrap();

    let events = queue_payloads(&runtime, "users_events");
    assert_eq!(events.len(), 2);

    // Starts at 0, so the first event's lsn must already be positive.
    let mut last_lsn: u64 = 0;
    for (expected_id, event) in (1_u64..=2).zip(events.iter()) {
        let object = event.as_object().expect("event payload object");

        let op = object.get("op").and_then(crate::json::Value::as_str);
        assert_eq!(op, Some("insert"));
        let entity_id = object.get("id").and_then(crate::json::Value::as_u64);
        assert_eq!(entity_id, Some(expected_id));

        let lsn = object
            .get("lsn")
            .and_then(crate::json::Value::as_u64)
            .expect("event lsn");
        assert!(
            lsn > last_lsn,
            "event LSNs should increase in row order"
        );
        last_lsn = lsn;

        let after = object
            .get("after")
            .and_then(crate::json::Value::as_object)
            .expect("after object");
        let after_id = after.get("id").and_then(crate::json::Value::as_u64);
        assert_eq!(after_id, Some(expected_id));
    }
}
4699
4700 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4701 let result = rt
4702 .execute_query(&format!("QUEUE PEEK {queue} 10"))
4703 .expect("peek queue");
4704 result
4705 .result
4706 .records
4707 .iter()
4708 .map(
4709 |record| match record.get("payload").expect("payload column") {
4710 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4711 other => panic!("expected JSON queue payload, got {other:?}"),
4712 },
4713 )
4714 .collect()
4715 }
4716
#[test]
// The first INSERT into a table with an `id` column should register an
// implicit hash index named `idx_id` on that column. Later inserts, point
// lookups by id and a delete by id must all keep working with it in place.
fn auto_index_id_fires_on_first_insert() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE bench_users (id INT, score INT)")
        .unwrap();

    // Nothing is registered by CREATE TABLE alone.
    let indexed_before_insert = runtime
        .index_store_ref()
        .find_index_for_column("bench_users", "id")
        .is_some();
    assert!(
        !indexed_before_insert,
        "freshly created collection should not have an `id` index"
    );

    runtime
        .execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
        .unwrap();

    let auto_index = runtime
        .index_store_ref()
        .find_index_for_column("bench_users", "id")
        .expect("auto-index hook should have registered idx_id on first insert");
    assert_eq!(auto_index.name, "idx_id");
    assert_eq!(auto_index.collection, "bench_users");
    assert_eq!(auto_index.columns, vec![String::from("id")]);
    assert!(matches!(
        auto_index.method,
        super::super::index_store::IndexMethodKind::Hash
    ));

    // Rows added after the index exists must be findable through it too.
    for id in 2..=5 {
        let insert = format!(
            "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
            id * 10
        );
        runtime.execute_query(&insert).unwrap();
    }
    for id in 1..=5 {
        let lookup = format!("SELECT score FROM bench_users WHERE id = {id}");
        let matched = runtime
            .execute_query(&lookup)
            .unwrap()
            .result
            .records
            .len();
        assert_eq!(matched, 1, "id={id} should match one row");
    }

    let deleted_rows = runtime
        .execute_query("DELETE FROM bench_users WHERE id = 3")
        .unwrap()
        .affected_rows;
    assert_eq!(deleted_rows, 1);
}
4788
#[test]
// Same expectation as the single-row case, but the first write is a
// multi-row INSERT: `idx_id` must be registered and every inserted id must
// resolve to exactly one row.
fn auto_index_id_fires_on_first_bulk_insert() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
        .unwrap();

    runtime
        .execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
        .unwrap();

    let auto_index = runtime
        .index_store_ref()
        .find_index_for_column("bench_bulk", "id")
        .expect("auto-index hook should fire on first bulk insert");
    assert_eq!(auto_index.name, "idx_id");

    for id in 1..=3 {
        let lookup = format!("SELECT score FROM bench_bulk WHERE id = {id}");
        let matched = runtime
            .execute_query(&lookup)
            .unwrap()
            .result
            .records
            .len();
        assert_eq!(matched, 1);
    }
}
4816
#[test]
// A table without an `id` column gets no implicit index after its first
// insert: neither on the missing `id` nor on the similarly named `uid`.
fn auto_index_id_skips_when_no_id_column() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE plain (uid INT, label TEXT)")
        .unwrap();
    runtime
        .execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
        .unwrap();

    let id_indexed = runtime
        .index_store_ref()
        .find_index_for_column("plain", "id")
        .is_some();
    assert!(!id_indexed);

    let uid_indexed = runtime
        .index_store_ref()
        .find_index_for_column("plain", "uid")
        .is_some();
    assert!(!uid_indexed);
}
4837
#[test]
// If the user created an index on `id` before the first insert, the lookup
// for that column must still return the user's index (`user_idx`) afterwards.
fn auto_index_id_skips_when_index_already_exists() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE pre (id INT, score INT)",
        "CREATE INDEX user_idx ON pre (id) USING BTREE",
        "INSERT INTO pre (id, score) VALUES (1, 10)",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    let found = runtime
        .index_store_ref()
        .find_index_for_column("pre", "id")
        .expect("user index should still be there");
    assert_eq!(
        found.name, "user_idx",
        "auto-index hook must not overwrite an existing index"
    );
}
4862
#[test]
// The implicit index that appears after the first insert must disappear from
// the index store once the table is dropped.
fn auto_index_id_dropped_with_collection() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE ephemeral (id INT, score INT)")
        .unwrap();
    runtime
        .execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
        .unwrap();

    // Precondition: the index exists before the drop.
    let indexed_before_drop = runtime
        .index_store_ref()
        .find_index_for_column("ephemeral", "id")
        .is_some();
    assert!(indexed_before_drop);

    runtime.execute_query("DROP TABLE ephemeral").unwrap();

    let indexed_after_drop = runtime
        .index_store_ref()
        .find_index_for_column("ephemeral", "id")
        .is_some();
    assert!(
        !indexed_after_drop,
        "implicit `idx_id` must be reaped when its collection drops"
    );
}
4887
#[test]
// With `with_auto_index_id(false)` on the runtime options, inserting into a
// table that has an `id` column must not register any index on it.
fn auto_index_id_disabled_by_config() {
    let runtime =
        RedDBRuntime::with_options(RedDBOptions::in_memory().with_auto_index_id(false)).unwrap();

    runtime
        .execute_query("CREATE TABLE off (id INT, score INT)")
        .unwrap();
    runtime
        .execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
        .unwrap();

    let id_indexed = runtime
        .index_store_ref()
        .find_index_for_column("off", "id")
        .is_some();
    assert!(
        !id_indexed,
        "with auto_index_id=false, no implicit index should be created"
    );
}
4909
#[test]
// With `WITH EVENTS (UPDATE) TO audit_log`, a one-row UPDATE must enqueue a
// single `update` event whose `before`/`after` images hold the old and new
// `name`. The preceding INSERT is not expected to show up on the queue.
fn update_single_row_emits_update_event() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query(
            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
        )
        .unwrap();
    runtime
        .execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
        .unwrap();

    runtime
        .execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
        .unwrap();

    let events = queue_payloads(&runtime, "audit_log");
    assert_eq!(events.len(), 1, "expected exactly 1 update event");
    let payload = &events[0];
    let event = payload.as_object().expect("event payload object");

    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("update"));
    let collection = event.get("collection").and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|v| !v.is_empty()));

    let before = event
        .get("before")
        .and_then(crate::json::Value::as_object)
        .expect("before must be an object");
    let after = event
        .get("after")
        .and_then(crate::json::Value::as_object)
        .expect("after must be an object");

    let old_name = before.get("name").and_then(crate::json::Value::as_str);
    assert_eq!(old_name, Some("Alice"), "before.name should be the old value");
    let new_name = after.get("name").and_then(crate::json::Value::as_str);
    assert_eq!(new_name, Some("Bob"), "after.name should be the new value");
}
4959
#[test]
// For an UPDATE that assigns only `name` on a three-column table, the event's
// `before` and `after` images must contain `name` and must leave out the
// untouched `email`.
fn update_event_only_includes_changed_fields() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query(
            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
        )
        .unwrap();
    runtime
        .execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
        .unwrap();

    runtime
        .execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
        .unwrap();

    let events = queue_payloads(&runtime, "evts");
    assert_eq!(events.len(), 1);
    let payload = &events[0];
    let event = payload.as_object().unwrap();
    let before = event
        .get("before")
        .and_then(crate::json::Value::as_object)
        .unwrap();
    let after = event
        .get("after")
        .and_then(crate::json::Value::as_object)
        .unwrap();

    // The assigned column appears on both sides.
    let name_in_before = before.contains_key("name");
    assert!(name_in_before, "before must include changed field");
    let name_in_after = after.contains_key("name");
    assert!(name_in_after, "after must include changed field");

    // The unassigned column appears on neither side.
    let email_in_before = before.contains_key("email");
    assert!(!email_in_before, "before must not include unchanged email");
    let email_in_after = after.contains_key("email");
    assert!(!email_in_after, "after must not include unchanged email");
}
5003
#[test]
// An unfiltered UPDATE over three rows must enqueue three events, each with
// `op` equal to "update".
fn multi_row_update_emits_one_event_per_row() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
        .unwrap();
    runtime
        .execute_query(
            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
        )
        .unwrap();

    runtime
        .execute_query("UPDATE items SET status = 'done'")
        .unwrap();

    let events = queue_payloads(&runtime, "evts");
    assert_eq!(events.len(), 3, "expected one update event per row");
    events.iter().for_each(|event| {
        let op = event
            .as_object()
            .unwrap()
            .get("op")
            .and_then(crate::json::Value::as_str);
        assert_eq!(op, Some("update"));
    });
}
5027
#[test]
// With `WITH EVENTS (DELETE) TO del_log`, deleting one row must enqueue a
// single `delete` event: `before` holds the removed row's columns and `after`
// is an explicit null.
fn delete_single_row_emits_delete_event() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
        .unwrap();
    runtime
        .execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
        .unwrap();

    runtime
        .execute_query("DELETE FROM users WHERE id = 42")
        .unwrap();

    let events = queue_payloads(&runtime, "del_log");
    assert_eq!(events.len(), 1);
    let payload = &events[0];
    let event = payload.as_object().expect("event payload object");

    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("delete"));
    let collection = event.get("collection").and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|v| !v.is_empty()));

    let before = event
        .get("before")
        .and_then(crate::json::Value::as_object)
        .expect("before must be an object for delete");
    let removed_id = before.get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(removed_id, Some(42));
    let removed_name = before.get("name").and_then(crate::json::Value::as_str);
    assert_eq!(removed_name, Some("Alice"));

    let after = event.get("after");
    assert!(matches!(after, Some(crate::json::Value::Null)));
}
5067
#[test]
// An unfiltered DELETE over three rows must enqueue three events, each with
// `op` equal to "delete" and a null `after`.
fn multi_row_delete_emits_one_event_per_row() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
        .unwrap();
    runtime
        .execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
        .unwrap();

    runtime.execute_query("DELETE FROM items").unwrap();

    let events = queue_payloads(&runtime, "del_log");
    assert_eq!(events.len(), 3, "expected one delete event per deleted row");
    for obj in events.iter().map(|event| event.as_object().unwrap()) {
        let op = obj.get("op").and_then(crate::json::Value::as_str);
        assert_eq!(op, Some("delete"));
        let after = obj.get("after");
        assert!(matches!(after, Some(crate::json::Value::Null)));
    }
}
5089
#[test]
// A table subscribed with `WITH EVENTS (UPDATE)` only: an INSERT followed by
// a DELETE must leave the target queue empty.
fn ops_filter_update_does_not_emit_on_insert_or_delete() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice')",
        "DELETE FROM users WHERE id = 1",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    let queue_is_empty = queue_payloads(&runtime, "evts").is_empty();
    assert!(
        queue_is_empty,
        "UPDATE-only filter must not emit INSERT or DELETE events"
    );
}
5106
#[test]
// An INSERT that ends in `SUPPRESS EVENTS` must not enqueue anything, even
// though the table is declared `WITH EVENTS TO evts`.
fn suppress_events_on_insert_emits_no_events() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
        .unwrap();

    runtime
        .execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
        .unwrap();

    let queue_is_empty = queue_payloads(&runtime, "evts").is_empty();
    assert!(queue_is_empty, "SUPPRESS EVENTS must prevent INSERT events");
}
5124
#[test]
// An UPDATE that ends in `SUPPRESS EVENTS` must not enqueue anything.
//
// The seeding INSERT is not suppressed, so it puts one event on `evts`. That
// event is asserted first (it shows the table really emits events, so the
// final emptiness check cannot pass vacuously) and then removed with
// `QUEUE PURGE` so that only the UPDATE can contribute to the final peek.
fn suppress_events_on_update_emits_no_events() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
        .unwrap();
    rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
        .unwrap();

    // Baseline: the plain INSERT above must have produced exactly one event.
    let baseline = queue_payloads(&rt, "evts");
    assert_eq!(
        baseline.len(),
        1,
        "non-suppressed INSERT should have emitted one event before the purge"
    );
    rt.execute_query("QUEUE PURGE evts").unwrap();

    rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
        .unwrap();

    let events = queue_payloads(&rt, "evts");
    assert!(
        events.is_empty(),
        "SUPPRESS EVENTS must prevent UPDATE events"
    );
}
5146
#[test]
// A DELETE that ends in `SUPPRESS EVENTS` must not enqueue anything. The
// seeding INSERT is suppressed as well, so the queue is expected to be empty
// without any purge.
fn suppress_events_on_delete_emits_no_events() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query(
            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
        )
        .unwrap();
    runtime
        .execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
        .unwrap();

    runtime
        .execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
        .unwrap();

    let queue_is_empty = queue_payloads(&runtime, "evts").is_empty();
    assert!(queue_is_empty, "SUPPRESS EVENTS must prevent DELETE events");
}
5166
#[test]
// `SUPPRESS EVENTS` applies to its own statement only: after a suppressed
// INSERT, a plain INSERT must still enqueue its event (the one for id 2).
fn normal_insert_after_suppress_still_emits() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
        "INSERT INTO users (id, name) VALUES (2, 'Bob')",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&runtime, "evts");
    assert_eq!(
        events.len(),
        1,
        "only the non-suppressed INSERT should emit"
    );
    let emitted_id = events[0].get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(emitted_id, Some(2));
}
5189}