1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11 CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12 CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13 RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16 build_row_update_contract_plan, entity_row_fields_snapshot,
17 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18 RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27 sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28 sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
43 column: String,
44 expr: Expr,
45 compound_op: Option<BinOp>,
46 metadata_key: Option<&'static str>,
47 row_rule: Option<RowUpdateColumnRule>,
48}
49
50struct CompiledUpdatePlan {
51 static_field_assignments: Vec<(String, Value)>,
52 static_metadata_assignments: Vec<(String, MetadataValue)>,
53 dynamic_assignments: Vec<CompiledUpdateAssignment>,
54 row_contract_plan: Option<RowUpdateContractPlan>,
55 row_modified_columns: Vec<String>,
56 row_touches_unique_columns: bool,
57}
58
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
61 dynamic_field_assignments: Vec<(String, Value)>,
62 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66 pub fn chain_tip_for_collection(
72 &self,
73 collection: &str,
74 ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75 let store = self.inner.db.store();
76 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77 return None;
78 }
79 let mut cache = self.inner.chain_tip_cache.lock();
80 if let Some(existing) = cache.get(collection) {
81 return Some(existing.clone());
82 }
83 let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84 cache.insert(collection.to_string(), scanned.clone());
85 Some(scanned)
86 }
87
88 pub fn verify_chain_for_collection(
95 &self,
96 collection: &str,
97 ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98 let store = self.inner.db.store();
99 let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100 if !outcome.ok {
101 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102 self.inner
103 .chain_integrity_broken
104 .lock()
105 .insert(collection.to_string(), true);
106 }
107 Some(outcome)
108 }
109
110 pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114 let store = self.inner.db.store();
115 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116 return false;
117 }
118 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119 self.inner
120 .chain_integrity_broken
121 .lock()
122 .insert(collection.to_string(), false);
123 true
124 }
125
126 fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130 {
131 let cache = self.inner.chain_integrity_broken.lock();
132 if let Some(v) = cache.get(collection) {
133 return *v;
134 }
135 }
136 let store = self.inner.db.store();
137 let persisted =
138 crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139 .unwrap_or(false);
140 self.inner
141 .chain_integrity_broken
142 .lock()
143 .insert(collection.to_string(), persisted);
144 persisted
145 }
146
147 fn ensure_integrity_tombstones_loaded(&self) -> bool {
152 use std::sync::atomic::Ordering;
153 match self
154 .inner
155 .integrity_tombstones_state
156 .load(Ordering::Relaxed)
157 {
158 1 => return false,
159 2 => return true,
160 _ => {}
161 }
162 let mut guard = self.inner.integrity_tombstones.lock();
165 if self
166 .inner
167 .integrity_tombstones_state
168 .load(Ordering::Relaxed)
169 == 0
170 {
171 let ranges = crate::runtime::integrity_tombstone::load_ranges(&self.inner.db.store());
172 let present = !ranges.is_empty();
173 *guard = ranges;
174 self.inner
175 .integrity_tombstones_state
176 .store(if present { 2 } else { 1 }, Ordering::Relaxed);
177 }
178 self.inner
179 .integrity_tombstones_state
180 .load(Ordering::Relaxed)
181 == 2
182 }
183
184 pub fn record_integrity_tombstone(&self, table: &str, lo: u64, hi: u64) {
190 use std::sync::atomic::Ordering;
191 self.ensure_integrity_tombstones_loaded();
192 let mut guard = self.inner.integrity_tombstones.lock();
193 guard.push(crate::runtime::integrity_tombstone::TombstoneRange::new(
194 table.to_string(),
195 lo,
196 hi,
197 ));
198 crate::runtime::integrity_tombstone::persist_ranges(&self.inner.db.store(), &guard);
199 self.inner
200 .integrity_tombstones_state
201 .store(2, Ordering::Relaxed);
202 }
203
204 pub fn integrity_tombstone_ranges(
208 &self,
209 ) -> Vec<crate::runtime::integrity_tombstone::TombstoneRange> {
210 self.ensure_integrity_tombstones_loaded();
211 self.inner.integrity_tombstones.lock().clone()
212 }
213
214 pub fn filter_integrity_tombstoned(&self, result: &mut UnifiedResult) {
219 if !self.ensure_integrity_tombstones_loaded() {
220 return;
221 }
222 let guard = self.inner.integrity_tombstones.lock();
223 if guard.is_empty() {
224 return;
225 }
226 let before = result.records.len();
227 result.records.retain(|record| {
228 !crate::runtime::integrity_tombstone::record_tombstoned(&guard, record)
229 });
230 if result.records.len() != before {
231 result.pre_serialized_json = None;
232 }
233 }
234
235 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
249 let Some(tenant_col) = self.tenant_column(&query.table) else {
250 return Ok(None);
251 };
252 if query
254 .columns
255 .iter()
256 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
257 {
258 return Ok(None);
259 }
260
261 if let Some(dot_pos) = tenant_col.find('.') {
267 let (root, tail) = tenant_col.split_at(dot_pos);
268 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
270 }
271
272 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
273 return Err(RedDBError::Query(format!(
274 "INSERT into tenant-scoped table '{}' requires an active tenant — \
275 run SET TENANT '<id>' first or name column '{}' explicitly",
276 query.table, tenant_col
277 )));
278 };
279
280 let mut augmented = query.clone();
281 augmented.columns.push(tenant_col);
282 let lit = Value::text(tenant_id.clone());
283 for row in augmented.values.iter_mut() {
284 row.push(lit.clone());
285 }
286 for row in augmented.value_exprs.iter_mut() {
287 row.push(crate::storage::query::ast::Expr::Literal {
288 value: lit.clone(),
289 span: crate::storage::query::ast::Span::synthetic(),
290 });
291 }
292 Ok(Some(augmented))
293 }
294
295 fn inject_dotted_tenant(
305 &self,
306 query: &InsertQuery,
307 root: &str,
308 tail: &str,
309 ) -> RedDBResult<Option<InsertQuery>> {
310 let active_tenant = crate::runtime::impl_core::current_tenant();
311 let mut augmented = query.clone();
312 let root_idx = augmented
313 .columns
314 .iter()
315 .position(|c| c.eq_ignore_ascii_case(root));
316
317 if let Some(idx) = root_idx {
318 for row in augmented.values.iter_mut() {
324 let Some(slot) = row.get_mut(idx) else {
325 continue;
326 };
327 if dotted_tail_already_set(slot, tail) {
328 continue;
329 }
330 let Some(tenant_id) = &active_tenant else {
331 return Err(RedDBError::Query(format!(
332 "INSERT into tenant-scoped table '{}' requires an active tenant — \
333 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
334 query.table, root, tail
335 )));
336 };
337 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
338 }
339 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
343 if let Some(slot) = row.get_mut(idx) {
344 let new_value = augmented
345 .values
346 .get(row_idx)
347 .and_then(|v| v.get(idx))
348 .cloned()
349 .unwrap_or(Value::Null);
350 *slot = crate::storage::query::ast::Expr::Literal {
351 value: new_value,
352 span: crate::storage::query::ast::Span::synthetic(),
353 };
354 }
355 }
356 } else {
357 let Some(tenant_id) = &active_tenant else {
361 return Err(RedDBError::Query(format!(
362 "INSERT into tenant-scoped table '{}' requires an active tenant — \
363 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
364 query.table, root, tail
365 )));
366 };
367 augmented.columns.push(root.to_string());
369 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
370 for row in augmented.values.iter_mut() {
371 row.push(fresh.clone());
372 }
373 for row in augmented.value_exprs.iter_mut() {
374 row.push(crate::storage::query::ast::Expr::Literal {
375 value: fresh.clone(),
376 span: crate::storage::query::ast::Span::synthetic(),
377 });
378 }
379 }
380
381 Ok(Some(augmented))
382 }
383
384 fn delete_entities_batch(
387 &self,
388 collection: &str,
389 ids: &[EntityId],
390 ) -> RedDBResult<(u64, Vec<u64>)> {
391 if ids.is_empty() {
392 return Ok((0, vec![]));
393 }
394
395 let store = self.db().store();
396 let Some(manager) = store.get_collection(collection) else {
397 return Ok((0, vec![]));
398 };
399
400 let active_xid = self.current_xid();
401 let conn_id = crate::runtime::impl_core::current_connection_id();
402 let mut autocommit_xid = None;
403 let mut tombstoned_ids = Vec::new();
404 let mut tombstoned_entities = Vec::new();
405 let mut physical_delete_ids = Vec::new();
406 let table_row_resolver =
407 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
408
409 for &id in ids {
410 let Some(mut entity) = manager.get(id) else {
411 continue;
412 };
413 if matches!(entity.data, EntityData::Row(_)) {
414 let previous_xmax = entity.xmax;
415 if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
416 if table_row_resolver.resolve_candidate(&entity).is_none() {
417 continue;
418 }
419 } else if entity.xmax != 0 {
420 continue;
421 }
422
423 let xid = match active_xid {
424 Some(xid) => xid,
425 None => match autocommit_xid {
426 Some(xid) => xid,
427 None => {
428 let mgr = self.snapshot_manager();
429 let xid = mgr.begin();
430 autocommit_xid = Some(xid);
431 xid
432 }
433 },
434 };
435 entity.set_xmax(xid);
436 if manager.update(entity.clone()).is_ok() {
437 if active_xid.is_some() {
438 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
439 }
440 tombstoned_entities.push(entity);
441 tombstoned_ids.push(id);
442 }
443 } else {
444 physical_delete_ids.push(id);
445 }
446 }
447
448 if let Some(xid) = autocommit_xid {
449 self.snapshot_manager().commit(xid);
450 }
451
452 let mut affected = tombstoned_ids.len() as u64;
453 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
454 if active_xid.is_some() {
455 store
456 .persist_entities_to_pager(collection, &tombstoned_entities)
457 .map_err(|err| RedDBError::Internal(err.to_string()))?;
458 } else {
459 store
460 .persist_entities_to_pager(collection, &tombstoned_entities)
461 .map_err(|err| RedDBError::Internal(err.to_string()))?;
462 for id in &tombstoned_ids {
463 store.context_index().remove_entity(*id);
464 let lsn = self.cdc_emit(
465 crate::replication::cdc::ChangeOperation::Delete,
466 collection,
467 id.raw(),
468 "entity",
469 );
470 lsns.push(lsn);
471 }
472 }
473
474 let deleted_ids = store
475 .delete_batch(collection, &physical_delete_ids)
476 .map_err(|err| RedDBError::Internal(err.to_string()))?;
477 affected += deleted_ids.len() as u64;
478 for id in &deleted_ids {
479 store.context_index().remove_entity(*id);
480 let lsn = self.cdc_emit(
481 crate::replication::cdc::ChangeOperation::Delete,
482 collection,
483 id.raw(),
484 "entity",
485 );
486 lsns.push(lsn);
487 }
488
489 Ok((affected, lsns))
490 }
491
492 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
495 if applied.is_empty() {
496 return Ok(Vec::new());
497 }
498
499 let store = self.db().store();
500 if applied.iter().any(|item| item.context_index_dirty) {
501 store.context_index().index_entities(
502 &applied[0].collection,
503 applied
504 .iter()
505 .filter(|item| item.context_index_dirty)
506 .map(|item| &item.entity),
507 );
508 }
509
510 for item in applied {
511 self.refresh_update_secondary_indexes(item)?;
512 }
513
514 let mut lsns = Vec::with_capacity(applied.len());
515 for item in applied {
516 let lsn = self.cdc_emit_prebuilt(
517 crate::replication::cdc::ChangeOperation::Update,
518 &item.collection,
519 &item.entity,
520 update_cdc_item_kind(self, &item.collection, &item.entity),
521 item.metadata.as_ref(),
522 false,
523 );
524 lsns.push(lsn);
525 }
526 Ok(lsns)
527 }
528
529 fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
530 self.persist_applied_entity_mutations(applied)
531 }
532
533 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
534 if applied.pre_mutation_fields.is_empty() {
535 return Ok(());
536 }
537 let post = entity_row_fields_snapshot(&applied.entity);
538 if post.is_empty() {
539 return Ok(());
540 }
541
542 let indexed_cols = self
543 .index_store_ref()
544 .indexed_columns_set(&applied.collection);
545 if indexed_cols.is_empty() {
546 return Ok(());
547 }
548
549 if let Some(old_version) = applied.replaced_entity.as_ref() {
550 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
551 .pre_mutation_fields
552 .iter()
553 .filter(|(col, _)| indexed_cols.contains(col))
554 .cloned()
555 .collect();
556 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
557 .iter()
558 .filter(|(col, _)| indexed_cols.contains(col))
559 .cloned()
560 .collect();
561 if !old_index_fields.is_empty() {
562 self.index_store_ref()
563 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
564 .map_err(crate::RedDBError::Internal)?;
565 }
566 if !new_index_fields.is_empty() {
567 self.index_store_ref()
568 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
569 .map_err(crate::RedDBError::Internal)?;
570 }
571 return Ok(());
572 }
573
574 let damage =
575 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
576 if damage
577 .touched_columns()
578 .into_iter()
579 .any(|col| indexed_cols.contains(col))
580 {
581 self.index_store_ref()
582 .index_entity_update(
583 &applied.collection,
584 applied.id,
585 &applied.pre_mutation_fields,
586 &post,
587 )
588 .map_err(crate::RedDBError::Internal)?;
589 }
590 Ok(())
591 }
592
593 pub fn execute_insert(
598 &self,
599 raw_query: &str,
600 query: &InsertQuery,
601 ) -> RedDBResult<RuntimeQueryResult> {
602 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
603 crate::runtime::collection_contract::CollectionContractGate::check(
609 self,
610 &query.table,
611 crate::runtime::collection_contract::MutationKind::Insert,
612 )?;
613 let augmented_owned;
622 let query = match self.maybe_inject_tenant_column(query)? {
623 Some(new_q) => {
624 augmented_owned = new_q;
625 &augmented_owned
626 }
627 None => query,
628 };
629 self.check_insert_column_policy(query)?;
630 if let Some(ref embed_config) = query.auto_embed {
631 let provider = crate::ai::parse_provider(&embed_config.provider)?;
632 crate::runtime::ai::provider_gate::enforce(self, &provider)?;
636 if matches!(provider, crate::ai::AiProvider::Local) {
637 let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
645 if model_name.is_empty() {
646 return Err(RedDBError::Query(
647 "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
648 the local provider does not have an implicit default model"
649 .to_string(),
650 ));
651 }
652 crate::runtime::ai::local_embedding::preflight_local_embedding(
653 &self.inner.db,
654 model_name,
655 )?;
656 }
657 }
658
659 let mut inserted_count: u64 = 0;
660 let effective_rows =
661 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
662
663 let store = self.inner.db.store();
665 let _ = store.get_or_create_collection(&query.table);
666 let declared_model = self
667 .db()
668 .collection_contract_arc(&query.table)
669 .map(|contract| contract.declared_model);
670
671 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
672 if query.returning.is_some() {
673 Some(Vec::with_capacity(effective_rows.len()))
674 } else {
675 None
676 };
677 let mut returning_result: Option<UnifiedResult> = None;
678
679 if matches!(query.entity_type, InsertEntityType::Row)
680 && !matches!(
681 declared_model,
682 Some(crate::catalog::CollectionModel::TimeSeries)
683 )
684 {
685 let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
696 let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
697 Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
698 } else {
699 None
700 };
701 let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
702
703 if chain_mode && self.is_chain_integrity_broken(&query.table) {
706 return Err(RedDBError::InvalidOperation(format!(
707 "ChainIntegrityBroken: collection '{}' is locked until \
708 POST /collections/{}/clear-integrity-flag is called by an admin",
709 query.table, query.table
710 )));
711 }
712
713 let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
717 if chain_mode {
718 let mut cache = self.inner.chain_tip_cache.lock();
719 if let Some(existing) = cache.get(&query.table) {
720 Some(existing.clone())
721 } else if let Some(scanned) =
722 crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
723 {
724 cache.insert(query.table.clone(), scanned.clone());
725 Some(scanned)
726 } else {
727 None
728 }
729 } else {
730 None
731 };
732
733 let mut rows = Vec::with_capacity(effective_rows.len());
734 for row_values in &effective_rows {
735 if row_values.len() != query.columns.len() {
736 return Err(RedDBError::Query(format!(
737 "INSERT column count ({}) does not match value count ({})",
738 query.columns.len(),
739 row_values.len()
740 )));
741 }
742 let (mut fields, mut metadata) =
743 split_insert_metadata(self, &query.columns, row_values)?;
744 if chain_mode {
745 use crate::runtime::blockchain_kind::{
746 chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
747 COL_TIMESTAMP, RESERVED_COLUMNS,
748 };
749 let supplied_height = fields
750 .iter()
751 .find(|(k, _)| k == COL_BLOCK_HEIGHT)
752 .map(|(_, v)| v.clone());
753 let supplied_prev = fields
754 .iter()
755 .find(|(k, _)| k == COL_PREV_HASH)
756 .map(|(_, v)| v.clone());
757 let supplied_ts = fields
758 .iter()
759 .find(|(k, _)| k == COL_TIMESTAMP)
760 .map(|(_, v)| v.clone());
761 let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
762 let user_supplied_any = supplied_height.is_some()
763 || supplied_prev.is_some()
764 || supplied_ts.is_some()
765 || supplied_hash;
766
767 fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
768 let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
769
770 let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
771 Some(t) => (t.hash, t.height + 1),
772 None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
773 };
774 let server_now = crate::runtime::blockchain_kind::now_ms();
775
776 let (use_prev, use_height, use_ts) = if user_supplied_any {
777 if supplied_hash {
780 return Err(chain_conflict_error(
781 tip_next_height.saturating_sub(1),
782 tip_prev_hash,
783 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
784 server_now,
785 "hash column is engine-computed and cannot be supplied",
786 ));
787 }
788 let caller_prev = match &supplied_prev {
789 Some(Value::Blob(b)) if b.len() == 32 => {
790 let mut a = [0u8; 32];
791 a.copy_from_slice(b);
792 a
793 }
794 Some(Value::Text(s)) if s.len() == 64 => {
795 let mut a = [0u8; 32];
799 let mut ok = true;
800 for (i, slot) in a.iter_mut().enumerate() {
801 let pair = &s.as_ref()[i * 2..i * 2 + 2];
802 match u8::from_str_radix(pair, 16) {
803 Ok(byte) => *slot = byte,
804 Err(_) => {
805 ok = false;
806 break;
807 }
808 }
809 }
810 if !ok {
811 return Err(chain_conflict_error(
812 tip_next_height.saturating_sub(1),
813 tip_prev_hash,
814 chain_tip_full
815 .as_ref()
816 .map(|t| t.timestamp_ms)
817 .unwrap_or(0),
818 server_now,
819 "prev_hash is not valid hex",
820 ));
821 }
822 a
823 }
824 _ => {
825 return Err(chain_conflict_error(
826 tip_next_height.saturating_sub(1),
827 tip_prev_hash,
828 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
829 server_now,
830 "prev_hash missing or not a 32-byte Blob",
831 ));
832 }
833 };
834 if caller_prev != tip_prev_hash {
835 return Err(chain_conflict_error(
836 tip_next_height.saturating_sub(1),
837 tip_prev_hash,
838 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
839 server_now,
840 "prev_hash does not match current tip",
841 ));
842 }
843 let caller_height = match &supplied_height {
844 Some(Value::UnsignedInteger(v)) => *v,
845 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
846 _ => {
847 return Err(chain_conflict_error(
848 tip_next_height.saturating_sub(1),
849 tip_prev_hash,
850 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
851 server_now,
852 "block_height missing or not an unsigned integer",
853 ));
854 }
855 };
856 if caller_height != tip_next_height {
857 return Err(chain_conflict_error(
858 tip_next_height.saturating_sub(1),
859 tip_prev_hash,
860 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
861 server_now,
862 "block_height does not match tip+1",
863 ));
864 }
865 let caller_ts = match &supplied_ts {
866 Some(Value::UnsignedInteger(v)) => *v,
867 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
868 _ => {
869 return Err(chain_conflict_error(
870 tip_next_height.saturating_sub(1),
871 tip_prev_hash,
872 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
873 server_now,
874 "timestamp missing or not an unsigned integer",
875 ));
876 }
877 };
878 let drift = (caller_ts as i128) - (server_now as i128);
879 if drift.abs() > 60_000 {
880 return Err(chain_conflict_error(
881 tip_next_height.saturating_sub(1),
882 tip_prev_hash,
883 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
884 server_now,
885 "timestamp outside ±60s of server_time",
886 ));
887 }
888 (caller_prev, caller_height, caller_ts)
889 } else {
890 (tip_prev_hash, tip_next_height, server_now)
891 };
892
893 let (reserved, new_hash) =
894 crate::runtime::blockchain_kind::make_block_reserved_fields(
895 use_prev, use_height, use_ts, &payload,
896 );
897 fields.extend(reserved);
898 chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
899 height: use_height,
900 hash: new_hash,
901 timestamp_ms: use_ts,
902 });
903 }
904 if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
916 let (pk_col, sig_col, residual) =
917 crate::runtime::signed_writes_kind::split_signature_fields(fields);
918 let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
919 let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
920 crate::runtime::signed_writes_kind::verify_row(
921 ®,
922 pk_col.as_ref().map(|c| c.bytes.as_slice()),
923 sig_col.as_ref().map(|c| c.bytes.as_slice()),
924 &payload,
925 )
926 .map_err(crate::runtime::signed_writes_kind::map_error)?;
927 fields = residual;
928 if let Some(col) = pk_col {
933 fields.push((
934 crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
935 col.raw_value,
936 ));
937 }
938 if let Some(col) = sig_col {
939 fields.push((
940 crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
941 col.raw_value,
942 ));
943 }
944 }
945 merge_with_clauses(
946 &mut metadata,
947 query.ttl_ms,
948 query.expires_at_ms,
949 &query.with_metadata,
950 );
951 if let Some(snaps) = returning_snapshots.as_mut() {
952 snaps.push(fields.clone());
953 }
954 rows.push(CreateRowInput {
955 collection: query.table.clone(),
956 fields,
957 metadata,
958 node_links: Vec::new(),
959 vector_links: Vec::new(),
960 });
961 }
962 let outputs = self.create_rows_batch(CreateRowsBatchInput {
963 collection: query.table.clone(),
964 rows,
965 suppress_events: query.suppress_events,
966 })?;
967 inserted_count = outputs.len() as u64;
968
969 if chain_mode {
973 if let Some(new_tip) = chain_tip_full.as_ref() {
974 self.inner
975 .chain_tip_cache
976 .lock()
977 .insert(query.table.clone(), new_tip.clone());
978 }
979 }
980
981 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
989 let time_col = &spec.time_column;
990 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
992 for row in &effective_rows {
993 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
994 if *n >= 0 {
995 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
996 }
997 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
998 let _ = self.inner.db.hypertables().route(&query.table, *n);
999 }
1000 }
1001 }
1002 }
1003
1004 if let (Some(items), Some(snaps)) =
1005 (query.returning.as_ref(), returning_snapshots.take())
1006 {
1007 let snaps = row_insert_returning_snapshots(&outputs, snaps);
1008 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
1009 }
1010 } else {
1011 let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
1018 Vec::with_capacity(effective_rows.len());
1019 let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
1020 {
1021 Vec::with_capacity(effective_rows.len())
1022 } else {
1023 Vec::new()
1024 };
1025 if matches!(
1026 query.entity_type,
1027 InsertEntityType::Node | InsertEntityType::Edge
1028 ) {
1029 enum PreparedGraphInsert {
1030 Node {
1031 fields: Vec<(String, Value)>,
1032 input: CreateNodeInput,
1033 },
1034 Edge {
1035 fields: Vec<(String, Value)>,
1036 input: CreateEdgeInput,
1037 },
1038 }
1039
1040 let mut prepared = Vec::with_capacity(effective_rows.len());
1041 for row_values in &effective_rows {
1042 if row_values.len() != query.columns.len() {
1043 return Err(RedDBError::Query(format!(
1044 "INSERT column count ({}) does not match value count ({})",
1045 query.columns.len(),
1046 row_values.len()
1047 )));
1048 }
1049
1050 match query.entity_type {
1051 InsertEntityType::Node => {
1052 let (node_values, mut metadata) =
1053 split_insert_metadata(self, &query.columns, row_values)?;
1054 merge_with_clauses(
1055 &mut metadata,
1056 query.ttl_ms,
1057 query.expires_at_ms,
1058 &query.with_metadata,
1059 );
1060 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1061 apply_collection_default_ttl_metadata(
1062 self,
1063 &query.table,
1064 &mut metadata,
1065 );
1066 let (columns, values) = pairwise_columns_values(&node_values);
1067 let label = find_column_value_string(&columns, &values, "label")?;
1068 let node_type =
1069 find_column_value_opt_string(&columns, &values, "node_type");
1070 let properties = extract_remaining_properties(
1071 &columns,
1072 &values,
1073 &["label", "node_type"],
1074 );
1075 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1076 properties.iter().map(|(key, _)| key.as_str()),
1077 &format!("node '{}'", query.table),
1078 )?;
1079 prepared.push(PreparedGraphInsert::Node {
1080 fields: node_values,
1081 input: CreateNodeInput {
1082 collection: query.table.clone(),
1083 label,
1084 node_type,
1085 properties,
1086 metadata,
1087 embeddings: Vec::new(),
1088 table_links: Vec::new(),
1089 node_links: Vec::new(),
1090 },
1091 });
1092 }
1093 InsertEntityType::Edge => {
1094 let (edge_values, mut metadata) =
1095 split_insert_metadata(self, &query.columns, row_values)?;
1096 merge_with_clauses(
1097 &mut metadata,
1098 query.ttl_ms,
1099 query.expires_at_ms,
1100 &query.with_metadata,
1101 );
1102 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1103 apply_collection_default_ttl_metadata(
1104 self,
1105 &query.table,
1106 &mut metadata,
1107 );
1108 let (columns, values) = pairwise_columns_values(&edge_values);
1109 let label = find_column_value_string(&columns, &values, "label")?;
1110 ensure_non_tree_structural_edge_label(&label)?;
1111 let from_id = resolve_edge_endpoint_any(
1112 self.inner.db.store().as_ref(),
1113 &query.table,
1114 &columns,
1115 &values,
1116 &["from_rid", "from"],
1117 )?;
1118 let to_id = resolve_edge_endpoint_any(
1119 self.inner.db.store().as_ref(),
1120 &query.table,
1121 &columns,
1122 &values,
1123 &["to_rid", "to"],
1124 )?;
1125 let weight = find_column_value_f32_opt(&columns, &values, "weight");
1126 let properties = extract_remaining_properties(
1127 &columns,
1128 &values,
1129 &["label", "from_rid", "to_rid", "from", "to", "weight"],
1130 );
1131 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1132 properties.iter().map(|(key, _)| key.as_str()),
1133 &format!("edge '{}'", query.table),
1134 )?;
1135 prepared.push(PreparedGraphInsert::Edge {
1136 fields: edge_values,
1137 input: CreateEdgeInput {
1138 collection: query.table.clone(),
1139 label,
1140 from: EntityId::new(from_id),
1141 to: EntityId::new(to_id),
1142 weight,
1143 properties,
1144 metadata,
1145 },
1146 });
1147 }
1148 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1149 }
1150 }
1151
1152 ensure_graph_insert_contract(self, &query.table)?;
1153 let mut batch = self.inner.db.batch();
1154 for item in prepared {
1155 match item {
1156 PreparedGraphInsert::Node { fields, input } => {
1157 if query.returning.is_some() {
1158 returning_field_snaps.push(fields);
1159 }
1160 let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1161 batch = batch.add_node_with_type(
1162 input.collection,
1163 input.label,
1164 node_type,
1165 input.properties.into_iter().collect(),
1166 input.metadata.into_iter().collect(),
1167 );
1168 }
1169 PreparedGraphInsert::Edge { fields, input } => {
1170 if query.returning.is_some() {
1171 returning_field_snaps.push(fields);
1172 }
1173 batch = batch.add_edge(
1174 input.collection,
1175 input.label,
1176 input.from,
1177 input.to,
1178 input.weight.unwrap_or(1.0),
1179 input.properties.into_iter().collect(),
1180 input.metadata.into_iter().collect(),
1181 );
1182 }
1183 }
1184 }
1185 let batch_result = batch
1186 .execute()
1187 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1188 let (ids, entity_kind) = match query.entity_type {
1189 InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1190 InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1191 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1192 };
1193 for id in &ids {
1194 self.stamp_xmin_if_in_txn(&query.table, *id);
1195 }
1196 if query.returning.is_some() {
1197 returning_field_snaps = graph_insert_returning_snapshots(
1198 self.inner.db.store().as_ref(),
1199 &query.table,
1200 &ids,
1201 );
1202 }
1203 self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1204 let store = self.inner.db.store();
1205 entity_outputs.extend(ids.iter().map(|id| {
1206 crate::application::entity::CreateEntityOutput {
1207 id: *id,
1208 entity: store.get(&query.table, *id),
1209 }
1210 }));
1211 inserted_count = ids.len() as u64;
1212 } else {
1213 for row_values in &effective_rows {
1214 if row_values.len() != query.columns.len() {
1215 return Err(RedDBError::Query(format!(
1216 "INSERT column count ({}) does not match value count ({})",
1217 query.columns.len(),
1218 row_values.len()
1219 )));
1220 }
1221
1222 match query.entity_type {
1223 InsertEntityType::Row => {
1224 if query.returning.is_some() {
1225 return Err(RedDBError::Query(
1226 "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1227 .to_string(),
1228 ));
1229 }
1230 let (fields, mut metadata) =
1231 split_insert_metadata(self, &query.columns, row_values)?;
1232 merge_with_clauses(
1233 &mut metadata,
1234 query.ttl_ms,
1235 query.expires_at_ms,
1236 &query.with_metadata,
1237 );
1238 self.insert_timeseries_point(&query.table, fields, metadata)?;
1239 }
1240 InsertEntityType::Node | InsertEntityType::Edge => {
1241 unreachable!("NODE and EDGE are handled by the prepared graph path")
1242 }
1243 InsertEntityType::Vector => {
1244 let (vector_values, mut metadata) =
1245 split_insert_metadata(self, &query.columns, row_values)?;
1246 merge_with_clauses(
1247 &mut metadata,
1248 query.ttl_ms,
1249 query.expires_at_ms,
1250 &query.with_metadata,
1251 );
1252 let (columns, values) = pairwise_columns_values(&vector_values);
1253 let dense = find_column_value_vec_f32_any(
1254 &columns,
1255 &values,
1256 &["dense", "embedding"],
1257 )?;
1258 merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1259 let content =
1260 find_column_value_opt_string(&columns, &values, "content");
1261 if query.returning.is_some() {
1262 returning_field_snaps.push(vector_values.clone());
1263 }
1264 let input = CreateVectorInput {
1265 collection: query.table.clone(),
1266 dense,
1267 content,
1268 metadata,
1269 link_row: None,
1270 link_node: None,
1271 };
1272 entity_outputs.push(self.create_vector(input)?);
1273 }
1274 InsertEntityType::Document => {
1275 let (document_values, mut metadata) =
1276 split_insert_metadata(self, &query.columns, row_values)?;
1277 merge_with_clauses(
1278 &mut metadata,
1279 query.ttl_ms,
1280 query.expires_at_ms,
1281 &query.with_metadata,
1282 );
1283 let (columns, values) = pairwise_columns_values(&document_values);
1284 let body_str = find_column_value_string(&columns, &values, "body")?;
1285 let body: crate::json::Value = crate::json::from_str(&body_str)
1286 .map_err(|e| {
1287 RedDBError::Query(format!("invalid JSON body: {e}"))
1288 })?;
1289 let input = CreateDocumentInput {
1290 collection: query.table.clone(),
1291 body,
1292 metadata,
1293 node_links: Vec::new(),
1294 vector_links: Vec::new(),
1295 };
1296 let output = self.create_document(input)?;
1297 if query.returning.is_some() {
1298 let fields = output
1299 .entity
1300 .as_ref()
1301 .map(entity_row_fields_snapshot)
1302 .filter(|fields| !fields.is_empty())
1303 .unwrap_or(document_values);
1304 returning_field_snaps.push(fields);
1305 }
1306 entity_outputs.push(output);
1307 }
1308 InsertEntityType::Kv => {
1309 let (kv_values, mut metadata) =
1310 split_insert_metadata(self, &query.columns, row_values)?;
1311 merge_with_clauses(
1312 &mut metadata,
1313 query.ttl_ms,
1314 query.expires_at_ms,
1315 &query.with_metadata,
1316 );
1317 let (columns, values) = pairwise_columns_values(&kv_values);
1318 let key = find_column_value_string(&columns, &values, "key")?;
1319 let value = find_column_value(&columns, &values, "value")?;
1320 if query.returning.is_some() {
1321 returning_field_snaps.push(kv_values.clone());
1322 }
1323 let input = CreateKvInput {
1324 collection: query.table.clone(),
1325 key,
1326 value,
1327 metadata,
1328 };
1329 entity_outputs.push(self.create_kv(input)?);
1330 }
1331 }
1332
1333 inserted_count += 1;
1334 }
1335 }
1336
1337 if let Some(items) = query.returning.as_ref() {
1338 if !entity_outputs.is_empty() {
1339 returning_result = Some(build_returning_result(
1340 items,
1341 &returning_field_snaps,
1342 Some(&entity_outputs),
1343 ));
1344 }
1345 }
1346 }
1347
1348 if let Some(ref embed_config) = query.auto_embed {
1350 let store = self.inner.db.store();
1351 let provider = crate::ai::parse_provider(&embed_config.provider)?;
1352 let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1353 let api_key = if is_local_provider {
1358 String::new()
1359 } else {
1360 crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1361 };
1362 let model = embed_config.model.clone().unwrap_or_else(|| {
1363 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1364 .ok()
1365 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1366 });
1367
1368 let manager = store
1370 .get_collection(&query.table)
1371 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1372 let entities = manager.query_all(|_| true);
1373 let recent: Vec<_> = entities
1374 .into_iter()
1375 .rev()
1376 .take(effective_rows.len())
1377 .collect();
1378
1379 let entity_combos: Vec<(usize, String)> = recent
1381 .iter()
1382 .enumerate()
1383 .filter_map(|(i, entity)| {
1384 if let EntityData::Row(ref row) = entity.data {
1385 if let Some(ref named) = row.named {
1386 let texts: Vec<String> = embed_config
1387 .fields
1388 .iter()
1389 .filter_map(|field| match named.get(field) {
1390 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1391 _ => None,
1392 })
1393 .collect();
1394 if !texts.is_empty() {
1395 return Some((i, texts.join(" ")));
1396 }
1397 }
1398 }
1399 None
1400 })
1401 .collect();
1402
1403 if !entity_combos.is_empty() {
1404 let batch_texts: Vec<String> =
1406 entity_combos.iter().map(|(_, t)| t.clone()).collect();
1407
1408 let embeddings = if is_local_provider {
1418 let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1419 &self.inner.db,
1420 &model,
1421 batch_texts,
1422 )?;
1423 response.embeddings
1424 } else {
1425 let batch_client =
1426 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1427
1428 match tokio::runtime::Handle::try_current() {
1429 Ok(handle) => tokio::task::block_in_place(|| {
1430 handle.block_on(batch_client.embed_batch(
1431 &provider,
1432 &model,
1433 &api_key,
1434 batch_texts,
1435 ))
1436 }),
1437 Err(_) => {
1438 return Err(RedDBError::Query(
1439 "AUTO EMBED requires a Tokio runtime context".to_string(),
1440 ));
1441 }
1442 }
1443 .map_err(|e| RedDBError::Query(e.to_string()))?
1444 };
1445
1446 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1448 if dense.is_empty() {
1449 continue;
1450 }
1451 self.create_vector(CreateVectorInput {
1452 collection: query.table.clone(),
1453 dense,
1454 content: Some(combined.clone()),
1455 metadata: Vec::new(),
1456 link_row: None,
1457 link_node: None,
1458 })?;
1459 }
1460 }
1461 }
1462
1463 if inserted_count > 0 {
1464 self.note_table_write(&query.table);
1465 }
1466
1467 let mut result = RuntimeQueryResult::dml_result(
1468 raw_query.to_string(),
1469 inserted_count,
1470 "insert",
1471 "runtime-dml",
1472 );
1473 if let Some(returning) = returning_result {
1474 result.result = returning;
1475 }
1476 Ok(result)
1477 }
1478
1479 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1480 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1481 return Ok(());
1482 };
1483 if !auth_store.iam_authorization_enabled() {
1484 return Ok(());
1485 }
1486 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1487 return Ok(());
1488 };
1489
1490 let tenant = crate::runtime::impl_core::current_tenant();
1491 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1492 let request = crate::auth::ColumnAccessRequest {
1493 action: "insert".to_string(),
1494 schema: None,
1495 table: query.table.clone(),
1496 columns: query.columns.clone(),
1497 };
1498 let ctx = crate::auth::policies::EvalContext {
1499 principal_tenant: tenant.clone(),
1500 current_tenant: tenant,
1501 peer_ip: None,
1502 mfa_present: false,
1503 now_ms: crate::auth::now_ms(),
1504 principal_is_admin_role: role == crate::auth::Role::Admin,
1505 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1506 principal_is_platform_scoped: principal.tenant.is_none(),
1507 };
1508
1509 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1510 let table_allowed = matches!(
1511 outcome.table_decision,
1512 crate::auth::policies::Decision::Allow { .. }
1513 | crate::auth::policies::Decision::AdminBypass
1514 );
1515 if !table_allowed {
1516 return Err(RedDBError::Query(format!(
1517 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1518 outcome.table_resource.kind, outcome.table_resource.name
1519 )));
1520 }
1521 if let Some(denied) = outcome.first_denied_column() {
1522 return Err(RedDBError::Query(format!(
1523 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1524 denied.resource.kind, denied.resource.name
1525 )));
1526 }
1527
1528 Ok(())
1529 }
1530
1531 pub(crate) fn insert_timeseries_point(
1532 &self,
1533 collection: &str,
1534 fields: Vec<(String, Value)>,
1535 mut metadata: Vec<(String, MetadataValue)>,
1536 ) -> RedDBResult<EntityId> {
1537 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1538
1539 let (columns, values) = pairwise_columns_values(&fields);
1540 validate_timeseries_insert_columns(&columns)?;
1541
1542 let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1551 let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1552 if let Some(event_name) = event_name_opt.as_deref() {
1553 let store_for_schema = self.inner.db.store();
1554 if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1555 .is_some()
1556 {
1557 let payload_json = payload_opt.as_deref().unwrap_or("{}");
1558 super::analytics_schema_registry::validate(
1559 store_for_schema.as_ref(),
1560 event_name,
1561 payload_json,
1562 )
1563 .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1564 }
1565 }
1566
1567 let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1572 Some(m) => m,
1573 None => event_name_opt.clone().ok_or_else(|| {
1574 RedDBError::Query(
1575 "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1576 )
1577 })?,
1578 };
1579 let value = match find_column_value_opt_string(&columns, &values, "value") {
1583 Some(s) => s.parse::<f64>().unwrap_or(1.0),
1584 None => columns
1585 .iter()
1586 .position(|c| c.eq_ignore_ascii_case("value"))
1587 .and_then(|i| match &values[i] {
1588 Value::Float(f) => Some(*f),
1589 Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1590 Value::UnsignedInteger(n) => Some(*n as f64),
1591 _ => None,
1592 })
1593 .unwrap_or(1.0),
1594 };
1595 let timestamp_ns =
1596 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1597 let mut tags = find_timeseries_tags(&columns, &values)?;
1598 if let Some(ref name) = event_name_opt {
1599 tags.entry("event_name".to_string())
1600 .or_insert_with(|| name.clone());
1601 }
1602 if let Some(ref payload) = payload_opt {
1603 tags.entry("payload".to_string())
1604 .or_insert_with(|| payload.clone());
1605 }
1606
1607 let mut entity = UnifiedEntity::new(
1608 EntityId::new(0),
1609 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1610 series: collection.to_string(),
1611 metric: metric.clone(),
1612 })),
1613 EntityData::TimeSeries(crate::storage::TimeSeriesData {
1614 metric,
1615 timestamp_ns,
1616 value,
1617 tags,
1618 }),
1619 );
1620 let writer_xid = match self.current_xid() {
1624 Some(xid) => xid,
1625 None => {
1626 let mgr = self.snapshot_manager();
1627 let xid = mgr.begin();
1628 mgr.commit(xid);
1629 xid
1630 }
1631 };
1632 entity.set_xmin(writer_xid);
1633
1634 let store = self.inner.db.store();
1635 let id = store
1636 .insert_auto(collection, entity)
1637 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1638
1639 if !metadata.is_empty() {
1640 let _ = store.set_metadata(
1641 collection,
1642 id,
1643 Metadata::with_fields(metadata.into_iter().collect()),
1644 );
1645 }
1646
1647 self.cdc_emit(
1648 crate::replication::cdc::ChangeOperation::Insert,
1649 collection,
1650 id.raw(),
1651 "timeseries",
1652 );
1653
1654 Ok(id)
1655 }
1656
1657 pub fn execute_update(
1662 &self,
1663 raw_query: &str,
1664 query: &UpdateQuery,
1665 ) -> RedDBResult<RuntimeQueryResult> {
1666 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1667 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1671 return Err(RedDBError::InvalidOperation(format!(
1672 "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1673 query.table
1674 )));
1675 }
1676 crate::runtime::collection_contract::CollectionContractGate::check(
1682 self,
1683 &query.table,
1684 crate::runtime::collection_contract::MutationKind::Update,
1685 )?;
1686 ensure_update_target_contract(self, &query.table, query.target)?;
1687 ensure_graph_identity_update_target_allowed(query)?;
1688
1689 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1695 let augmented_query: UpdateQuery;
1696 let effective_query: &UpdateQuery = if rls_gated {
1697 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1698 self,
1699 &query.table,
1700 crate::storage::query::ast::PolicyAction::Update,
1701 );
1702 let Some(policy) = rls_filter else {
1703 let mut response = RuntimeQueryResult::dml_result(
1706 raw_query.to_string(),
1707 0,
1708 "update",
1709 "runtime-dml-rls",
1710 );
1711 if let Some(items) = query.returning.clone() {
1712 response.result = build_returning_result(&items, &[], None);
1713 }
1714 return Ok(response);
1715 };
1716 let mut augmented = query.clone();
1717 augmented.filter = Some(match augmented.filter.take() {
1718 Some(existing) => {
1719 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1720 }
1721 None => policy,
1722 });
1723 augmented_query = augmented;
1724 &augmented_query
1725 } else {
1726 query
1727 };
1728
1729 if let Some(items) = effective_query.returning.clone() {
1734 let mut inner_query = effective_query.clone();
1735 inner_query.returning = None;
1736 let (mut response, touched_ids) =
1737 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1738
1739 let snapshots = if matches!(
1740 effective_query.target,
1741 UpdateTarget::Nodes | UpdateTarget::Edges
1742 ) {
1743 graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1744 } else {
1745 super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1746 .row_snapshots(&touched_ids)
1747 };
1748
1749 response.result = build_returning_result(&items, &snapshots, None);
1750 response.engine = "runtime-dml-returning";
1751 return Ok(response);
1752 }
1753
1754 self.execute_update_inner(raw_query, effective_query)
1755 }
1756
1757 fn execute_update_inner(
1759 &self,
1760 raw_query: &str,
1761 query: &UpdateQuery,
1762 ) -> RedDBResult<RuntimeQueryResult> {
1763 self.execute_update_inner_tracked(raw_query, query)
1764 .map(|(res, _)| res)
1765 }
1766
1767 fn execute_update_inner_tracked(
1768 &self,
1769 raw_query: &str,
1770 query: &UpdateQuery,
1771 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1772 let store = self.inner.db.store();
1773 let effective_filter = effective_update_filter(query);
1774 let compiled_plan = self.compile_update_plan(query)?;
1775 let needs_rmw_lock = update_needs_rmw_lock(query);
1776 let table_rmw_lock = if needs_rmw_lock {
1777 Some(
1778 self.inner
1779 .rmw_locks
1780 .lock_for(&query.table, "__table_rmw_update__"),
1781 )
1782 } else {
1783 None
1784 };
1785 let _table_rmw_guard = table_rmw_lock.as_ref().map(|lock| lock.lock());
1786 let mut touched_ids: Vec<EntityId> = Vec::new();
1787 let limit_cap = query.limit.map(|l| l as usize);
1788 let manager = store
1789 .get_collection(&query.table)
1790 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1791 let scan_limit = if query.order_by.is_empty() {
1792 limit_cap
1793 } else {
1794 None
1795 };
1796 let mut target_scan = super::dml_target_scan::DmlTargetScan::with_update_target(
1797 self,
1798 &query.table,
1799 effective_filter.as_ref(),
1800 scan_limit,
1801 query.target,
1802 );
1803 if needs_rmw_lock {
1804 target_scan = target_scan.with_live_table_rows();
1805 }
1806 let ids_to_update = target_scan.find_target_ids()?;
1807 let ids_to_update = if query.order_by.is_empty() {
1808 ids_to_update
1809 } else {
1810 ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1811 };
1812
1813 if needs_rmw_lock {
1814 return self.execute_update_inner_tracked_locked(
1815 raw_query,
1816 query,
1817 &compiled_plan,
1818 &ids_to_update,
1819 effective_filter.as_ref(),
1820 );
1821 }
1822
1823 let mut affected: u64 = 0;
1824 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1825 let mut applied_chunk = Vec::with_capacity(chunk.len());
1826 for entity in manager.get_many(chunk).into_iter().flatten() {
1827 let assignments =
1828 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1829 let applied = self.apply_materialized_update_for_entity(
1830 query.table.clone(),
1831 entity,
1832 &compiled_plan,
1833 assignments,
1834 )?;
1835 touched_ids.push(applied.id);
1836 applied_chunk.push(applied);
1837 }
1838 self.persist_update_chunk(&applied_chunk)?;
1839 affected += applied_chunk.len() as u64;
1840 let lsns = self.flush_update_chunk(&applied_chunk)?;
1841 if !query.suppress_events {
1842 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1843 }
1844 }
1845
1846 if affected > 0 {
1847 self.note_table_write(&query.table);
1848 }
1849
1850 Ok((
1851 RuntimeQueryResult::dml_result(
1852 raw_query.to_string(),
1853 affected,
1854 "update",
1855 "runtime-dml",
1856 ),
1857 touched_ids,
1858 ))
1859 }
1860
1861 fn execute_update_inner_tracked_locked(
1862 &self,
1863 raw_query: &str,
1864 query: &UpdateQuery,
1865 compiled_plan: &CompiledUpdatePlan,
1866 ids_to_update: &[EntityId],
1867 effective_filter: Option<&Filter>,
1868 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1869 let store = self.inner.db.store();
1870 let mut touched_ids = Vec::new();
1871 let mut lock_entries = Vec::new();
1872
1873 for id in ids_to_update {
1874 let Some(candidate) = store.get(&query.table, *id) else {
1875 continue;
1876 };
1877 let logical_id = candidate.logical_id();
1878 let lock_key = format!("row:{}", logical_id.raw());
1879 let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1880 lock_entries.push((lock_key, logical_id, rmw_lock));
1881 }
1882
1883 lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1884 lock_entries.dedup_by(|left, right| left.0 == right.0);
1885 let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1886
1887 let mut applied_chunk = Vec::new();
1888 for (_, logical_id, _) in &lock_entries {
1889 let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1890 else {
1891 continue;
1892 };
1893 if let Some(filter) = effective_filter {
1894 if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1895 Some(self.inner.db.as_ref()),
1896 &entity,
1897 filter,
1898 &query.table,
1899 &query.table,
1900 ) {
1901 continue;
1902 }
1903 }
1904
1905 let assignments =
1906 self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1907 let applied = self.apply_materialized_update_for_entity(
1908 query.table.clone(),
1909 entity,
1910 compiled_plan,
1911 assignments,
1912 )?;
1913 touched_ids.push(applied.id);
1914 applied_chunk.push(applied);
1915 }
1916
1917 let affected = applied_chunk.len() as u64;
1918 if !applied_chunk.is_empty() {
1919 self.persist_update_chunk(&applied_chunk)?;
1920 let lsns = self.flush_update_chunk(&applied_chunk)?;
1921 if !query.suppress_events {
1922 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1923 }
1924 }
1925
1926 if affected > 0 {
1927 self.note_table_write(&query.table);
1928 }
1929
1930 Ok((
1931 RuntimeQueryResult::dml_result(
1932 raw_query.to_string(),
1933 affected,
1934 "update",
1935 "runtime-dml",
1936 ),
1937 touched_ids,
1938 ))
1939 }
1940
1941 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1942 let mut static_field_assignments = Vec::new();
1943 let mut static_metadata_assignments = Vec::new();
1944 let mut dynamic_assignments = Vec::new();
1945 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1946 let mut row_modified_columns = Vec::new();
1947
1948 for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1949 let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1950 let metadata_key = resolve_sql_ttl_metadata_key(column);
1951 if compound_op.is_some() && metadata_key.is_some() {
1952 return Err(RedDBError::Query(format!(
1953 "compound assignment is only supported for row fields: {column}"
1954 )));
1955 }
1956 if compound_op.is_none() {
1957 if let Ok(value) = fold_expr_to_value(expr.clone()) {
1958 if let Some(metadata_key) = metadata_key {
1959 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1960 let (canonical_key, canonical_value) =
1961 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1962 static_metadata_assignments
1963 .push((canonical_key.to_string(), canonical_value));
1964 } else {
1965 let value = self.resolve_crypto_sentinel(value)?;
1966 static_field_assignments.push((
1967 column.clone(),
1968 normalize_row_update_assignment_with_plan(
1969 &query.table,
1970 column,
1971 value,
1972 row_contract_plan.as_ref(),
1973 )?,
1974 ));
1975 row_modified_columns.push(column.clone());
1976 }
1977 continue;
1978 }
1979 }
1980
1981 dynamic_assignments.push(CompiledUpdateAssignment {
1982 column: column.clone(),
1983 expr: expr.clone(),
1984 compound_op,
1985 metadata_key,
1986 row_rule: if metadata_key.is_none() {
1987 if let Some(plan) = row_contract_plan.as_ref() {
1988 if plan.timestamps_enabled
1989 && (column == "created_at" || column == "updated_at")
1990 {
1991 return Err(RedDBError::Query(format!(
1992 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1993 query.table, column
1994 )));
1995 }
1996 if let Some(rule) = plan.declared_rules.get(column) {
1997 Some(rule.clone())
1998 } else if plan.strict_schema {
1999 return Err(RedDBError::Query(format!(
2000 "collection '{}' is strict and does not allow undeclared fields: {}",
2001 query.table, column
2002 )));
2003 } else {
2004 None
2005 }
2006 } else {
2007 None
2008 }
2009 } else {
2010 None
2011 },
2012 });
2013 if metadata_key.is_none() {
2014 row_modified_columns.push(column.clone());
2015 }
2016 }
2017
2018 let row_modified_columns = dedupe_update_columns(row_modified_columns);
2019 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
2020 row_modified_columns.iter().any(|column| {
2021 plan.unique_columns
2022 .keys()
2023 .any(|unique| unique.eq_ignore_ascii_case(column))
2024 })
2025 });
2026
2027 if let Some(ttl_ms) = query.ttl_ms {
2028 static_metadata_assignments
2029 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
2030 }
2031 if let Some(expires_at_ms) = query.expires_at_ms {
2032 static_metadata_assignments.push((
2033 "_expires_at".to_string(),
2034 metadata_u64_to_value(expires_at_ms),
2035 ));
2036 }
2037 for (key, val) in &query.with_metadata {
2038 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
2039 }
2040
2041 Ok(CompiledUpdatePlan {
2042 static_field_assignments,
2043 static_metadata_assignments,
2044 dynamic_assignments,
2045 row_contract_plan,
2046 row_modified_columns,
2047 row_touches_unique_columns,
2048 })
2049 }
2050
2051 fn materialize_update_assignments_for_entity(
2052 &self,
2053 query: &UpdateQuery,
2054 entity: &UnifiedEntity,
2055 compiled_plan: &CompiledUpdatePlan,
2056 ) -> RedDBResult<MaterializedUpdateAssignments> {
2057 let mut assignments = MaterializedUpdateAssignments::default();
2058 let mut record: Option<UnifiedRecord> = None;
2059
2060 for assignment in &compiled_plan.dynamic_assignments {
2061 if assignment.compound_op.is_some()
2062 && !matches!(
2063 entity.data,
2064 EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
2065 )
2066 {
2067 return Err(RedDBError::Query(format!(
2068 "compound assignment is only supported for row or graph UPDATE column '{}'",
2069 assignment.column
2070 )));
2071 }
2072 if record.is_none() {
2073 record = runtime_any_record_from_entity_ref(entity);
2074 }
2075 let Some(record) = record.as_ref() else {
2076 return Err(RedDBError::Query(format!(
2077 "UPDATE could not materialize runtime record for entity {} in '{}'",
2078 entity.id.raw(),
2079 query.table
2080 )));
2081 };
2082 let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
2083 Some(self.inner.db.as_ref()),
2084 &assignment.expr,
2085 record,
2086 Some(query.table.as_str()),
2087 Some(query.table.as_str()),
2088 )
2089 .ok_or_else(|| {
2090 RedDBError::Query(format!(
2091 "failed to evaluate UPDATE expression for column '{}'",
2092 assignment.column
2093 ))
2094 })?;
2095 let value = if let Some(op) = assignment.compound_op {
2096 evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
2097 } else {
2098 rhs
2099 };
2100
2101 if let Some(metadata_key) = assignment.metadata_key {
2102 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
2103 let (canonical_key, canonical_value) =
2104 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2105 assignments
2106 .dynamic_metadata_assignments
2107 .push((canonical_key.to_string(), canonical_value));
2108 } else {
2109 assignments.dynamic_field_assignments.push((
2110 assignment.column.clone(),
2111 normalize_row_update_value_for_rule(
2112 &query.table,
2113 self.resolve_crypto_sentinel(value)?,
2114 assignment.row_rule.as_ref(),
2115 )?,
2116 ));
2117 }
2118 }
2119
2120 Ok(assignments)
2121 }
2122
2123 fn apply_materialized_update_for_entity(
2124 &self,
2125 collection: String,
2126 entity: UnifiedEntity,
2127 compiled_plan: &CompiledUpdatePlan,
2128 assignments: MaterializedUpdateAssignments,
2129 ) -> RedDBResult<AppliedEntityMutation> {
2130 if matches!(entity.data, EntityData::Row(_)) {
2131 return self.apply_loaded_sql_update_row_core(
2132 collection,
2133 entity,
2134 &compiled_plan.static_field_assignments,
2135 assignments.dynamic_field_assignments,
2136 &compiled_plan.static_metadata_assignments,
2137 assignments.dynamic_metadata_assignments,
2138 compiled_plan.row_contract_plan.as_ref(),
2139 &compiled_plan.row_modified_columns,
2140 compiled_plan.row_touches_unique_columns,
2141 );
2142 }
2143
2144 ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2145
2146 let operations = build_patch_operations_from_materialized_assignments(
2147 &entity,
2148 compiled_plan,
2149 assignments,
2150 );
2151 self.apply_loaded_patch_entity_core(
2152 collection,
2153 entity,
2154 crate::json::Value::Null,
2155 operations,
2156 )
2157 }
2158
2159 pub fn execute_delete(
2161 &self,
2162 raw_query: &str,
2163 query: &DeleteQuery,
2164 ) -> RedDBResult<RuntimeQueryResult> {
2165 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2166 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2169 return Err(RedDBError::InvalidOperation(format!(
2170 "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2171 query.table
2172 )));
2173 }
2174 crate::runtime::collection_contract::CollectionContractGate::check(
2178 self,
2179 &query.table,
2180 crate::runtime::collection_contract::MutationKind::Delete,
2181 )?;
2182
2183 if let Some(items) = query.returning.clone() {
2190 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2191 RedDBError::Query(
2192 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2193 )
2194 })?;
2195 let captured = self.execute_query(&select_sql)?;
2196
2197 let mut inner_query = query.clone();
2198 inner_query.returning = None;
2199 let _ = self.execute_delete(raw_query, &inner_query)?;
2200
2201 let snapshots: Vec<Vec<(String, Value)>> = captured
2202 .result
2203 .records
2204 .iter()
2205 .map(|rec| {
2206 rec.iter_fields()
2207 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2208 .collect()
2209 })
2210 .collect();
2211 let affected = snapshots.len() as u64;
2212 let result = build_returning_result(&items, &snapshots, None);
2213
2214 let mut response = RuntimeQueryResult::dml_result(
2215 raw_query.to_string(),
2216 affected,
2217 "delete",
2218 "runtime-dml-returning",
2219 );
2220 response.result = result;
2221 return Ok(response);
2222 }
2223 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2230 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2231 self,
2232 &query.table,
2233 crate::storage::query::ast::PolicyAction::Delete,
2234 );
2235 let Some(policy) = rls_filter else {
2236 return Ok(RuntimeQueryResult::dml_result(
2237 raw_query.to_string(),
2238 0,
2239 "delete",
2240 "runtime-dml-rls",
2241 ));
2242 };
2243 let mut augmented = query.clone();
2248 augmented.filter = Some(match augmented.filter.take() {
2249 Some(existing) => {
2250 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2251 }
2252 None => policy,
2253 });
2254 return self.execute_delete_inner(raw_query, &augmented);
2255 }
2256 self.execute_delete_inner(raw_query, query)
2257 }
2258
2259 fn execute_delete_inner(
2260 &self,
2261 raw_query: &str,
2262 query: &DeleteQuery,
2263 ) -> RedDBResult<RuntimeQueryResult> {
2264 let effective_filter = effective_delete_filter(query);
2265
2266 let scan = super::dml_target_scan::DmlTargetScan::new(
2270 self,
2271 &query.table,
2272 effective_filter.as_ref(),
2273 None,
2274 );
2275 let ids_to_delete = scan.find_target_ids()?;
2276
2277 let needs_delete_events =
2280 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2281 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2282 scan.row_json_pre_images(&ids_to_delete)
2283 } else {
2284 HashMap::new()
2285 };
2286
2287 let mut affected: u64 = 0;
2288 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2289 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2290 affected += count;
2291 if needs_delete_events && !lsns.is_empty() {
2292 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2296 self.emit_delete_events_for_collection(
2297 &query.table,
2298 deleted_chunk,
2299 &lsns,
2300 &pre_images,
2301 )?;
2302 }
2303 }
2304 pre_images.clear();
2305
2306 if affected > 0 {
2307 self.note_table_write(&query.table);
2308 }
2309
2310 Ok(RuntimeQueryResult::dml_result(
2311 raw_query.to_string(),
2312 affected,
2313 "delete",
2314 "runtime-dml",
2315 ))
2316 }
2317}
2318
2319fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2325 if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2326 return Ok(());
2327 }
2328 for (column, _) in &query.assignment_exprs {
2329 if is_immutable_graph_identity_field(column) {
2330 return Err(RedDBError::Query(format!(
2331 "immutable graph field '{column}' cannot be updated"
2332 )));
2333 }
2334 }
2335 Ok(())
2336}
2337
2338fn ensure_graph_identity_update_allowed(
2339 entity: &UnifiedEntity,
2340 compiled_plan: &CompiledUpdatePlan,
2341 assignments: &MaterializedUpdateAssignments,
2342) -> RedDBResult<()> {
2343 if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2344 return Ok(());
2345 }
2346
2347 for (column, _) in compiled_plan
2348 .static_field_assignments
2349 .iter()
2350 .chain(assignments.dynamic_field_assignments.iter())
2351 {
2352 if is_immutable_graph_identity_field(column) {
2353 return Err(RedDBError::Query(format!(
2354 "immutable graph field '{column}' cannot be updated"
2355 )));
2356 }
2357 }
2358
2359 Ok(())
2360}
2361
2362fn is_immutable_graph_identity_field(column: &str) -> bool {
2363 ["rid", "label", "from_rid", "to_rid", "from", "to"]
2364 .iter()
2365 .any(|reserved| column.eq_ignore_ascii_case(reserved))
2366}
2367
2368fn build_patch_operations_from_materialized_assignments(
2369 entity: &UnifiedEntity,
2370 compiled_plan: &CompiledUpdatePlan,
2371 assignments: MaterializedUpdateAssignments,
2372) -> Vec<PatchEntityOperation> {
2373 let mut operations = Vec::with_capacity(
2374 compiled_plan.static_field_assignments.len()
2375 + compiled_plan.static_metadata_assignments.len()
2376 + assignments.dynamic_field_assignments.len()
2377 + assignments.dynamic_metadata_assignments.len(),
2378 );
2379
2380 for (column, value) in &compiled_plan.static_field_assignments {
2381 operations.push(PatchEntityOperation {
2382 op: PatchEntityOperationType::Set,
2383 path: update_patch_path_for_entity(entity, column),
2384 value: Some(storage_value_to_json(value)),
2385 });
2386 }
2387
2388 for (column, value) in assignments.dynamic_field_assignments {
2389 operations.push(PatchEntityOperation {
2390 op: PatchEntityOperationType::Set,
2391 path: update_patch_path_for_entity(entity, &column),
2392 value: Some(storage_value_to_json(&value)),
2393 });
2394 }
2395
2396 for (key, value) in &compiled_plan.static_metadata_assignments {
2397 operations.push(PatchEntityOperation {
2398 op: PatchEntityOperationType::Set,
2399 path: vec!["metadata".to_string(), key.clone()],
2400 value: Some(metadata_value_to_json(value)),
2401 });
2402 }
2403
2404 for (key, value) in assignments.dynamic_metadata_assignments {
2405 operations.push(PatchEntityOperation {
2406 op: PatchEntityOperationType::Set,
2407 path: vec!["metadata".to_string(), key],
2408 value: Some(metadata_value_to_json(&value)),
2409 });
2410 }
2411
2412 operations
2413}
2414
2415fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2416 if matches!(
2417 (&entity.kind, &entity.data),
2418 (
2419 crate::storage::EntityKind::GraphNode(_),
2420 EntityData::Node(_)
2421 )
2422 ) && column.eq_ignore_ascii_case("node_type")
2423 {
2424 return vec!["node_type".to_string()];
2425 }
2426 if matches!(
2427 (&entity.kind, &entity.data),
2428 (
2429 crate::storage::EntityKind::GraphEdge(_),
2430 EntityData::Edge(_)
2431 )
2432 ) && column.eq_ignore_ascii_case("weight")
2433 {
2434 return vec!["weight".to_string()];
2435 }
2436 vec!["fields".to_string(), column.to_string()]
2437}
2438
2439fn delete_to_select_sql(sql: &str) -> Option<String> {
2449 let trimmed = sql.trim_start();
2450 let lowered = trimmed.to_ascii_lowercase();
2451 if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2452 return None;
2453 }
2454 let from_idx = lowered.find(" from ")?;
2456 let after_from = &trimmed[from_idx + " from ".len()..];
2457 let after_from_lc = &lowered[from_idx + " from ".len()..];
2458
2459 let mut body = after_from.to_string();
2464 if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2465 body.truncate(pos);
2466 }
2467 Some(format!("SELECT * FROM {}", body.trim_end()))
2468}
2469
2470fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
2475 let bytes = haystack.as_bytes();
2476 let nlen = needle.len();
2477 let mut i = 0usize;
2478 let mut in_string = false;
2479 while i < bytes.len() {
2480 let c = bytes[i];
2481 if c == b'\'' {
2482 in_string = !in_string;
2483 i += 1;
2484 continue;
2485 }
2486 if !in_string
2487 && i + nlen <= bytes.len()
2488 && &bytes[i..i + nlen] == needle.as_bytes()
2489 && (i == 0 || bytes[i - 1].is_ascii_whitespace())
2490 && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
2491 {
2492 return Some(i);
2493 }
2494 i += 1;
2495 }
2496 None
2497}
2498
2499fn build_returning_result(
2506 items: &[ReturningItem],
2507 snapshots: &[Vec<(String, Value)>],
2508 outputs: Option<&[CreateEntityOutput]>,
2509) -> UnifiedResult {
2510 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2511 let public_item_outputs = outputs.is_some_and(|outs| {
2512 outs.first()
2513 .and_then(|out| out.entity.as_ref())
2514 .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2515 });
2516
2517 let mut columns: Vec<String> = if project_all {
2518 let mut cols: Vec<String> = Vec::new();
2519 if public_item_outputs {
2520 cols.extend(
2521 [
2522 "rid",
2523 "collection",
2524 "kind",
2525 "tenant",
2526 "created_at",
2527 "updated_at",
2528 ]
2529 .into_iter()
2530 .map(str::to_string),
2531 );
2532 } else if outputs.is_some() {
2533 cols.push("red_entity_id".to_string());
2534 }
2535 if let Some(first) = snapshots.first() {
2536 for (name, _) in first {
2537 cols.push(name.clone());
2538 }
2539 }
2540 cols
2541 } else {
2542 items
2543 .iter()
2544 .filter_map(|it| match it {
2545 ReturningItem::Column(c) => Some(c.clone()),
2546 ReturningItem::All => None,
2547 })
2548 .collect()
2549 };
2550 {
2552 let mut seen = std::collections::HashSet::new();
2553 columns.retain(|c| seen.insert(c.clone()));
2554 }
2555
2556 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2557 for (idx, snap) in snapshots.iter().enumerate() {
2558 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2559 if let Some(outs) = outputs {
2560 if let Some(out) = outs.get(idx) {
2561 if let Some(entity) = out.entity.as_ref() {
2562 if let Some(kind) = public_returning_item_kind(entity) {
2563 values.insert(
2564 Arc::clone(&sys_key_rid()),
2565 Value::UnsignedInteger(out.id.raw()),
2566 );
2567 values.insert(
2568 Arc::clone(&sys_key_collection()),
2569 Value::text(entity.kind.collection().to_string()),
2570 );
2571 values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2572 values.insert(
2573 Arc::clone(&sys_key_created_at()),
2574 Value::UnsignedInteger(entity.created_at),
2575 );
2576 values.insert(
2577 Arc::clone(&sys_key_updated_at()),
2578 Value::UnsignedInteger(entity.updated_at),
2579 );
2580 values.insert(
2585 Arc::clone(&sys_key_red_entity_id()),
2586 Value::UnsignedInteger(out.id.raw()),
2587 );
2588 } else {
2589 values.insert(
2590 Arc::clone(&sys_key_red_entity_id()),
2591 Value::Integer(out.id.raw() as i64),
2592 );
2593 }
2594 } else {
2595 values.insert(
2596 Arc::clone(&sys_key_red_entity_id()),
2597 Value::Integer(out.id.raw() as i64),
2598 );
2599 }
2600 }
2601 }
2602 for (name, val) in snap {
2603 values.insert(Arc::from(name.as_str()), val.clone());
2604 }
2605 if !values.contains_key("tenant") {
2606 let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2607 values.insert(Arc::clone(&sys_key_tenant()), tenant);
2608 }
2609 let mut rec = UnifiedRecord::default();
2610 for col in &columns {
2612 if let Some(v) = values.get(col.as_str()) {
2613 rec.set_arc(Arc::from(col.as_str()), v.clone());
2614 }
2615 }
2616 records.push(rec);
2617 }
2618
2619 UnifiedResult {
2620 columns,
2621 records,
2622 stats: Default::default(),
2623 pre_serialized_json: None,
2624 }
2625}
2626
2627fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2628 match (&entity.kind, &entity.data) {
2629 (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2630 Some("node")
2631 }
2632 (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2633 Some("edge")
2634 }
2635 (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2636 _ => None,
2637 }
2638}
2639
2640fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2641 let Some(row) = entity.data.as_row() else {
2642 return "row";
2643 };
2644
2645 let is_kv = row.named.as_ref().is_some_and(|named| {
2646 (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2647 || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2648 });
2649 if is_kv {
2650 return "kv";
2651 }
2652
2653 let is_document = row
2654 .named
2655 .as_ref()
2656 .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2657 || row.columns.iter().any(runtime_returning_documentish_value);
2658 if is_document {
2659 "document"
2660 } else {
2661 "row"
2662 }
2663}
2664
2665fn runtime_returning_documentish_value(value: &Value) -> bool {
2666 matches!(value, Value::Json(_) | Value::Blob(_))
2667}
2668
2669fn row_insert_returning_snapshots(
2670 outputs: &[CreateEntityOutput],
2671 fallback: Vec<Vec<(String, Value)>>,
2672) -> Vec<Vec<(String, Value)>> {
2673 outputs
2674 .iter()
2675 .enumerate()
2676 .map(|(idx, out)| {
2677 out.entity
2678 .as_ref()
2679 .map(entity_row_fields_snapshot)
2680 .filter(|snap| !snap.is_empty())
2681 .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2682 })
2683 .collect()
2684}
2685
2686fn graph_insert_returning_snapshots(
2687 store: &crate::storage::unified::UnifiedStore,
2688 collection: &str,
2689 ids: &[EntityId],
2690) -> Vec<Vec<(String, Value)>> {
2691 let Some(manager) = store.get_collection(collection) else {
2692 return Vec::new();
2693 };
2694
2695 ids.iter()
2696 .filter_map(|id| manager.get(*id))
2697 .filter_map(|entity| {
2698 let mut record = runtime_any_record_from_entity_ref(&entity)?;
2699 record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2700 Some(record)
2701 })
2702 .map(|record| {
2703 record
2704 .iter_fields()
2705 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2706 .collect()
2707 })
2708 .collect()
2709}
2710
2711fn graph_update_returning_snapshots(
2712 runtime: &RedDBRuntime,
2713 collection: &str,
2714 ids: &[EntityId],
2715) -> Vec<Vec<(String, Value)>> {
2716 let store = runtime.db().store();
2717 let Some(manager) = store.get_collection(collection) else {
2718 return Vec::new();
2719 };
2720
2721 manager
2722 .get_many(ids)
2723 .into_iter()
2724 .flatten()
2725 .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2726 .map(|record| {
2727 record
2728 .iter_fields()
2729 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2730 .collect()
2731 })
2732 .collect()
2733}
2734
2735fn ensure_update_target_contract(
2736 runtime: &RedDBRuntime,
2737 collection: &str,
2738 target: UpdateTarget,
2739) -> RedDBResult<()> {
2740 let Some(contract) = runtime.db().collection_contract(collection) else {
2741 return Ok(());
2742 };
2743 if update_target_contract_is_advisory(&contract)
2744 || update_target_allows_model(contract.declared_model, update_target_model(target))
2745 {
2746 return Ok(());
2747 }
2748 Err(RedDBError::InvalidOperation(format!(
2749 "collection '{}' is declared as '{}' and does not allow '{}' updates",
2750 collection,
2751 update_model_name(contract.declared_model),
2752 update_model_name(update_target_model(target))
2753 )))
2754}
2755
2756fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2757 matches!(
2758 (&contract.origin, &contract.schema_mode),
2759 (
2760 crate::physical::ContractOrigin::Implicit,
2761 crate::catalog::SchemaMode::Dynamic,
2762 )
2763 )
2764}
2765
2766fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2767 match target {
2768 UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2769 UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2770 UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2771 UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2772 }
2773}
2774
2775fn update_target_allows_model(
2776 declared_model: crate::catalog::CollectionModel,
2777 requested_model: crate::catalog::CollectionModel,
2778) -> bool {
2779 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2780}
2781
2782fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2783 match model {
2784 crate::catalog::CollectionModel::Table => "table",
2785 crate::catalog::CollectionModel::Document => "document",
2786 crate::catalog::CollectionModel::Graph => "graph",
2787 crate::catalog::CollectionModel::Vector => "vector",
2788 crate::catalog::CollectionModel::Hll => "hll",
2789 crate::catalog::CollectionModel::Sketch => "sketch",
2790 crate::catalog::CollectionModel::Filter => "filter",
2791 crate::catalog::CollectionModel::Kv => "kv",
2792 crate::catalog::CollectionModel::Config => "config",
2793 crate::catalog::CollectionModel::Vault => "vault",
2794 crate::catalog::CollectionModel::Mixed => "mixed",
2795 crate::catalog::CollectionModel::TimeSeries => "timeseries",
2796 crate::catalog::CollectionModel::Queue => "queue",
2797 crate::catalog::CollectionModel::Metrics => "metrics",
2798 }
2799}
2800
2801fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2802 let db = runtime.db();
2803 if let Some(contract) = db.collection_contract(collection) {
2804 let advisory_implicit_dynamic = matches!(
2805 (&contract.origin, &contract.schema_mode),
2806 (
2807 crate::physical::ContractOrigin::Implicit,
2808 crate::catalog::SchemaMode::Dynamic,
2809 )
2810 );
2811 if advisory_implicit_dynamic
2812 || matches!(
2813 contract.declared_model,
2814 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2815 )
2816 {
2817 return Ok(());
2818 }
2819 return Err(RedDBError::InvalidOperation(format!(
2820 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2821 collection, contract.declared_model
2822 )));
2823 }
2824
2825 let now = std::time::SystemTime::now()
2826 .duration_since(std::time::UNIX_EPOCH)
2827 .unwrap_or_default()
2828 .as_millis();
2829 db.save_collection_contract(crate::physical::CollectionContract {
2830 name: collection.to_string(),
2831 declared_model: crate::catalog::CollectionModel::Graph,
2832 schema_mode: crate::catalog::SchemaMode::Dynamic,
2833 origin: crate::physical::ContractOrigin::Implicit,
2834 version: 1,
2835 created_at_unix_ms: now,
2836 updated_at_unix_ms: now,
2837 default_ttl_ms: db.collection_default_ttl_ms(collection),
2838 vector_dimension: None,
2839 vector_metric: None,
2840 context_index_fields: Vec::new(),
2841 declared_columns: Vec::new(),
2842 table_def: None,
2843 timestamps_enabled: false,
2844 context_index_enabled: false,
2845 metrics_raw_retention_ms: None,
2846 metrics_rollup_policies: Vec::new(),
2847 metrics_tenant_identity: None,
2848 metrics_namespace: None,
2849 append_only: false,
2850 subscriptions: Vec::new(),
2851 analytics_config: Vec::new(),
2852 session_key: None,
2853 session_gap_ms: None,
2854 retention_duration_ms: None,
2855 analytical_storage: None,
2856 })
2857 .map(|_| ())
2858 .map_err(|err| RedDBError::Internal(err.to_string()))
2859}
2860
2861fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2862 query
2863 .assignment_exprs
2864 .iter()
2865 .enumerate()
2866 .any(|(idx, (column, expr))| {
2867 query
2868 .compound_assignment_ops
2869 .get(idx)
2870 .is_some_and(|op| op.is_some())
2871 || expr_references_update_column(expr, &query.table, column)
2872 })
2873}
2874
2875fn evaluate_compound_update_assignment(
2876 column: &str,
2877 record: &UnifiedRecord,
2878 op: BinOp,
2879 rhs: Value,
2880) -> RedDBResult<Value> {
2881 let lhs = record.get(column).ok_or_else(|| {
2882 RedDBError::Query(format!(
2883 "compound assignment requires existing numeric field '{column}'"
2884 ))
2885 })?;
2886 if matches!(lhs, Value::Null) {
2887 return Err(RedDBError::Query(format!(
2888 "compound assignment requires non-null numeric field '{column}'"
2889 )));
2890 }
2891 apply_compound_numeric_op(column, op, lhs, &rhs)
2892}
2893
2894fn apply_compound_numeric_op(
2895 column: &str,
2896 op: BinOp,
2897 lhs: &Value,
2898 rhs: &Value,
2899) -> RedDBResult<Value> {
2900 let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2901 return Err(RedDBError::Query(format!(
2902 "compound assignment requires numeric field '{column}'"
2903 )));
2904 };
2905 let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2906 return Err(RedDBError::Query(format!(
2907 "compound assignment requires numeric right-hand value for field '{column}'"
2908 )));
2909 };
2910
2911 if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2912 let a = lhs_number.as_f64();
2913 let b = rhs_number.as_f64();
2914 let out = match op {
2915 BinOp::Add => a + b,
2916 BinOp::Sub => a - b,
2917 BinOp::Mul => a * b,
2918 BinOp::Div => {
2919 if b == 0.0 {
2920 return Err(RedDBError::Query(format!(
2921 "division by zero in compound assignment for field '{column}'"
2922 )));
2923 }
2924 a / b
2925 }
2926 BinOp::Mod => {
2927 if b == 0.0 {
2928 return Err(RedDBError::Query(format!(
2929 "modulo by zero in compound assignment for field '{column}'"
2930 )));
2931 }
2932 a % b
2933 }
2934 _ => {
2935 return Err(RedDBError::Query(format!(
2936 "unsupported compound assignment operator for field '{column}'"
2937 )));
2938 }
2939 };
2940 if !out.is_finite() {
2941 return Err(RedDBError::Query(format!(
2942 "numeric overflow in compound assignment for field '{column}'"
2943 )));
2944 }
2945 return Ok(Value::Float(out));
2946 }
2947
2948 let a = lhs_number.as_i128();
2949 let b = rhs_number.as_i128();
2950 let out = match op {
2951 BinOp::Add => a.checked_add(b),
2952 BinOp::Sub => a.checked_sub(b),
2953 BinOp::Mul => a.checked_mul(b),
2954 BinOp::Mod => {
2955 if b == 0 {
2956 return Err(RedDBError::Query(format!(
2957 "modulo by zero in compound assignment for field '{column}'"
2958 )));
2959 }
2960 a.checked_rem(b)
2961 }
2962 BinOp::Div => unreachable!("integer division is handled by the float branch"),
2963 _ => None,
2964 }
2965 .ok_or_else(|| {
2966 RedDBError::Query(format!(
2967 "numeric overflow in compound assignment for field '{column}'"
2968 ))
2969 })?;
2970
2971 if matches!(lhs, Value::UnsignedInteger(_)) {
2972 let value = u64::try_from(out).map_err(|_| {
2973 RedDBError::Query(format!(
2974 "numeric overflow in compound assignment for field '{column}'"
2975 ))
2976 })?;
2977 Ok(Value::UnsignedInteger(value))
2978 } else {
2979 let value = i64::try_from(out).map_err(|_| {
2980 RedDBError::Query(format!(
2981 "numeric overflow in compound assignment for field '{column}'"
2982 ))
2983 })?;
2984 Ok(Value::Integer(value))
2985 }
2986}
2987
2988#[derive(Clone, Copy)]
2989enum CompoundNumber {
2990 Integer(i128),
2991 Float(f64),
2992}
2993
2994impl CompoundNumber {
2995 fn from_value(value: &Value) -> Option<Self> {
2996 match value {
2997 Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2998 Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2999 Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
3000 Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
3001 _ => None,
3002 }
3003 }
3004
3005 fn is_float(self) -> bool {
3006 matches!(self, Self::Float(_))
3007 }
3008
3009 fn as_f64(self) -> f64 {
3010 match self {
3011 Self::Integer(value) => value as f64,
3012 Self::Float(value) => value,
3013 }
3014 }
3015
3016 fn as_i128(self) -> i128 {
3017 match self {
3018 Self::Integer(value) => value,
3019 Self::Float(_) => unreachable!("float compound number used as integer"),
3020 }
3021 }
3022}
3023
3024fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
3025 match expr {
3026 Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
3027 Expr::Column { field, .. } => {
3028 field_ref_matches_update_column(field, table_name, target_column)
3029 }
3030 Expr::BinaryOp { lhs, rhs, .. } => {
3031 expr_references_update_column(lhs, table_name, target_column)
3032 || expr_references_update_column(rhs, table_name, target_column)
3033 }
3034 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
3035 expr_references_update_column(operand, table_name, target_column)
3036 }
3037 Expr::FunctionCall { args, .. } => args
3038 .iter()
3039 .any(|arg| expr_references_update_column(arg, table_name, target_column)),
3040 Expr::Case {
3041 branches, else_, ..
3042 } => {
3043 branches.iter().any(|(cond, value)| {
3044 expr_references_update_column(cond, table_name, target_column)
3045 || expr_references_update_column(value, table_name, target_column)
3046 }) || else_
3047 .as_deref()
3048 .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
3049 }
3050 Expr::IsNull { operand, .. } => {
3051 expr_references_update_column(operand, table_name, target_column)
3052 }
3053 Expr::InList { target, values, .. } => {
3054 expr_references_update_column(target, table_name, target_column)
3055 || values
3056 .iter()
3057 .any(|value| expr_references_update_column(value, table_name, target_column))
3058 }
3059 Expr::Between {
3060 target, low, high, ..
3061 } => {
3062 expr_references_update_column(target, table_name, target_column)
3063 || expr_references_update_column(low, table_name, target_column)
3064 || expr_references_update_column(high, table_name, target_column)
3065 }
3066 Expr::WindowFunctionCall { args, window, .. } => {
3067 args.iter()
3068 .any(|arg| expr_references_update_column(arg, table_name, target_column))
3069 || window
3070 .partition_by
3071 .iter()
3072 .any(|e| expr_references_update_column(e, table_name, target_column))
3073 || window
3074 .order_by
3075 .iter()
3076 .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
3077 }
3078 }
3079}
3080
3081fn field_ref_matches_update_column(
3082 field: &FieldRef,
3083 table_name: &str,
3084 target_column: &str,
3085) -> bool {
3086 match field {
3087 FieldRef::TableColumn { table, column } => {
3088 column.eq_ignore_ascii_case(target_column)
3089 && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
3090 }
3091 FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
3092 false
3093 }
3094 }
3095}
3096
3097fn resolve_update_entity_by_logical_id(
3098 runtime: &RedDBRuntime,
3099 table: &str,
3100 logical_id: EntityId,
3101) -> Option<UnifiedEntity> {
3102 let store = runtime.inner.db.store();
3103 if let Some(entity) = store.get_table_row_by_logical_id(table, logical_id) {
3104 return Some(entity);
3105 }
3106 store.get(table, logical_id)
3109}
3110
3111fn update_cdc_item_kind(
3112 runtime: &RedDBRuntime,
3113 collection: &str,
3114 entity: &UnifiedEntity,
3115) -> &'static str {
3116 match &entity.data {
3117 EntityData::Node(_) => return "node",
3118 EntityData::Edge(_) => return "edge",
3119 _ => {}
3120 }
3121
3122 match runtime
3123 .db()
3124 .collection_contract(collection)
3125 .map(|contract| contract.declared_model)
3126 {
3127 Some(crate::catalog::CollectionModel::Document) => "document",
3128 Some(crate::catalog::CollectionModel::Kv)
3129 | Some(crate::catalog::CollectionModel::Vault) => "kv",
3130 _ => "row",
3131 }
3132}
3133
3134fn ordered_update_target_ids(
3135 manager: &Arc<crate::storage::SegmentManager>,
3136 entity_ids: &[EntityId],
3137 order_by: &[OrderByClause],
3138 limit: Option<usize>,
3139) -> Vec<EntityId> {
3140 let mut entities: Vec<UnifiedEntity> =
3141 manager.get_many(entity_ids).into_iter().flatten().collect();
3142 entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3143 if let Some(limit) = limit {
3144 entities.truncate(limit);
3145 }
3146 entities.into_iter().map(|entity| entity.id).collect()
3147}
3148
3149fn compare_update_order(
3150 left: &UnifiedEntity,
3151 right: &UnifiedEntity,
3152 order_by: &[OrderByClause],
3153) -> Ordering {
3154 for clause in order_by {
3155 let left_value = update_order_value(left, &clause.field);
3156 let right_value = update_order_value(right, &clause.field);
3157 let ordering = compare_update_order_values(
3158 left_value.as_ref(),
3159 right_value.as_ref(),
3160 clause.nulls_first,
3161 );
3162 if ordering != Ordering::Equal {
3163 return if clause.ascending {
3164 ordering
3165 } else {
3166 ordering.reverse()
3167 };
3168 }
3169 }
3170 left.logical_id().raw().cmp(&right.logical_id().raw())
3171}
3172
3173fn compare_update_order_values(
3174 left: Option<&Value>,
3175 right: Option<&Value>,
3176 nulls_first: bool,
3177) -> Ordering {
3178 match (left, right) {
3179 (None, None) => Ordering::Equal,
3180 (None, Some(_)) => {
3181 if nulls_first {
3182 Ordering::Less
3183 } else {
3184 Ordering::Greater
3185 }
3186 }
3187 (Some(_), None) => {
3188 if nulls_first {
3189 Ordering::Greater
3190 } else {
3191 Ordering::Less
3192 }
3193 }
3194 (Some(left), Some(right)) => {
3195 crate::storage::query::value_compare::total_compare_values(left, right)
3196 }
3197 }
3198}
3199
3200fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3201 let FieldRef::TableColumn { table, column } = field else {
3202 return None;
3203 };
3204 if !table.is_empty() {
3205 return None;
3206 }
3207 if column.eq_ignore_ascii_case("rid") {
3208 return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3209 }
3210 match &entity.data {
3211 EntityData::Row(row) => row.get_field(column).cloned(),
3212 EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3213 .and_then(|record| record.get(column).cloned()),
3214 _ => None,
3215 }
3216}
3217
3218fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
3219 if columns.is_empty() {
3220 return columns;
3221 }
3222
3223 let mut unique = Vec::with_capacity(columns.len());
3224 for column in columns.drain(..) {
3225 if !unique
3226 .iter()
3227 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
3228 {
3229 unique.push(column);
3230 }
3231 }
3232 unique
3233}
3234
3235const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];
3240
3241fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
3242 if column.eq_ignore_ascii_case("_ttl") {
3243 Some(SQL_TTL_METADATA_COLUMNS[0])
3244 } else if column.eq_ignore_ascii_case("_ttl_ms") {
3245 Some(SQL_TTL_METADATA_COLUMNS[1])
3246 } else if column.eq_ignore_ascii_case("_expires_at") {
3247 Some(SQL_TTL_METADATA_COLUMNS[2])
3248 } else {
3249 None
3250 }
3251}
3252
3253fn canonicalize_sql_ttl_metadata(
3258 key: &'static str,
3259 value: MetadataValue,
3260) -> (&'static str, MetadataValue) {
3261 if key != "_ttl" {
3262 return (key, value);
3263 }
3264 let scaled = match value {
3265 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3266 MetadataValue::Timestamp(ms_or_s) => {
3267 MetadataValue::Timestamp(ms_or_s)
3270 }
3271 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3272 other => other,
3273 };
3274 ("_ttl_ms", scaled)
3275}
3276
3277pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3281
3282impl RedDBRuntime {
3283 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3289 match value {
3290 Value::Password(marked) => {
3291 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3292 Ok(Value::Password(crate::auth::store::hash_password(plain)))
3293 } else {
3294 Ok(Value::Password(marked))
3295 }
3296 }
3297 Value::Secret(bytes) => {
3298 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3299 if !self.secret_auto_encrypt() {
3300 return Err(RedDBError::Query(
3301 "SECRET() literal rejected: red.config.secret.auto_encrypt \
3302 is false. Insert pre-encrypted bytes directly instead."
3303 .to_string(),
3304 ));
3305 }
3306 let key = self.secret_aes_key().ok_or_else(|| {
3307 RedDBError::Query(
3308 "SECRET() column encryption requires a bootstrapped \
3309 vault (red.secret.aes_key is missing). Start the server \
3310 with --vault to enable."
3311 .to_string(),
3312 )
3313 })?;
3314 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3315 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3316 } else {
3317 Ok(Value::Secret(bytes))
3318 }
3319 }
3320 other => Ok(other),
3321 }
3322 }
3323}
3324
3325fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3328 let nonce_bytes = crate::auth::store::random_bytes(12);
3329 let mut nonce = [0u8; 12];
3330 nonce.copy_from_slice(&nonce_bytes[..12]);
3331 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3332 let mut out = Vec::with_capacity(12 + ct.len());
3333 out.extend_from_slice(&nonce);
3334 out.extend_from_slice(&ct);
3335 out
3336}
3337
3338pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3342 if payload.len() < 12 {
3343 return None;
3344 }
3345 let mut nonce = [0u8; 12];
3346 nonce.copy_from_slice(&payload[..12]);
3347 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3348}
3349
3350fn split_insert_metadata(
3351 runtime: &RedDBRuntime,
3352 columns: &[String],
3353 values: &[Value],
3354) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3355 let mut fields = Vec::new();
3356 let mut metadata = Vec::new();
3357
3358 for (column, value) in columns.iter().zip(values.iter()) {
3359 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3361 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3362 let (canonical_key, canonical_value) =
3363 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3364 metadata.push((canonical_key.to_string(), canonical_value));
3365 continue;
3366 }
3367 fields.push((
3368 column.clone(),
3369 runtime.resolve_crypto_sentinel(value.clone())?,
3370 ));
3371 }
3372
3373 Ok((fields, metadata))
3374}
3375
3376fn merge_with_clauses(
3378 metadata: &mut Vec<(String, MetadataValue)>,
3379 ttl_ms: Option<u64>,
3380 expires_at_ms: Option<u64>,
3381 with_metadata: &[(String, Value)],
3382) {
3383 if let Some(ms) = ttl_ms {
3384 metadata.push((
3385 "_ttl_ms".to_string(),
3386 if ms <= i64::MAX as u64 {
3387 MetadataValue::Int(ms as i64)
3388 } else {
3389 MetadataValue::Timestamp(ms)
3390 },
3391 ));
3392 }
3393 if let Some(ms) = expires_at_ms {
3394 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3395 }
3396 for (key, value) in with_metadata {
3397 let meta_value = match value {
3398 Value::Text(s) => MetadataValue::String(s.to_string()),
3399 Value::Integer(n) => MetadataValue::Int(*n),
3400 Value::Float(n) => MetadataValue::Float(*n),
3401 Value::Boolean(b) => MetadataValue::Bool(*b),
3402 _ => MetadataValue::String(value.to_string()),
3403 };
3404 metadata.push((key.clone(), meta_value));
3405 }
3406}
3407
3408fn merge_vector_metadata_column(
3409 metadata: &mut Vec<(String, MetadataValue)>,
3410 columns: &[String],
3411 values: &[Value],
3412) -> RedDBResult<()> {
3413 let Some(value) = columns
3414 .iter()
3415 .position(|column| column.eq_ignore_ascii_case("metadata"))
3416 .map(|index| &values[index])
3417 else {
3418 return Ok(());
3419 };
3420 let json = match value {
3421 Value::Null => return Ok(()),
3422 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3423 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3424 })?,
3425 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3426 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3427 })?,
3428 other => {
3429 return Err(RedDBError::Query(format!(
3430 "column 'metadata' expected JSON object, got {other:?}"
3431 )))
3432 }
3433 };
3434 let parsed = metadata_from_json(&json)?;
3435 for (key, value) in parsed.iter() {
3436 metadata.push((key.clone(), value.clone()));
3437 }
3438 Ok(())
3439}
3440
3441fn apply_collection_default_ttl_metadata(
3442 runtime: &RedDBRuntime,
3443 collection: &str,
3444 metadata: &mut Vec<(String, MetadataValue)>,
3445) {
3446 if has_internal_ttl_metadata(metadata) {
3447 return;
3448 }
3449
3450 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3451 return;
3452 };
3453
3454 metadata.push((
3455 "_ttl_ms".to_string(),
3456 if default_ttl_ms <= i64::MAX as u64 {
3457 MetadataValue::Int(default_ttl_ms as i64)
3458 } else {
3459 MetadataValue::Timestamp(default_ttl_ms)
3460 },
3461 ));
3462}
3463
3464fn ensure_non_tree_reserved_metadata_entries(
3465 metadata: &[(String, MetadataValue)],
3466) -> RedDBResult<()> {
3467 for (key, _) in metadata {
3468 ensure_non_tree_reserved_metadata_key(key)?;
3469 }
3470 Ok(())
3471}
3472
3473fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3474 if key.starts_with(TREE_METADATA_PREFIX) {
3475 return Err(RedDBError::Query(format!(
3476 "metadata key '{}' is reserved for managed trees",
3477 key
3478 )));
3479 }
3480 Ok(())
3481}
3482
3483fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3484 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3485 return Err(RedDBError::Query(format!(
3486 "edge label '{}' is reserved for managed trees",
3487 TREE_CHILD_EDGE_LABEL
3488 )));
3489 }
3490 Ok(())
3491}
3492
3493fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3494 let mut columns = Vec::with_capacity(pairs.len());
3495 let mut values = Vec::with_capacity(pairs.len());
3496
3497 for (column, value) in pairs {
3498 columns.push(column.clone());
3499 values.push(value.clone());
3500 }
3501
3502 (columns, values)
3503}
3504
3505fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3507 for (i, col) in columns.iter().enumerate() {
3508 if col.eq_ignore_ascii_case(name) {
3509 return Ok(values[i].clone());
3510 }
3511 }
3512 Err(RedDBError::Query(format!(
3513 "required column '{name}' not found in INSERT"
3514 )))
3515}
3516
3517fn find_column_value_string(
3519 columns: &[String],
3520 values: &[Value],
3521 name: &str,
3522) -> RedDBResult<String> {
3523 let val = find_column_value(columns, values, name)?;
3524 match val {
3525 Value::Text(s) => Ok(s.to_string()),
3526 Value::Integer(n) => Ok(n.to_string()),
3527 Value::Float(n) => Ok(n.to_string()),
3528 other => Err(RedDBError::Query(format!(
3529 "column '{name}' expected text, got {other:?}"
3530 ))),
3531 }
3532}
3533
3534fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3535 let val = find_column_value(columns, values, name)?;
3536 match val {
3537 Value::Float(n) => Ok(n),
3538 Value::Integer(n) => Ok(n as f64),
3539 Value::UnsignedInteger(n) => Ok(n as f64),
3540 Value::Text(s) => s
3541 .parse::<f64>()
3542 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3543 other => Err(RedDBError::Query(format!(
3544 "column '{name}' expected number, got {other:?}"
3545 ))),
3546 }
3547}
3548
3549fn find_column_value_opt_string(
3551 columns: &[String],
3552 values: &[Value],
3553 name: &str,
3554) -> Option<String> {
3555 for (i, col) in columns.iter().enumerate() {
3556 if col.eq_ignore_ascii_case(name) {
3557 return match &values[i] {
3558 Value::Null => None,
3559 Value::Text(s) => Some(s.to_string()),
3560 Value::Integer(n) => Some(n.to_string()),
3561 Value::Float(n) => Some(n.to_string()),
3562 _ => None,
3563 };
3564 }
3565 }
3566 None
3567}
3568
3569fn resolve_edge_endpoint(
3576 store: &crate::storage::unified::UnifiedStore,
3577 collection: &str,
3578 columns: &[String],
3579 values: &[Value],
3580 name: &str,
3581) -> RedDBResult<u64> {
3582 let val = find_column_value(columns, values, name)?;
3583 match val {
3584 Value::Integer(n) => Ok(n as u64),
3585 Value::UnsignedInteger(n) => Ok(n),
3586 Value::Text(s) => {
3587 if let Ok(n) = s.parse::<u64>() {
3588 return Ok(n);
3589 }
3590 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3591 match matches.len() {
3592 0 => Err(RedDBError::Query(format!(
3593 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3594 ))),
3595 1 => Ok(matches[0].raw()),
3596 n => Err(RedDBError::Query(format!(
3597 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3598 ))),
3599 }
3600 }
3601 other => Err(RedDBError::Query(format!(
3602 "column '{name}' expected integer or node label, got {other:?}"
3603 ))),
3604 }
3605}
3606
3607fn resolve_edge_endpoint_any(
3608 store: &crate::storage::unified::UnifiedStore,
3609 collection: &str,
3610 columns: &[String],
3611 values: &[Value],
3612 names: &[&str],
3613) -> RedDBResult<u64> {
3614 for name in names {
3615 if columns
3616 .iter()
3617 .any(|column| column.eq_ignore_ascii_case(name))
3618 {
3619 return resolve_edge_endpoint(store, collection, columns, values, name);
3620 }
3621 }
3622
3623 Err(RedDBError::Query(format!(
3624 "required column '{}' not found in INSERT",
3625 names.first().copied().unwrap_or("from_rid")
3626 )))
3627}
3628
3629fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3631 let val = find_column_value(columns, values, name)?;
3632 match val {
3633 Value::Integer(n) => Ok(n as u64),
3634 Value::UnsignedInteger(n) => Ok(n),
3635 Value::Text(s) => s
3636 .parse::<u64>()
3637 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3638 other => Err(RedDBError::Query(format!(
3639 "column '{name}' expected integer, got {other:?}"
3640 ))),
3641 }
3642}
3643
3644fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3646 for (i, col) in columns.iter().enumerate() {
3647 if col.eq_ignore_ascii_case(name) {
3648 return match &values[i] {
3649 Value::Float(n) => Some(*n as f32),
3650 Value::Integer(n) => Some(*n as f32),
3651 Value::Null => None,
3652 _ => None,
3653 };
3654 }
3655 }
3656 None
3657}
3658
3659fn find_column_value_vec_f32(
3661 columns: &[String],
3662 values: &[Value],
3663 name: &str,
3664) -> RedDBResult<Vec<f32>> {
3665 let val = find_column_value(columns, values, name)?;
3666 match val {
3667 Value::Vector(v) => Ok(v),
3668 Value::Json(bytes) => {
3669 let s = std::str::from_utf8(&bytes).map_err(|_| {
3671 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3672 })?;
3673 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3674 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3675 })?;
3676 Ok(arr)
3677 }
3678 other => Err(RedDBError::Query(format!(
3679 "column '{name}' expected vector, got {other:?}"
3680 ))),
3681 }
3682}
3683
3684fn find_column_value_vec_f32_any(
3685 columns: &[String],
3686 values: &[Value],
3687 names: &[&str],
3688) -> RedDBResult<Vec<f32>> {
3689 for name in names {
3690 if columns
3691 .iter()
3692 .any(|column| column.eq_ignore_ascii_case(name))
3693 {
3694 return find_column_value_vec_f32(columns, values, name);
3695 }
3696 }
3697 Err(RedDBError::Query(format!(
3698 "required vector column '{}' not found in INSERT",
3699 names.join("' or '")
3700 )))
3701}
3702
3703fn extract_remaining_properties(
3705 columns: &[String],
3706 values: &[Value],
3707 exclude: &[&str],
3708) -> Vec<(String, Value)> {
3709 columns
3710 .iter()
3711 .zip(values.iter())
3712 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3713 .map(|(col, val)| (col.clone(), val.clone()))
3714 .collect()
3715}
3716
3717fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3718 let mut invalid = Vec::new();
3719 for column in columns {
3720 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3721 invalid.push(column.clone());
3722 }
3723 }
3724
3725 if invalid.is_empty() {
3726 Ok(())
3727 } else {
3728 Err(RedDBError::Query(format!(
3729 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3730 invalid.join(", ")
3731 )))
3732 }
3733}
3734
3735fn is_timeseries_insert_column(column: &str) -> bool {
3736 matches!(
3737 column.to_ascii_lowercase().as_str(),
3738 "metric"
3739 | "value"
3740 | "tags"
3741 | "timestamp"
3742 | "timestamp_ns"
3743 | "time"
3744 | "event_name"
3749 | "payload"
3750 )
3751}
3752
3753fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3754 let mut found = None;
3755
3756 for alias in ["timestamp_ns", "timestamp", "time"] {
3757 for (index, column) in columns.iter().enumerate() {
3758 if !column.eq_ignore_ascii_case(alias) {
3759 continue;
3760 }
3761
3762 if found.is_some() {
3763 return Err(RedDBError::Query(
3764 "timeseries INSERT accepts only one timestamp column".to_string(),
3765 ));
3766 }
3767
3768 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3769 }
3770 }
3771
3772 Ok(found)
3773}
3774
3775fn find_timeseries_tags(
3776 columns: &[String],
3777 values: &[Value],
3778) -> RedDBResult<std::collections::HashMap<String, String>> {
3779 for (index, column) in columns.iter().enumerate() {
3780 if column.eq_ignore_ascii_case("tags") {
3781 return parse_timeseries_tags(&values[index]);
3782 }
3783 }
3784 Ok(std::collections::HashMap::new())
3785}
3786
3787fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3788 match value {
3789 Value::Null => Ok(std::collections::HashMap::new()),
3790 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3791 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3792 other => Err(RedDBError::Query(format!(
3793 "timeseries tags must be a JSON object or JSON text, got {other:?}"
3794 ))),
3795 }
3796}
3797
3798fn parse_timeseries_tags_json(
3799 bytes: &[u8],
3800) -> RedDBResult<std::collections::HashMap<String, String>> {
3801 let json: crate::json::Value = crate::json::from_slice(bytes)
3802 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3803
3804 let object = match json {
3805 crate::json::Value::Object(object) => object,
3806 other => {
3807 return Err(RedDBError::Query(format!(
3808 "timeseries tags must be a JSON object, got {other:?}"
3809 )))
3810 }
3811 };
3812
3813 let mut tags = std::collections::HashMap::with_capacity(object.len());
3814 for (key, value) in object {
3815 tags.insert(key, json_tag_value_to_string(&value));
3816 }
3817 Ok(tags)
3818}
3819
3820fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3836 let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3837 buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3838 buf.push_str(&value.to_string_compact());
3839 buf
3840}
3841
3842fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3843 match value {
3844 Value::UnsignedInteger(value) => Ok(*value),
3845 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3846 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3847 Value::Text(value) => value.parse::<u64>().map_err(|_| {
3848 RedDBError::Query(format!(
3849 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3850 ))
3851 }),
3852 other => Err(RedDBError::Query(format!(
3853 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3854 ))),
3855 }
3856}
3857
3858fn current_unix_ns() -> u64 {
3859 std::time::SystemTime::now()
3860 .duration_since(std::time::UNIX_EPOCH)
3861 .unwrap_or_default()
3862 .as_nanos()
3863 .min(u128::from(u64::MAX)) as u64
3864}
3865
3866fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3867 use crate::json::{Map, Value as JV};
3868 match value {
3869 MetadataValue::Null => JV::Null,
3870 MetadataValue::Bool(value) => JV::Bool(*value),
3871 MetadataValue::Int(value) => JV::Number(*value as f64),
3872 MetadataValue::Float(value) => JV::Number(*value),
3873 MetadataValue::String(value) => JV::String(value.clone()),
3874 MetadataValue::Bytes(value) => JV::Array(
3875 value
3876 .iter()
3877 .map(|value| JV::Number(*value as f64))
3878 .collect(),
3879 ),
3880 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3881 MetadataValue::Array(values) => {
3882 JV::Array(values.iter().map(metadata_value_to_json).collect())
3883 }
3884 MetadataValue::Object(object) => {
3885 let entries = object
3886 .iter()
3887 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3888 .collect();
3889 JV::Object(entries)
3890 }
3891 MetadataValue::Geo { lat, lon } => {
3892 let mut object = Map::new();
3893 object.insert("lat".to_string(), JV::Number(*lat));
3894 object.insert("lon".to_string(), JV::Number(*lon));
3895 JV::Object(object)
3896 }
3897 MetadataValue::Reference(target) => {
3898 let mut object = Map::new();
3899 object.insert(
3900 "collection".to_string(),
3901 JV::String(target.collection().to_string()),
3902 );
3903 object.insert(
3904 "entity_id".to_string(),
3905 JV::Number(target.entity_id().raw() as f64),
3906 );
3907 JV::Object(object)
3908 }
3909 MetadataValue::References(values) => {
3910 let refs = values
3911 .iter()
3912 .map(|target| {
3913 let mut object = Map::new();
3914 object.insert(
3915 "collection".to_string(),
3916 JV::String(target.collection().to_string()),
3917 );
3918 object.insert(
3919 "entity_id".to_string(),
3920 JV::Number(target.entity_id().raw() as f64),
3921 );
3922 JV::Object(object)
3923 })
3924 .collect();
3925 JV::Array(refs)
3926 }
3927 }
3928}
3929
3930fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3931 match value {
3932 Value::Null => MetadataValue::Null,
3933 Value::Boolean(value) => MetadataValue::Bool(*value),
3934 Value::Integer(value) => MetadataValue::Int(*value),
3935 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3936 Value::Float(value) => MetadataValue::Float(*value),
3937 Value::Text(value) => MetadataValue::String(value.to_string()),
3938 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3939 Value::Timestamp(value) => {
3940 if *value >= 0 {
3941 metadata_u64_to_value(*value as u64)
3942 } else {
3943 MetadataValue::Int(*value)
3944 }
3945 }
3946 Value::TimestampMs(value) => {
3947 if *value >= 0 {
3948 metadata_u64_to_value(*value as u64)
3949 } else {
3950 MetadataValue::Int(*value)
3951 }
3952 }
3953 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3954 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3955 Value::Date(value) => MetadataValue::String(value.to_string()),
3956 Value::Time(value) => MetadataValue::String(value.to_string()),
3957 Value::Decimal(value) => MetadataValue::String(value.to_string()),
3958 Value::Ipv4(value) => MetadataValue::String(format!(
3959 "{}.{}.{}.{}",
3960 (value >> 24) & 0xFF,
3961 (value >> 16) & 0xFF,
3962 (value >> 8) & 0xFF,
3963 value & 0xFF
3964 )),
3965 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3966 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3967 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3968 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3969 lat: *lat as f64 / 1_000_000.0,
3970 lon: *lon as f64 / 1_000_000.0,
3971 },
3972 Value::BigInt(value) => MetadataValue::String(value.to_string()),
3973 Value::TableRef(value) => MetadataValue::String(value.clone()),
3974 Value::PageRef(value) => MetadataValue::Int(*value as i64),
3975 Value::Password(value) => MetadataValue::String(value.clone()),
3976 Value::Array(values) => {
3977 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3978 }
3979 _ => MetadataValue::String(value.to_string()),
3980 }
3981}
3982
3983fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3984 match value {
3985 Value::Null => Ok(MetadataValue::Null),
3986 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3987 Value::Integer(_) => Err(RedDBError::Query(format!(
3988 "column '{field}' must be non-negative for TTL metadata"
3989 ))),
3990 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3991 Value::Float(value) if value.is_finite() => {
3992 if value.fract().abs() >= f64::EPSILON {
3993 return Err(RedDBError::Query(format!(
3994 "column '{field}' must be an integer (TTL metadata must be an integer)"
3995 )));
3996 }
3997 if *value < 0.0 {
3998 return Err(RedDBError::Query(format!(
3999 "column '{field}' must be non-negative for TTL metadata"
4000 )));
4001 }
4002 if *value > u64::MAX as f64 {
4003 return Err(RedDBError::Query(format!(
4004 "column '{field}' value is too large"
4005 )));
4006 }
4007 Ok(metadata_u64_to_value(*value as u64))
4008 }
4009 Value::Float(_) => Err(RedDBError::Query(format!(
4010 "column '{field}' must be a finite number"
4011 ))),
4012 Value::Text(value) => {
4013 let value = value.trim();
4014 if let Ok(value) = value.parse::<u64>() {
4015 Ok(metadata_u64_to_value(value))
4016 } else if let Ok(value) = value.parse::<i64>() {
4017 if value < 0 {
4018 return Err(RedDBError::Query(format!(
4019 "column '{field}' must be non-negative for TTL metadata"
4020 )));
4021 }
4022 Ok(metadata_u64_to_value(value as u64))
4023 } else if let Ok(value) = value.parse::<f64>() {
4024 if !value.is_finite() {
4025 return Err(RedDBError::Query(format!(
4026 "column '{field}' must be a finite number"
4027 )));
4028 }
4029 if value.fract().abs() >= f64::EPSILON {
4030 return Err(RedDBError::Query(format!(
4031 "column '{field}' must be an integer (TTL metadata must be an integer)"
4032 )));
4033 }
4034 if value < 0.0 {
4035 return Err(RedDBError::Query(format!(
4036 "column '{field}' must be non-negative for TTL metadata"
4037 )));
4038 }
4039 if value > u64::MAX as f64 {
4040 return Err(RedDBError::Query(format!(
4041 "column '{field}' value is too large"
4042 )));
4043 }
4044 Ok(metadata_u64_to_value(value as u64))
4045 } else {
4046 Err(RedDBError::Query(format!(
4047 "column '{field}' expects a numeric value for TTL metadata"
4048 )))
4049 }
4050 }
4051 _ => Err(RedDBError::Query(format!(
4052 "column '{field}' expects a numeric value for TTL metadata"
4053 ))),
4054 }
4055}
4056
4057fn metadata_u64_to_value(value: u64) -> MetadataValue {
4058 if value <= i64::MAX as u64 {
4059 MetadataValue::Int(value as i64)
4060 } else {
4061 MetadataValue::Timestamp(value)
4062 }
4063}
4064
4065fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
4071 let json = match value {
4072 Value::Null => return false,
4073 Value::Json(bytes) | Value::Blob(bytes) => {
4074 match crate::json::from_slice::<crate::json::Value>(bytes) {
4075 Ok(v) => v,
4076 Err(_) => return false,
4077 }
4078 }
4079 Value::Text(s) => {
4080 let trimmed = s.trim_start();
4081 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
4082 return false;
4083 }
4084 match crate::json::from_str::<crate::json::Value>(s) {
4085 Ok(v) => v,
4086 Err(_) => return false,
4087 }
4088 }
4089 _ => return false,
4090 };
4091 let mut cursor = &json;
4092 for seg in tail.split('.') {
4093 match cursor {
4094 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
4095 Some((_, v)) => cursor = v,
4096 None => return false,
4097 },
4098 _ => return false,
4099 }
4100 }
4101 !matches!(cursor, crate::json::Value::Null)
4102}
4103
4104fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
4115 let mut root = match current {
4116 Value::Null => crate::json::Value::Object(Default::default()),
4117 Value::Json(bytes) | Value::Blob(bytes) => {
4118 crate::json::from_slice(&bytes).map_err(|err| {
4119 RedDBError::Query(format!(
4120 "tenant auto-fill: root column is not valid JSON ({err})"
4121 ))
4122 })?
4123 }
4124 Value::Text(s) => {
4125 if s.trim().is_empty() {
4126 crate::json::Value::Object(Default::default())
4127 } else {
4128 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
4129 RedDBError::Query(format!(
4130 "tenant auto-fill: text root is not valid JSON ({err})"
4131 ))
4132 })?
4133 }
4134 }
4135 other => {
4136 return Err(RedDBError::Query(format!(
4137 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4138 )));
4139 }
4140 };
4141
4142 let segments: Vec<&str> = tail.split('.').collect();
4144 let mut cursor: &mut crate::json::Value = &mut root;
4145 for (i, seg) in segments.iter().enumerate() {
4146 let is_last = i + 1 == segments.len();
4147 let map = match cursor {
4148 crate::json::Value::Object(m) => m,
4149 _ => {
4150 return Err(RedDBError::Query(format!(
4151 "tenant auto-fill: segment '{seg}' is not inside an object"
4152 )));
4153 }
4154 };
4155 if is_last {
4156 map.insert(
4157 seg.to_string(),
4158 crate::json::Value::String(tenant_id.to_string()),
4159 );
4160 break;
4161 }
4162 cursor = map
4163 .entry(seg.to_string())
4164 .or_insert_with(|| crate::json::Value::Object(Default::default()));
4165 }
4166
4167 let bytes = crate::json::to_vec(&root).map_err(|err| {
4168 RedDBError::Query(format!(
4169 "tenant auto-fill: failed to re-serialize JSON ({err})"
4170 ))
4171 })?;
4172 Ok(Value::Json(bytes))
4173}
4174
4175#[cfg(test)]
4176mod tests {
4177 use crate::storage::schema::Value;
4178 use crate::storage::wal::{WalReader, WalRecord};
4179 use crate::{RedDBOptions, RedDBRuntime};
4180 use std::path::Path;
4181
4182 fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4183 WalReader::open(wal_path)
4184 .expect("wal opens")
4185 .iter()
4186 .map(|record| record.expect("wal record decodes").1)
4187 .filter_map(|record| match record {
4188 WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4189 _ => None,
4190 })
4191 .collect()
4192 }
4193
4194 fn action_contains_text(action: &[u8], needle: &str) -> bool {
4195 action
4196 .windows(needle.len())
4197 .any(|window| window == needle.as_bytes())
4198 }
4199
4200 fn assert_statement_writes_collections_in_one_new_wal_batch(
4201 rt: &RedDBRuntime,
4202 wal_path: &Path,
4203 statement: &str,
4204 source: &str,
4205 event_queue: &str,
4206 ) {
4207 let before_batches = store_commit_batches(wal_path).len();
4208
4209 rt.execute_query(statement).unwrap();
4210
4211 let batches = store_commit_batches(wal_path);
4212 let statement_batches = &batches[before_batches..];
4213 let source_batch = statement_batches
4214 .iter()
4215 .position(|actions| {
4216 actions.iter().any(|action| {
4217 action_contains_text(action, source)
4218 && !action_contains_text(action, event_queue)
4219 })
4220 })
4221 .expect("source collection write batch is present");
4222 let event_batch = statement_batches
4223 .iter()
4224 .position(|actions| {
4225 actions
4226 .iter()
4227 .any(|action| action_contains_text(action, event_queue))
4228 })
4229 .expect("event queue write batch is present");
4230
4231 assert_eq!(
4232 source_batch, event_batch,
4233 "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4234 );
4235 }
4236
4237 #[test]
4238 fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4239 let dir = tempfile::tempdir().unwrap();
4240 let db_path = dir.path().join("events_dual_write.rdb");
4241 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4242 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4243
4244 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4245 .unwrap();
4246 assert_statement_writes_collections_in_one_new_wal_batch(
4247 &rt,
4248 &wal_path,
4249 "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4250 "users",
4251 "users_events",
4252 );
4253 }
4254
4255 #[test]
4256 fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4257 let dir = tempfile::tempdir().unwrap();
4258 let db_path = dir.path().join("events_update_atomic.rdb");
4259 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4260 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4261
4262 rt.execute_query(
4263 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4264 )
4265 .unwrap();
4266 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4267 .unwrap();
4268
4269 assert_statement_writes_collections_in_one_new_wal_batch(
4270 &rt,
4271 &wal_path,
4272 "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4273 "users",
4274 "user_updates",
4275 );
4276 }
4277
4278 #[test]
4279 fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4280 let dir = tempfile::tempdir().unwrap();
4281 let db_path = dir.path().join("events_delete_atomic.rdb");
4282 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4283 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4284
4285 rt.execute_query(
4286 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4287 )
4288 .unwrap();
4289 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4290 .unwrap();
4291
4292 assert_statement_writes_collections_in_one_new_wal_batch(
4293 &rt,
4294 &wal_path,
4295 "DELETE FROM users WHERE id = 1",
4296 "users",
4297 "user_deletes",
4298 );
4299 }
4300
4301 #[test]
4302 fn update_where_id_in_with_hash_index_updates_expected_rows() {
4303 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4304 rt.execute_query("CREATE TABLE users (id INT, score INT)")
4305 .unwrap();
4306 for id in 0..5 {
4307 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4308 .unwrap();
4309 }
4310 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4311 .unwrap();
4312
4313 let updated = rt
4314 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4315 .unwrap();
4316 assert_eq!(updated.affected_rows, 3);
4317
4318 let selected = rt
4319 .execute_query("SELECT id, score FROM users ORDER BY id")
4320 .unwrap();
4321 let scores: Vec<(i64, i64)> = selected
4322 .result
4323 .records
4324 .iter()
4325 .map(|record| {
4326 let id = match record.get("id").unwrap() {
4327 Value::Integer(value) => *value,
4328 other => panic!("expected integer id, got {other:?}"),
4329 };
4330 let score = match record.get("score").unwrap() {
4331 Value::Integer(value) => *value,
4332 other => panic!("expected integer score, got {other:?}"),
4333 };
4334 (id, score)
4335 })
4336 .collect();
4337 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4338 }
4339
4340 #[test]
4347 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4348 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4349 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4350 .unwrap();
4351 for id in 0..5 {
4352 rt.execute_query(&format!(
4353 "INSERT INTO items (id, score) VALUES ({id}, {})",
4354 id * 10
4355 ))
4356 .unwrap();
4357 }
4358 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4359 .unwrap();
4360
4361 let updated_one = rt
4365 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4366 .unwrap();
4367 assert_eq!(updated_one.affected_rows, 1);
4368
4369 let updated_many = rt
4373 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4374 .unwrap();
4375 assert_eq!(updated_many.affected_rows, 2);
4376
4377 let snapshot = rt
4378 .execute_query("SELECT id, score FROM items ORDER BY id")
4379 .unwrap();
4380 let pairs: Vec<(i64, i64)> = snapshot
4381 .result
4382 .records
4383 .iter()
4384 .map(|record| {
4385 let id = match record.get("id").unwrap() {
4386 Value::Integer(value) => *value,
4387 other => panic!("expected integer id, got {other:?}"),
4388 };
4389 let score = match record.get("score").unwrap() {
4390 Value::Integer(value) => *value,
4391 other => panic!("expected integer score, got {other:?}"),
4392 };
4393 (id, score)
4394 })
4395 .collect();
4396 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4397
4398 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4400 assert_eq!(updated_all.affected_rows, 5);
4401 let after = rt
4402 .execute_query("SELECT score FROM items ORDER BY id")
4403 .unwrap();
4404 let scores: Vec<i64> = after
4405 .result
4406 .records
4407 .iter()
4408 .map(|record| match record.get("score").unwrap() {
4409 Value::Integer(value) => *value,
4410 other => panic!("expected integer score, got {other:?}"),
4411 })
4412 .collect();
4413 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4414 }
4415
4416 #[test]
4422 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4423 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4424 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4425 .unwrap();
4426 for id in 0..5 {
4427 rt.execute_query(&format!(
4428 "INSERT INTO items (id, score) VALUES ({id}, {})",
4429 id * 10
4430 ))
4431 .unwrap();
4432 }
4433 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4434 .unwrap();
4435
4436 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4439 assert_eq!(deleted_one.affected_rows, 1);
4440
4441 let deleted_many = rt
4445 .execute_query("DELETE FROM items WHERE score > 25")
4446 .unwrap();
4447 assert_eq!(deleted_many.affected_rows, 2);
4448
4449 let surviving = rt
4450 .execute_query("SELECT id FROM items ORDER BY id")
4451 .unwrap();
4452 let ids: Vec<i64> = surviving
4453 .result
4454 .records
4455 .iter()
4456 .map(|record| match record.get("id").unwrap() {
4457 Value::Integer(value) => *value,
4458 other => panic!("expected integer id, got {other:?}"),
4459 })
4460 .collect();
4461 assert_eq!(ids, vec![0, 1]);
4462
4463 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4465 assert_eq!(deleted_rest.affected_rows, 2);
4466 let empty = rt.execute_query("SELECT id FROM items").unwrap();
4467 assert!(empty.result.records.is_empty());
4468 }
4469
4470 #[test]
4475 fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4476 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4477 rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4478 .unwrap();
4479
4480 let inserted = rt
4483 .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4484 .unwrap();
4485 assert_eq!(inserted.affected_rows, 1);
4486
4487 let update_err = rt
4489 .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4490 .unwrap_err();
4491 let msg = format!("{update_err}");
4492 assert!(
4493 msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4494 "expected UPDATE rejection message, got: {msg}"
4495 );
4496
4497 let delete_err = rt
4499 .execute_query("DELETE FROM events WHERE id = 1")
4500 .unwrap_err();
4501 let msg = format!("{delete_err}");
4502 assert!(
4503 msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4504 "expected DELETE rejection message, got: {msg}"
4505 );
4506
4507 let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4510 assert_eq!(surviving.result.records.len(), 1);
4511 }
4512
4513 #[test]
4517 fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4518 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4519 rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4520 .unwrap();
4521
4522 rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4523 .unwrap();
4524 let updated = rt
4525 .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4526 .unwrap();
4527 assert_eq!(updated.affected_rows, 1);
4528 let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4529 assert_eq!(deleted.affected_rows, 1);
4530 }
4531
4532 #[test]
4533 fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4534 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4535 rt.execute_query(
4536 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4537 )
4538 .unwrap();
4539
4540 let inserted = rt
4541 .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4542 .unwrap();
4543 assert_eq!(inserted.affected_rows, 1);
4544
4545 let events = queue_payloads(&rt, "audit_log");
4546 assert_eq!(events.len(), 1);
4547 let event = events[0].as_object().expect("event payload object");
4548 assert!(event
4549 .get("event_id")
4550 .and_then(crate::json::Value::as_str)
4551 .is_some_and(|value| !value.is_empty()));
4552 assert_eq!(
4553 event.get("op").and_then(crate::json::Value::as_str),
4554 Some("insert")
4555 );
4556 assert_eq!(
4557 event.get("collection").and_then(crate::json::Value::as_str),
4558 Some("users")
4559 );
4560 assert_eq!(
4561 event.get("id").and_then(crate::json::Value::as_u64),
4562 Some(7)
4563 );
4564 assert!(event
4565 .get("ts")
4566 .and_then(crate::json::Value::as_u64)
4567 .is_some());
4568 assert!(event
4569 .get("lsn")
4570 .and_then(crate::json::Value::as_u64)
4571 .is_some());
4572 assert!(matches!(
4573 event.get("tenant"),
4574 Some(crate::json::Value::Null)
4575 ));
4576 assert!(matches!(
4577 event.get("before"),
4578 Some(crate::json::Value::Null)
4579 ));
4580 let after = event
4581 .get("after")
4582 .and_then(crate::json::Value::as_object)
4583 .expect("after object");
4584 assert_eq!(
4585 after.get("id").and_then(crate::json::Value::as_u64),
4586 Some(7)
4587 );
4588 assert_eq!(
4589 after.get("email").and_then(crate::json::Value::as_str),
4590 Some("a@example.com")
4591 );
4592 }
4593
4594 #[test]
4595 fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4596 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4597 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4598 .unwrap();
4599
4600 rt.execute_query(
4601 "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4602 )
4603 .unwrap();
4604
4605 let events = queue_payloads(&rt, "users_events");
4606 assert_eq!(events.len(), 2);
4607 let mut previous_lsn = 0;
4608 for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4609 let object = event.as_object().expect("event payload object");
4610 assert_eq!(
4611 object.get("op").and_then(crate::json::Value::as_str),
4612 Some("insert")
4613 );
4614 assert_eq!(
4615 object.get("id").and_then(crate::json::Value::as_u64),
4616 Some(expected_id)
4617 );
4618 let lsn = object
4619 .get("lsn")
4620 .and_then(crate::json::Value::as_u64)
4621 .expect("event lsn");
4622 assert!(
4623 lsn > previous_lsn,
4624 "event LSNs should increase in row order"
4625 );
4626 previous_lsn = lsn;
4627 let after = object
4628 .get("after")
4629 .and_then(crate::json::Value::as_object)
4630 .expect("after object");
4631 assert_eq!(
4632 after.get("id").and_then(crate::json::Value::as_u64),
4633 Some(expected_id)
4634 );
4635 }
4636 }
4637
4638 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4639 let result = rt
4640 .execute_query(&format!("QUEUE PEEK {queue} 10"))
4641 .expect("peek queue");
4642 result
4643 .result
4644 .records
4645 .iter()
4646 .map(
4647 |record| match record.get("payload").expect("payload column") {
4648 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4649 other => panic!("expected JSON queue payload, got {other:?}"),
4650 },
4651 )
4652 .collect()
4653 }
4654
4655 #[test]
4667 fn auto_index_id_fires_on_first_insert() {
4668 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4669 rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4670 .unwrap();
4671
4672 assert!(
4674 rt.index_store_ref()
4675 .find_index_for_column("bench_users", "id")
4676 .is_none(),
4677 "freshly created collection should not have an `id` index"
4678 );
4679
4680 rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4682 .unwrap();
4683
4684 let registered = rt
4686 .index_store_ref()
4687 .find_index_for_column("bench_users", "id")
4688 .expect("auto-index hook should have registered idx_id on first insert");
4689 assert_eq!(registered.name, "idx_id");
4690 assert_eq!(registered.collection, "bench_users");
4691 assert_eq!(registered.columns, vec!["id".to_string()]);
4692 assert!(matches!(
4693 registered.method,
4694 super::super::index_store::IndexMethodKind::Hash
4695 ));
4696
4697 for id in 2..=5 {
4700 rt.execute_query(&format!(
4701 "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4702 id * 10
4703 ))
4704 .unwrap();
4705 }
4706 for id in 1..=5 {
4707 let result = rt
4708 .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4709 .unwrap();
4710 assert_eq!(
4711 result.result.records.len(),
4712 1,
4713 "id={id} should match one row"
4714 );
4715 }
4716
4717 let deleted = rt
4722 .execute_query("DELETE FROM bench_users WHERE id = 3")
4723 .unwrap();
4724 assert_eq!(deleted.affected_rows, 1);
4725 }
4726
4727 #[test]
4732 fn auto_index_id_fires_on_first_bulk_insert() {
4733 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4734 rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4735 .unwrap();
4736
4737 rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4738 .unwrap();
4739
4740 let registered = rt
4741 .index_store_ref()
4742 .find_index_for_column("bench_bulk", "id")
4743 .expect("auto-index hook should fire on first bulk insert");
4744 assert_eq!(registered.name, "idx_id");
4745
4746 for id in 1..=3 {
4748 let result = rt
4749 .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4750 .unwrap();
4751 assert_eq!(result.result.records.len(), 1);
4752 }
4753 }
4754
4755 #[test]
4759 fn auto_index_id_skips_when_no_id_column() {
4760 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4761 rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4762 .unwrap();
4763 rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4764 .unwrap();
4765
4766 assert!(rt
4767 .index_store_ref()
4768 .find_index_for_column("plain", "id")
4769 .is_none());
4770 assert!(rt
4771 .index_store_ref()
4772 .find_index_for_column("plain", "uid")
4773 .is_none());
4774 }
4775
4776 #[test]
4781 fn auto_index_id_skips_when_index_already_exists() {
4782 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4783 rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4784 .unwrap();
4785 rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4787 .unwrap();
4788 rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4789 .unwrap();
4790
4791 let registered = rt
4792 .index_store_ref()
4793 .find_index_for_column("pre", "id")
4794 .expect("user index should still be there");
4795 assert_eq!(
4796 registered.name, "user_idx",
4797 "auto-index hook must not overwrite an existing index"
4798 );
4799 }
4800
4801 #[test]
4805 fn auto_index_id_dropped_with_collection() {
4806 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4807 rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4808 .unwrap();
4809 rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4810 .unwrap();
4811 assert!(rt
4812 .index_store_ref()
4813 .find_index_for_column("ephemeral", "id")
4814 .is_some());
4815
4816 rt.execute_query("DROP TABLE ephemeral").unwrap();
4817
4818 assert!(
4819 rt.index_store_ref()
4820 .find_index_for_column("ephemeral", "id")
4821 .is_none(),
4822 "implicit `idx_id` must be reaped when its collection drops"
4823 );
4824 }
4825
4826 #[test]
4831 fn auto_index_id_disabled_by_config() {
4832 let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4833 let rt = RedDBRuntime::with_options(opts).unwrap();
4834
4835 rt.execute_query("CREATE TABLE off (id INT, score INT)")
4836 .unwrap();
4837 rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4838 .unwrap();
4839
4840 assert!(
4841 rt.index_store_ref()
4842 .find_index_for_column("off", "id")
4843 .is_none(),
4844 "with auto_index_id=false, no implicit index should be created"
4845 );
4846 }
4847
4848 #[test]
4851 fn update_single_row_emits_update_event() {
4852 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4853 rt.execute_query(
4854 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4855 )
4856 .unwrap();
4857 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4858 .unwrap();
4859
4860 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4861 .unwrap();
4862
4863 let events = queue_payloads(&rt, "audit_log");
4864 assert_eq!(events.len(), 1, "expected exactly 1 update event");
4865 let event = events[0].as_object().expect("event payload object");
4866 assert_eq!(
4867 event.get("op").and_then(crate::json::Value::as_str),
4868 Some("update")
4869 );
4870 assert_eq!(
4871 event.get("collection").and_then(crate::json::Value::as_str),
4872 Some("users")
4873 );
4874 assert!(event
4875 .get("event_id")
4876 .and_then(crate::json::Value::as_str)
4877 .is_some_and(|v| !v.is_empty()));
4878 let before = event
4879 .get("before")
4880 .and_then(crate::json::Value::as_object)
4881 .expect("before must be an object");
4882 let after = event
4883 .get("after")
4884 .and_then(crate::json::Value::as_object)
4885 .expect("after must be an object");
4886 assert_eq!(
4887 before.get("name").and_then(crate::json::Value::as_str),
4888 Some("Alice"),
4889 "before.name should be the old value"
4890 );
4891 assert_eq!(
4892 after.get("name").and_then(crate::json::Value::as_str),
4893 Some("Bob"),
4894 "after.name should be the new value"
4895 );
4896 }
4897
4898 #[test]
4899 fn update_event_only_includes_changed_fields() {
4900 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4901 rt.execute_query(
4902 "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4903 )
4904 .unwrap();
4905 rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4906 .unwrap();
4907
4908 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4909 .unwrap();
4910
4911 let events = queue_payloads(&rt, "evts");
4912 assert_eq!(events.len(), 1);
4913 let event = events[0].as_object().unwrap();
4914 let before = event
4915 .get("before")
4916 .and_then(crate::json::Value::as_object)
4917 .unwrap();
4918 let after = event
4919 .get("after")
4920 .and_then(crate::json::Value::as_object)
4921 .unwrap();
4922 assert!(
4924 before.contains_key("name"),
4925 "before must include changed field"
4926 );
4927 assert!(
4928 after.contains_key("name"),
4929 "after must include changed field"
4930 );
4931 assert!(
4933 !before.contains_key("email"),
4934 "before must not include unchanged email"
4935 );
4936 assert!(
4937 !after.contains_key("email"),
4938 "after must not include unchanged email"
4939 );
4940 }
4941
4942 #[test]
4943 fn multi_row_update_emits_one_event_per_row() {
4944 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4945 rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4946 .unwrap();
4947 rt.execute_query(
4948 "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4949 )
4950 .unwrap();
4951
4952 rt.execute_query("UPDATE items SET status = 'done'")
4953 .unwrap();
4954
4955 let events = queue_payloads(&rt, "evts");
4956 assert_eq!(events.len(), 3, "expected one update event per row");
4957 for event in &events {
4958 let obj = event.as_object().unwrap();
4959 assert_eq!(
4960 obj.get("op").and_then(crate::json::Value::as_str),
4961 Some("update")
4962 );
4963 }
4964 }
4965
4966 #[test]
4967 fn delete_single_row_emits_delete_event() {
4968 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4969 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4970 .unwrap();
4971 rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4972 .unwrap();
4973
4974 rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4975
4976 let events = queue_payloads(&rt, "del_log");
4977 assert_eq!(events.len(), 1);
4978 let event = events[0].as_object().expect("event payload object");
4979 assert_eq!(
4980 event.get("op").and_then(crate::json::Value::as_str),
4981 Some("delete")
4982 );
4983 assert_eq!(
4984 event.get("collection").and_then(crate::json::Value::as_str),
4985 Some("users")
4986 );
4987 assert!(event
4988 .get("event_id")
4989 .and_then(crate::json::Value::as_str)
4990 .is_some_and(|v| !v.is_empty()));
4991 let before = event
4992 .get("before")
4993 .and_then(crate::json::Value::as_object)
4994 .expect("before must be an object for delete");
4995 assert_eq!(
4996 before.get("id").and_then(crate::json::Value::as_u64),
4997 Some(42)
4998 );
4999 assert_eq!(
5000 before.get("name").and_then(crate::json::Value::as_str),
5001 Some("Alice")
5002 );
5003 assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
5004 }
5005
5006 #[test]
5007 fn multi_row_delete_emits_one_event_per_row() {
5008 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5009 rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
5010 .unwrap();
5011 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
5012 .unwrap();
5013
5014 rt.execute_query("DELETE FROM items").unwrap();
5015
5016 let events = queue_payloads(&rt, "del_log");
5017 assert_eq!(events.len(), 3, "expected one delete event per deleted row");
5018 for event in &events {
5019 let obj = event.as_object().unwrap();
5020 assert_eq!(
5021 obj.get("op").and_then(crate::json::Value::as_str),
5022 Some("delete")
5023 );
5024 assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
5025 }
5026 }
5027
5028 #[test]
5029 fn ops_filter_update_does_not_emit_on_insert_or_delete() {
5030 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5031 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
5032 .unwrap();
5033
5034 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5035 .unwrap();
5036 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
5037
5038 let events = queue_payloads(&rt, "evts");
5039 assert!(
5040 events.is_empty(),
5041 "UPDATE-only filter must not emit INSERT or DELETE events"
5042 );
5043 }
5044
5045 #[test]
5048 fn suppress_events_on_insert_emits_no_events() {
5049 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5050 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5051 .unwrap();
5052
5053 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5054 .unwrap();
5055
5056 let events = queue_payloads(&rt, "evts");
5057 assert!(
5058 events.is_empty(),
5059 "SUPPRESS EVENTS must prevent INSERT events"
5060 );
5061 }
5062
5063 #[test]
5064 fn suppress_events_on_update_emits_no_events() {
5065 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5066 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5067 .unwrap();
5068 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5069 .unwrap();
5070 let _ = queue_payloads(&rt, "evts");
5072 rt.execute_query("QUEUE PURGE evts").unwrap();
5074
5075 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
5076 .unwrap();
5077
5078 let events = queue_payloads(&rt, "evts");
5079 assert!(
5080 events.is_empty(),
5081 "SUPPRESS EVENTS must prevent UPDATE events"
5082 );
5083 }
5084
5085 #[test]
5086 fn suppress_events_on_delete_emits_no_events() {
5087 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5088 rt.execute_query(
5089 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
5090 )
5091 .unwrap();
5092 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5093 .unwrap();
5094
5095 rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
5096 .unwrap();
5097
5098 let events = queue_payloads(&rt, "evts");
5099 assert!(
5100 events.is_empty(),
5101 "SUPPRESS EVENTS must prevent DELETE events"
5102 );
5103 }
5104
5105 #[test]
5106 fn normal_insert_after_suppress_still_emits() {
5107 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5108 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5109 .unwrap();
5110
5111 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5112 .unwrap();
5113 rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
5114 .unwrap();
5115
5116 let events = queue_payloads(&rt, "evts");
5117 assert_eq!(
5118 events.len(),
5119 1,
5120 "only the non-suppressed INSERT should emit"
5121 );
5122 assert_eq!(
5123 events[0].get("id").and_then(crate::json::Value::as_u64),
5124 Some(2)
5125 );
5126 }
5127}