1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11 CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12 CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13 RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16 build_row_update_contract_plan, entity_row_fields_snapshot,
17 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18 RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27 sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_rid, sys_key_tenant,
28 sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
43 column: String,
44 expr: Expr,
45 compound_op: Option<BinOp>,
46 metadata_key: Option<&'static str>,
47 row_rule: Option<RowUpdateColumnRule>,
48}
49
50struct CompiledUpdatePlan {
51 static_field_assignments: Vec<(String, Value)>,
52 static_metadata_assignments: Vec<(String, MetadataValue)>,
53 dynamic_assignments: Vec<CompiledUpdateAssignment>,
54 row_contract_plan: Option<RowUpdateContractPlan>,
55 row_modified_columns: Vec<String>,
56 row_touches_unique_columns: bool,
57}
58
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
61 dynamic_field_assignments: Vec<(String, Value)>,
62 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66 pub fn chain_tip_for_collection(
72 &self,
73 collection: &str,
74 ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75 let store = self.inner.db.store();
76 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77 return None;
78 }
79 let mut cache = self.inner.chain_tip_cache.lock();
80 if let Some(existing) = cache.get(collection) {
81 return Some(existing.clone());
82 }
83 let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84 cache.insert(collection.to_string(), scanned.clone());
85 Some(scanned)
86 }
87
88 pub fn verify_chain_for_collection(
95 &self,
96 collection: &str,
97 ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98 let store = self.inner.db.store();
99 let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100 if !outcome.ok {
101 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102 self.inner
103 .chain_integrity_broken
104 .lock()
105 .insert(collection.to_string(), true);
106 }
107 Some(outcome)
108 }
109
110 pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114 let store = self.inner.db.store();
115 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116 return false;
117 }
118 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119 self.inner
120 .chain_integrity_broken
121 .lock()
122 .insert(collection.to_string(), false);
123 true
124 }
125
126 fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130 {
131 let cache = self.inner.chain_integrity_broken.lock();
132 if let Some(v) = cache.get(collection) {
133 return *v;
134 }
135 }
136 let store = self.inner.db.store();
137 let persisted =
138 crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139 .unwrap_or(false);
140 self.inner
141 .chain_integrity_broken
142 .lock()
143 .insert(collection.to_string(), persisted);
144 persisted
145 }
146
147 fn ensure_integrity_tombstones_loaded(&self) -> bool {
152 use std::sync::atomic::Ordering;
153 match self
154 .inner
155 .integrity_tombstones_state
156 .load(Ordering::Relaxed)
157 {
158 1 => return false,
159 2 => return true,
160 _ => {}
161 }
162 let mut guard = self.inner.integrity_tombstones.lock();
165 if self
166 .inner
167 .integrity_tombstones_state
168 .load(Ordering::Relaxed)
169 == 0
170 {
171 let ranges = crate::runtime::integrity_tombstone::load_ranges(&self.inner.db.store());
172 let present = !ranges.is_empty();
173 *guard = ranges;
174 self.inner
175 .integrity_tombstones_state
176 .store(if present { 2 } else { 1 }, Ordering::Relaxed);
177 }
178 self.inner
179 .integrity_tombstones_state
180 .load(Ordering::Relaxed)
181 == 2
182 }
183
184 pub fn record_integrity_tombstone(&self, table: &str, lo: u64, hi: u64) {
190 use std::sync::atomic::Ordering;
191 self.ensure_integrity_tombstones_loaded();
192 let mut guard = self.inner.integrity_tombstones.lock();
193 guard.push(crate::runtime::integrity_tombstone::TombstoneRange::new(
194 table.to_string(),
195 lo,
196 hi,
197 ));
198 crate::runtime::integrity_tombstone::persist_ranges(&self.inner.db.store(), &guard);
199 self.inner
200 .integrity_tombstones_state
201 .store(2, Ordering::Relaxed);
202 }
203
204 pub fn integrity_tombstone_ranges(
208 &self,
209 ) -> Vec<crate::runtime::integrity_tombstone::TombstoneRange> {
210 self.ensure_integrity_tombstones_loaded();
211 self.inner.integrity_tombstones.lock().clone()
212 }
213
214 pub fn filter_integrity_tombstoned(&self, result: &mut UnifiedResult) {
219 if !self.ensure_integrity_tombstones_loaded() {
220 return;
221 }
222 let guard = self.inner.integrity_tombstones.lock();
223 if guard.is_empty() {
224 return;
225 }
226 let before = result.records.len();
227 result.records.retain(|record| {
228 !crate::runtime::integrity_tombstone::record_tombstoned(&guard, record)
229 });
230 if result.records.len() != before {
231 result.pre_serialized_json = None;
232 }
233 }
234
235 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
249 let Some(tenant_col) = self.tenant_column(&query.table) else {
250 return Ok(None);
251 };
252 if query
254 .columns
255 .iter()
256 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
257 {
258 return Ok(None);
259 }
260
261 if let Some(dot_pos) = tenant_col.find('.') {
267 let (root, tail) = tenant_col.split_at(dot_pos);
268 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
270 }
271
272 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
273 return Err(RedDBError::Query(format!(
274 "INSERT into tenant-scoped table '{}' requires an active tenant — \
275 run SET TENANT '<id>' first or name column '{}' explicitly",
276 query.table, tenant_col
277 )));
278 };
279
280 let mut augmented = query.clone();
281 augmented.columns.push(tenant_col);
282 let lit = Value::text(tenant_id.clone());
283 for row in augmented.values.iter_mut() {
284 row.push(lit.clone());
285 }
286 for row in augmented.value_exprs.iter_mut() {
287 row.push(crate::storage::query::ast::Expr::Literal {
288 value: lit.clone(),
289 span: crate::storage::query::ast::Span::synthetic(),
290 });
291 }
292 Ok(Some(augmented))
293 }
294
295 fn inject_dotted_tenant(
305 &self,
306 query: &InsertQuery,
307 root: &str,
308 tail: &str,
309 ) -> RedDBResult<Option<InsertQuery>> {
310 let active_tenant = crate::runtime::impl_core::current_tenant();
311 let mut augmented = query.clone();
312 let root_idx = augmented
313 .columns
314 .iter()
315 .position(|c| c.eq_ignore_ascii_case(root));
316
317 if let Some(idx) = root_idx {
318 for row in augmented.values.iter_mut() {
324 let Some(slot) = row.get_mut(idx) else {
325 continue;
326 };
327 if dotted_tail_already_set(slot, tail) {
328 continue;
329 }
330 let Some(tenant_id) = &active_tenant else {
331 return Err(RedDBError::Query(format!(
332 "INSERT into tenant-scoped table '{}' requires an active tenant — \
333 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
334 query.table, root, tail
335 )));
336 };
337 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
338 }
339 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
343 if let Some(slot) = row.get_mut(idx) {
344 let new_value = augmented
345 .values
346 .get(row_idx)
347 .and_then(|v| v.get(idx))
348 .cloned()
349 .unwrap_or(Value::Null);
350 *slot = crate::storage::query::ast::Expr::Literal {
351 value: new_value,
352 span: crate::storage::query::ast::Span::synthetic(),
353 };
354 }
355 }
356 } else {
357 let Some(tenant_id) = &active_tenant else {
361 return Err(RedDBError::Query(format!(
362 "INSERT into tenant-scoped table '{}' requires an active tenant — \
363 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
364 query.table, root, tail
365 )));
366 };
367 augmented.columns.push(root.to_string());
369 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
370 for row in augmented.values.iter_mut() {
371 row.push(fresh.clone());
372 }
373 for row in augmented.value_exprs.iter_mut() {
374 row.push(crate::storage::query::ast::Expr::Literal {
375 value: fresh.clone(),
376 span: crate::storage::query::ast::Span::synthetic(),
377 });
378 }
379 }
380
381 Ok(Some(augmented))
382 }
383
384 fn delete_entities_batch(
387 &self,
388 collection: &str,
389 ids: &[EntityId],
390 ) -> RedDBResult<(u64, Vec<u64>)> {
391 if ids.is_empty() {
392 return Ok((0, vec![]));
393 }
394
395 let store = self.db().store();
396 let Some(manager) = store.get_collection(collection) else {
397 return Ok((0, vec![]));
398 };
399
400 let active_xid = self.current_xid();
401 let conn_id = crate::runtime::impl_core::current_connection_id();
402 let mut autocommit_xid = None;
403 let mut tombstoned_ids = Vec::new();
404 let mut tombstoned_entities = Vec::new();
405 let mut physical_delete_ids = Vec::new();
406 let table_row_resolver =
407 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
408
409 for &id in ids {
410 let Some(mut entity) = manager.get(id) else {
411 continue;
412 };
413 if matches!(entity.data, EntityData::Row(_)) {
414 let previous_xmax = entity.xmax;
415 if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
416 if table_row_resolver.resolve_candidate(&entity).is_none() {
417 continue;
418 }
419 } else if entity.xmax != 0 {
420 continue;
421 }
422
423 let xid = match active_xid {
424 Some(xid) => xid,
425 None => match autocommit_xid {
426 Some(xid) => xid,
427 None => {
428 let mgr = self.snapshot_manager();
429 let xid = mgr.begin();
430 autocommit_xid = Some(xid);
431 xid
432 }
433 },
434 };
435 entity.set_xmax(xid);
436 if manager.update(entity.clone()).is_ok() {
437 if active_xid.is_some() {
438 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
439 }
440 tombstoned_entities.push(entity);
441 tombstoned_ids.push(id);
442 }
443 } else {
444 physical_delete_ids.push(id);
445 }
446 }
447
448 if let Some(xid) = autocommit_xid {
449 self.snapshot_manager().commit(xid);
450 }
451
452 let mut affected = tombstoned_ids.len() as u64;
453 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
454 if active_xid.is_some() {
455 store
456 .persist_entities_to_pager(collection, &tombstoned_entities)
457 .map_err(|err| RedDBError::Internal(err.to_string()))?;
458 } else {
459 store
460 .persist_entities_to_pager(collection, &tombstoned_entities)
461 .map_err(|err| RedDBError::Internal(err.to_string()))?;
462 for id in &tombstoned_ids {
463 store.context_index().remove_entity(*id);
464 let lsn = self.cdc_emit(
465 crate::replication::cdc::ChangeOperation::Delete,
466 collection,
467 id.raw(),
468 "entity",
469 );
470 lsns.push(lsn);
471 }
472 }
473
474 let deleted_ids = store
475 .delete_batch(collection, &physical_delete_ids)
476 .map_err(|err| RedDBError::Internal(err.to_string()))?;
477 affected += deleted_ids.len() as u64;
478 for id in &deleted_ids {
479 store.context_index().remove_entity(*id);
480 let lsn = self.cdc_emit(
481 crate::replication::cdc::ChangeOperation::Delete,
482 collection,
483 id.raw(),
484 "entity",
485 );
486 lsns.push(lsn);
487 }
488
489 Ok((affected, lsns))
490 }
491
492 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
495 if applied.is_empty() {
496 return Ok(Vec::new());
497 }
498
499 let store = self.db().store();
500 if applied.iter().any(|item| item.context_index_dirty) {
501 store.context_index().index_entities(
502 &applied[0].collection,
503 applied
504 .iter()
505 .filter(|item| item.context_index_dirty)
506 .map(|item| &item.entity),
507 );
508 }
509
510 for item in applied {
511 self.refresh_update_secondary_indexes(item)?;
512 }
513
514 let mut lsns = Vec::with_capacity(applied.len());
515 for item in applied {
516 let lsn = self.cdc_emit_prebuilt(
517 crate::replication::cdc::ChangeOperation::Update,
518 &item.collection,
519 &item.entity,
520 update_cdc_item_kind(self, &item.collection, &item.entity),
521 item.metadata.as_ref(),
522 false,
523 );
524 lsns.push(lsn);
525 }
526 Ok(lsns)
527 }
528
529 fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
530 self.persist_applied_entity_mutations(applied)
531 }
532
533 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
534 if applied.pre_mutation_fields.is_empty() {
535 return Ok(());
536 }
537 let post = entity_row_fields_snapshot(&applied.entity);
538 if post.is_empty() {
539 return Ok(());
540 }
541
542 let indexed_cols = self
543 .index_store_ref()
544 .indexed_columns_set(&applied.collection);
545 if indexed_cols.is_empty() {
546 return Ok(());
547 }
548
549 if let Some(old_version) = applied.replaced_entity.as_ref() {
550 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
551 .pre_mutation_fields
552 .iter()
553 .filter(|(col, _)| indexed_cols.contains(col))
554 .cloned()
555 .collect();
556 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
557 .iter()
558 .filter(|(col, _)| indexed_cols.contains(col))
559 .cloned()
560 .collect();
561 if !old_index_fields.is_empty() {
562 self.index_store_ref()
563 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
564 .map_err(crate::RedDBError::Internal)?;
565 }
566 if !new_index_fields.is_empty() {
567 self.index_store_ref()
568 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
569 .map_err(crate::RedDBError::Internal)?;
570 }
571 return Ok(());
572 }
573
574 let damage =
575 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
576 if damage
577 .touched_columns()
578 .into_iter()
579 .any(|col| indexed_cols.contains(col))
580 {
581 self.index_store_ref()
582 .index_entity_update(
583 &applied.collection,
584 applied.id,
585 &applied.pre_mutation_fields,
586 &post,
587 )
588 .map_err(crate::RedDBError::Internal)?;
589 }
590 Ok(())
591 }
592
593 pub fn execute_insert(
598 &self,
599 raw_query: &str,
600 query: &InsertQuery,
601 ) -> RedDBResult<RuntimeQueryResult> {
602 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
603 crate::runtime::collection_contract::CollectionContractGate::check(
609 self,
610 &query.table,
611 crate::runtime::collection_contract::MutationKind::Insert,
612 )?;
613 let augmented_owned;
622 let query = match self.maybe_inject_tenant_column(query)? {
623 Some(new_q) => {
624 augmented_owned = new_q;
625 &augmented_owned
626 }
627 None => query,
628 };
629 self.check_insert_column_policy(query)?;
630 if let Some(ref embed_config) = query.auto_embed {
631 let provider = crate::ai::parse_provider(&embed_config.provider)?;
632 crate::runtime::ai::provider_gate::enforce(self, &provider)?;
636 if matches!(provider, crate::ai::AiProvider::Local) {
637 crate::runtime::ai::local_embedding::ensure_local_embedding_available()?;
638 let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
646 if model_name.is_empty() {
647 return Err(RedDBError::Query(
648 "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
649 the local provider does not have an implicit default model"
650 .to_string(),
651 ));
652 }
653 crate::runtime::ai::local_embedding::preflight_local_embedding(
654 &self.inner.db,
655 model_name,
656 )?;
657 }
658 }
659
660 let mut inserted_count: u64 = 0;
661 let effective_rows =
662 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
663
664 let store = self.inner.db.store();
666 let _ = store.get_or_create_collection(&query.table);
667 let declared_model = self
668 .db()
669 .collection_contract_arc(&query.table)
670 .map(|contract| contract.declared_model);
671
672 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
673 if query.returning.is_some() {
674 Some(Vec::with_capacity(effective_rows.len()))
675 } else {
676 None
677 };
678 let mut returning_result: Option<UnifiedResult> = None;
679
680 if matches!(query.entity_type, InsertEntityType::Row)
681 && !matches!(
682 declared_model,
683 Some(crate::catalog::CollectionModel::TimeSeries)
684 )
685 {
686 let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
697 let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
698 Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
699 } else {
700 None
701 };
702 let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
703
704 if chain_mode && self.is_chain_integrity_broken(&query.table) {
707 return Err(RedDBError::InvalidOperation(format!(
708 "ChainIntegrityBroken: collection '{}' is locked until \
709 POST /collections/{}/clear-integrity-flag is called by an admin",
710 query.table, query.table
711 )));
712 }
713
714 let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
718 if chain_mode {
719 let mut cache = self.inner.chain_tip_cache.lock();
720 if let Some(existing) = cache.get(&query.table) {
721 Some(existing.clone())
722 } else if let Some(scanned) =
723 crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
724 {
725 cache.insert(query.table.clone(), scanned.clone());
726 Some(scanned)
727 } else {
728 None
729 }
730 } else {
731 None
732 };
733
734 let mut rows = Vec::with_capacity(effective_rows.len());
735 for row_values in &effective_rows {
736 if row_values.len() != query.columns.len() {
737 return Err(RedDBError::Query(format!(
738 "INSERT column count ({}) does not match value count ({})",
739 query.columns.len(),
740 row_values.len()
741 )));
742 }
743 let (mut fields, mut metadata) =
744 split_insert_metadata(self, &query.columns, row_values)?;
745 if chain_mode {
746 use crate::runtime::blockchain_kind::{
747 chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
748 COL_TIMESTAMP, RESERVED_COLUMNS,
749 };
750 let supplied_height = fields
751 .iter()
752 .find(|(k, _)| k == COL_BLOCK_HEIGHT)
753 .map(|(_, v)| v.clone());
754 let supplied_prev = fields
755 .iter()
756 .find(|(k, _)| k == COL_PREV_HASH)
757 .map(|(_, v)| v.clone());
758 let supplied_ts = fields
759 .iter()
760 .find(|(k, _)| k == COL_TIMESTAMP)
761 .map(|(_, v)| v.clone());
762 let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
763 let user_supplied_any = supplied_height.is_some()
764 || supplied_prev.is_some()
765 || supplied_ts.is_some()
766 || supplied_hash;
767
768 fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
769 let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
770
771 let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
772 Some(t) => (t.hash, t.height + 1),
773 None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
774 };
775 let server_now = crate::runtime::blockchain_kind::now_ms();
776
777 let (use_prev, use_height, use_ts) = if user_supplied_any {
778 if supplied_hash {
781 return Err(chain_conflict_error(
782 tip_next_height.saturating_sub(1),
783 tip_prev_hash,
784 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
785 server_now,
786 "hash column is engine-computed and cannot be supplied",
787 ));
788 }
789 let caller_prev = match &supplied_prev {
790 Some(Value::Blob(b)) if b.len() == 32 => {
791 let mut a = [0u8; 32];
792 a.copy_from_slice(b);
793 a
794 }
795 Some(Value::Text(s)) if s.len() == 64 => {
796 let mut a = [0u8; 32];
800 let mut ok = true;
801 for (i, slot) in a.iter_mut().enumerate() {
802 let pair = &s.as_ref()[i * 2..i * 2 + 2];
803 match u8::from_str_radix(pair, 16) {
804 Ok(byte) => *slot = byte,
805 Err(_) => {
806 ok = false;
807 break;
808 }
809 }
810 }
811 if !ok {
812 return Err(chain_conflict_error(
813 tip_next_height.saturating_sub(1),
814 tip_prev_hash,
815 chain_tip_full
816 .as_ref()
817 .map(|t| t.timestamp_ms)
818 .unwrap_or(0),
819 server_now,
820 "prev_hash is not valid hex",
821 ));
822 }
823 a
824 }
825 _ => {
826 return Err(chain_conflict_error(
827 tip_next_height.saturating_sub(1),
828 tip_prev_hash,
829 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
830 server_now,
831 "prev_hash missing or not a 32-byte Blob",
832 ));
833 }
834 };
835 if caller_prev != tip_prev_hash {
836 return Err(chain_conflict_error(
837 tip_next_height.saturating_sub(1),
838 tip_prev_hash,
839 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
840 server_now,
841 "prev_hash does not match current tip",
842 ));
843 }
844 let caller_height = match &supplied_height {
845 Some(Value::UnsignedInteger(v)) => *v,
846 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
847 _ => {
848 return Err(chain_conflict_error(
849 tip_next_height.saturating_sub(1),
850 tip_prev_hash,
851 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
852 server_now,
853 "block_height missing or not an unsigned integer",
854 ));
855 }
856 };
857 if caller_height != tip_next_height {
858 return Err(chain_conflict_error(
859 tip_next_height.saturating_sub(1),
860 tip_prev_hash,
861 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
862 server_now,
863 "block_height does not match tip+1",
864 ));
865 }
866 let caller_ts = match &supplied_ts {
867 Some(Value::UnsignedInteger(v)) => *v,
868 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
869 _ => {
870 return Err(chain_conflict_error(
871 tip_next_height.saturating_sub(1),
872 tip_prev_hash,
873 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
874 server_now,
875 "timestamp missing or not an unsigned integer",
876 ));
877 }
878 };
879 let drift = (caller_ts as i128) - (server_now as i128);
880 if drift.abs() > 60_000 {
881 return Err(chain_conflict_error(
882 tip_next_height.saturating_sub(1),
883 tip_prev_hash,
884 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
885 server_now,
886 "timestamp outside ±60s of server_time",
887 ));
888 }
889 (caller_prev, caller_height, caller_ts)
890 } else {
891 (tip_prev_hash, tip_next_height, server_now)
892 };
893
894 let (reserved, new_hash) =
895 crate::runtime::blockchain_kind::make_block_reserved_fields(
896 use_prev, use_height, use_ts, &payload,
897 );
898 fields.extend(reserved);
899 chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
900 height: use_height,
901 hash: new_hash,
902 timestamp_ms: use_ts,
903 });
904 }
905 if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
917 let (pk_col, sig_col, residual) =
918 crate::runtime::signed_writes_kind::split_signature_fields(fields);
919 let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
920 let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
921 crate::runtime::signed_writes_kind::verify_row(
922 ®,
923 pk_col.as_ref().map(|c| c.bytes.as_slice()),
924 sig_col.as_ref().map(|c| c.bytes.as_slice()),
925 &payload,
926 )
927 .map_err(crate::runtime::signed_writes_kind::map_error)?;
928 fields = residual;
929 if let Some(col) = pk_col {
934 fields.push((
935 crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
936 col.raw_value,
937 ));
938 }
939 if let Some(col) = sig_col {
940 fields.push((
941 crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
942 col.raw_value,
943 ));
944 }
945 }
946 merge_with_clauses(
947 &mut metadata,
948 query.ttl_ms,
949 query.expires_at_ms,
950 &query.with_metadata,
951 );
952 if let Some(snaps) = returning_snapshots.as_mut() {
953 snaps.push(fields.clone());
954 }
955 rows.push(CreateRowInput {
956 collection: query.table.clone(),
957 fields,
958 metadata,
959 node_links: Vec::new(),
960 vector_links: Vec::new(),
961 });
962 }
963 let outputs = self.create_rows_batch(CreateRowsBatchInput {
964 collection: query.table.clone(),
965 rows,
966 suppress_events: query.suppress_events,
967 })?;
968 inserted_count = outputs.len() as u64;
969
970 if chain_mode {
974 if let Some(new_tip) = chain_tip_full.as_ref() {
975 self.inner
976 .chain_tip_cache
977 .lock()
978 .insert(query.table.clone(), new_tip.clone());
979 }
980 }
981
982 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
990 let time_col = &spec.time_column;
991 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
993 for row in &effective_rows {
994 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
995 if *n >= 0 {
996 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
997 }
998 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
999 let _ = self.inner.db.hypertables().route(&query.table, *n);
1000 }
1001 }
1002 }
1003 }
1004
1005 if let (Some(items), Some(snaps)) =
1006 (query.returning.as_ref(), returning_snapshots.take())
1007 {
1008 let snaps = row_insert_returning_snapshots(&outputs, snaps);
1009 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
1010 }
1011 } else {
1012 let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
1019 Vec::with_capacity(effective_rows.len());
1020 let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
1021 {
1022 Vec::with_capacity(effective_rows.len())
1023 } else {
1024 Vec::new()
1025 };
1026 if matches!(
1027 query.entity_type,
1028 InsertEntityType::Node | InsertEntityType::Edge
1029 ) {
1030 enum PreparedGraphInsert {
1031 Node {
1032 fields: Vec<(String, Value)>,
1033 input: CreateNodeInput,
1034 },
1035 Edge {
1036 fields: Vec<(String, Value)>,
1037 input: CreateEdgeInput,
1038 },
1039 }
1040
1041 let mut prepared = Vec::with_capacity(effective_rows.len());
1042 for row_values in &effective_rows {
1043 if row_values.len() != query.columns.len() {
1044 return Err(RedDBError::Query(format!(
1045 "INSERT column count ({}) does not match value count ({})",
1046 query.columns.len(),
1047 row_values.len()
1048 )));
1049 }
1050
1051 match query.entity_type {
1052 InsertEntityType::Node => {
1053 let (node_values, mut metadata) =
1054 split_insert_metadata(self, &query.columns, row_values)?;
1055 merge_with_clauses(
1056 &mut metadata,
1057 query.ttl_ms,
1058 query.expires_at_ms,
1059 &query.with_metadata,
1060 );
1061 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1062 apply_collection_default_ttl_metadata(
1063 self,
1064 &query.table,
1065 &mut metadata,
1066 );
1067 let (columns, values) = pairwise_columns_values(&node_values);
1068 let label = find_column_value_string(&columns, &values, "label")?;
1069 let node_type =
1070 find_column_value_opt_string(&columns, &values, "node_type");
1071 let properties = extract_remaining_properties(
1072 &columns,
1073 &values,
1074 &["label", "node_type"],
1075 );
1076 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1077 properties.iter().map(|(key, _)| key.as_str()),
1078 &format!("node '{}'", query.table),
1079 )?;
1080 prepared.push(PreparedGraphInsert::Node {
1081 fields: node_values,
1082 input: CreateNodeInput {
1083 collection: query.table.clone(),
1084 label,
1085 node_type,
1086 properties,
1087 metadata,
1088 embeddings: Vec::new(),
1089 table_links: Vec::new(),
1090 node_links: Vec::new(),
1091 },
1092 });
1093 }
1094 InsertEntityType::Edge => {
1095 let (edge_values, mut metadata) =
1096 split_insert_metadata(self, &query.columns, row_values)?;
1097 merge_with_clauses(
1098 &mut metadata,
1099 query.ttl_ms,
1100 query.expires_at_ms,
1101 &query.with_metadata,
1102 );
1103 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1104 apply_collection_default_ttl_metadata(
1105 self,
1106 &query.table,
1107 &mut metadata,
1108 );
1109 let (columns, values) = pairwise_columns_values(&edge_values);
1110 let label = find_column_value_string(&columns, &values, "label")?;
1111 ensure_non_tree_structural_edge_label(&label)?;
1112 let from_id = resolve_edge_endpoint_any(
1113 self.inner.db.store().as_ref(),
1114 &query.table,
1115 &columns,
1116 &values,
1117 &["from_rid", "from"],
1118 )?;
1119 let to_id = resolve_edge_endpoint_any(
1120 self.inner.db.store().as_ref(),
1121 &query.table,
1122 &columns,
1123 &values,
1124 &["to_rid", "to"],
1125 )?;
1126 let weight = find_column_value_f32_opt(&columns, &values, "weight");
1127 let properties = extract_remaining_properties(
1128 &columns,
1129 &values,
1130 &["label", "from_rid", "to_rid", "from", "to", "weight"],
1131 );
1132 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1133 properties.iter().map(|(key, _)| key.as_str()),
1134 &format!("edge '{}'", query.table),
1135 )?;
1136 prepared.push(PreparedGraphInsert::Edge {
1137 fields: edge_values,
1138 input: CreateEdgeInput {
1139 collection: query.table.clone(),
1140 label,
1141 from: EntityId::new(from_id),
1142 to: EntityId::new(to_id),
1143 weight,
1144 properties,
1145 metadata,
1146 },
1147 });
1148 }
1149 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1150 }
1151 }
1152
1153 ensure_graph_insert_contract(self, &query.table)?;
1154 let mut batch = self.inner.db.batch();
1155 for item in prepared {
1156 match item {
1157 PreparedGraphInsert::Node { fields, input } => {
1158 if query.returning.is_some() {
1159 returning_field_snaps.push(fields);
1160 }
1161 let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1162 batch = batch.add_node_with_type(
1163 input.collection,
1164 input.label,
1165 node_type,
1166 input.properties.into_iter().collect(),
1167 input.metadata.into_iter().collect(),
1168 );
1169 }
1170 PreparedGraphInsert::Edge { fields, input } => {
1171 if query.returning.is_some() {
1172 returning_field_snaps.push(fields);
1173 }
1174 batch = batch.add_edge(
1175 input.collection,
1176 input.label,
1177 input.from,
1178 input.to,
1179 input.weight.unwrap_or(1.0),
1180 input.properties.into_iter().collect(),
1181 input.metadata.into_iter().collect(),
1182 );
1183 }
1184 }
1185 }
1186 let batch_result = batch
1187 .execute()
1188 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1189 let (ids, entity_kind) = match query.entity_type {
1190 InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1191 InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1192 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1193 };
1194 for id in &ids {
1195 self.stamp_xmin_if_in_txn(&query.table, *id);
1196 }
1197 if query.returning.is_some() {
1198 returning_field_snaps = graph_insert_returning_snapshots(
1199 self.inner.db.store().as_ref(),
1200 &query.table,
1201 &ids,
1202 );
1203 }
1204 self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1205 let store = self.inner.db.store();
1206 entity_outputs.extend(ids.iter().map(|id| {
1207 crate::application::entity::CreateEntityOutput {
1208 id: *id,
1209 entity: store.get(&query.table, *id),
1210 }
1211 }));
1212 inserted_count = ids.len() as u64;
1213 } else {
1214 for row_values in &effective_rows {
1215 if row_values.len() != query.columns.len() {
1216 return Err(RedDBError::Query(format!(
1217 "INSERT column count ({}) does not match value count ({})",
1218 query.columns.len(),
1219 row_values.len()
1220 )));
1221 }
1222
1223 match query.entity_type {
1224 InsertEntityType::Row => {
1225 if query.returning.is_some() {
1226 return Err(RedDBError::Query(
1227 "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1228 .to_string(),
1229 ));
1230 }
1231 let (fields, mut metadata) =
1232 split_insert_metadata(self, &query.columns, row_values)?;
1233 merge_with_clauses(
1234 &mut metadata,
1235 query.ttl_ms,
1236 query.expires_at_ms,
1237 &query.with_metadata,
1238 );
1239 self.insert_timeseries_point(&query.table, fields, metadata)?;
1240 }
1241 InsertEntityType::Node | InsertEntityType::Edge => {
1242 unreachable!("NODE and EDGE are handled by the prepared graph path")
1243 }
1244 InsertEntityType::Vector => {
1245 let (vector_values, mut metadata) =
1246 split_insert_metadata(self, &query.columns, row_values)?;
1247 merge_with_clauses(
1248 &mut metadata,
1249 query.ttl_ms,
1250 query.expires_at_ms,
1251 &query.with_metadata,
1252 );
1253 let (columns, values) = pairwise_columns_values(&vector_values);
1254 let dense = find_column_value_vec_f32_any(
1255 &columns,
1256 &values,
1257 &["dense", "embedding"],
1258 )?;
1259 merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1260 let content =
1261 find_column_value_opt_string(&columns, &values, "content");
1262 if query.returning.is_some() {
1263 returning_field_snaps.push(vector_values.clone());
1264 }
1265 let input = CreateVectorInput {
1266 collection: query.table.clone(),
1267 dense,
1268 content,
1269 metadata,
1270 link_row: None,
1271 link_node: None,
1272 };
1273 entity_outputs.push(self.create_vector(input)?);
1274 }
1275 InsertEntityType::Document => {
1276 let (document_values, mut metadata) =
1277 split_insert_metadata(self, &query.columns, row_values)?;
1278 merge_with_clauses(
1279 &mut metadata,
1280 query.ttl_ms,
1281 query.expires_at_ms,
1282 &query.with_metadata,
1283 );
1284 let (columns, values) = pairwise_columns_values(&document_values);
1285 let body = find_document_body_json(&columns, &values)?;
1286 let input = CreateDocumentInput {
1287 collection: query.table.clone(),
1288 body,
1289 metadata,
1290 node_links: Vec::new(),
1291 vector_links: Vec::new(),
1292 };
1293 let output = self.create_document(input)?;
1294 if query.returning.is_some() {
1295 let fields = output
1296 .entity
1297 .as_ref()
1298 .map(entity_row_fields_snapshot)
1299 .filter(|fields| !fields.is_empty())
1300 .unwrap_or(document_values);
1301 returning_field_snaps.push(fields);
1302 }
1303 entity_outputs.push(output);
1304 }
1305 InsertEntityType::Kv => {
1306 let (kv_values, mut metadata) =
1307 split_insert_metadata(self, &query.columns, row_values)?;
1308 merge_with_clauses(
1309 &mut metadata,
1310 query.ttl_ms,
1311 query.expires_at_ms,
1312 &query.with_metadata,
1313 );
1314 let (columns, values) = pairwise_columns_values(&kv_values);
1315 let key = find_column_value_string(&columns, &values, "key")?;
1316 let value = find_column_value(&columns, &values, "value")?;
1317 if query.returning.is_some() {
1318 returning_field_snaps.push(kv_values.clone());
1319 }
1320 let input = CreateKvInput {
1321 collection: query.table.clone(),
1322 key,
1323 value,
1324 metadata,
1325 };
1326 entity_outputs.push(self.create_kv(input)?);
1327 }
1328 }
1329
1330 inserted_count += 1;
1331 }
1332 }
1333
1334 if let Some(items) = query.returning.as_ref() {
1335 if !entity_outputs.is_empty() {
1336 returning_result = Some(build_returning_result(
1337 items,
1338 &returning_field_snaps,
1339 Some(&entity_outputs),
1340 ));
1341 }
1342 }
1343 }
1344
1345 if let Some(ref embed_config) = query.auto_embed {
1347 let store = self.inner.db.store();
1348 let provider = crate::ai::parse_provider(&embed_config.provider)?;
1349 let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1350 let api_key = if is_local_provider {
1355 String::new()
1356 } else {
1357 crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1358 };
1359 let model = embed_config.model.clone().unwrap_or_else(|| {
1360 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1361 .ok()
1362 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1363 });
1364
1365 let manager = store
1367 .get_collection(&query.table)
1368 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1369 let entities = manager.query_all(|_| true);
1370 let recent: Vec<_> = entities
1371 .into_iter()
1372 .rev()
1373 .take(effective_rows.len())
1374 .collect();
1375
1376 let entity_combos: Vec<(usize, String)> = recent
1378 .iter()
1379 .enumerate()
1380 .filter_map(|(i, entity)| {
1381 if let EntityData::Row(ref row) = entity.data {
1382 if let Some(ref named) = row.named {
1383 let texts: Vec<String> = embed_config
1384 .fields
1385 .iter()
1386 .filter_map(|field| match named.get(field) {
1387 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1388 _ => None,
1389 })
1390 .collect();
1391 if !texts.is_empty() {
1392 return Some((i, texts.join(" ")));
1393 }
1394 }
1395 }
1396 None
1397 })
1398 .collect();
1399
1400 if !entity_combos.is_empty() {
1401 let batch_texts: Vec<String> =
1403 entity_combos.iter().map(|(_, t)| t.clone()).collect();
1404
1405 let embeddings = if is_local_provider {
1415 let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1416 &self.inner.db,
1417 &model,
1418 batch_texts,
1419 )?;
1420 response.embeddings
1421 } else {
1422 let batch_client =
1423 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1424
1425 match tokio::runtime::Handle::try_current() {
1426 Ok(handle) => tokio::task::block_in_place(|| {
1427 handle.block_on(batch_client.embed_batch(
1428 &provider,
1429 &model,
1430 &api_key,
1431 batch_texts,
1432 ))
1433 }),
1434 Err(_) => {
1435 return Err(RedDBError::Query(
1436 "AUTO EMBED requires a Tokio runtime context".to_string(),
1437 ));
1438 }
1439 }
1440 .map_err(|e| RedDBError::Query(e.to_string()))?
1441 };
1442
1443 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1445 if dense.is_empty() {
1446 continue;
1447 }
1448 self.create_vector(CreateVectorInput {
1449 collection: query.table.clone(),
1450 dense,
1451 content: Some(combined.clone()),
1452 metadata: Vec::new(),
1453 link_row: None,
1454 link_node: None,
1455 })?;
1456 }
1457 }
1458 }
1459
1460 if inserted_count > 0 {
1461 self.note_table_write(&query.table);
1462 }
1463
1464 let mut result = RuntimeQueryResult::dml_result(
1465 raw_query.to_string(),
1466 inserted_count,
1467 "insert",
1468 "runtime-dml",
1469 );
1470 if let Some(returning) = returning_result {
1471 result.result = returning;
1472 }
1473 Ok(result)
1474 }
1475
1476 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1477 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1478 return Ok(());
1479 };
1480 if !auth_store.iam_authorization_enabled() {
1481 return Ok(());
1482 }
1483 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1484 return Ok(());
1485 };
1486
1487 let tenant = crate::runtime::impl_core::current_tenant();
1488 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1489 let request = crate::auth::ColumnAccessRequest {
1490 action: "insert".to_string(),
1491 schema: None,
1492 table: query.table.clone(),
1493 columns: query.columns.clone(),
1494 };
1495 let ctx = crate::auth::policies::EvalContext {
1496 principal_tenant: tenant.clone(),
1497 current_tenant: tenant,
1498 peer_ip: None,
1499 mfa_present: false,
1500 now_ms: crate::auth::now_ms(),
1501 principal_is_admin_role: role == crate::auth::Role::Admin,
1502 principal_is_platform_scoped: principal.tenant.is_none(),
1503 };
1504
1505 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1506 let table_allowed = matches!(
1507 outcome.table_decision,
1508 crate::auth::policies::Decision::Allow { .. }
1509 | crate::auth::policies::Decision::AdminBypass
1510 );
1511 if !table_allowed {
1512 return Err(RedDBError::Query(format!(
1513 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1514 outcome.table_resource.kind, outcome.table_resource.name
1515 )));
1516 }
1517 if let Some(denied) = outcome.first_denied_column() {
1518 return Err(RedDBError::Query(format!(
1519 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1520 denied.resource.kind, denied.resource.name
1521 )));
1522 }
1523
1524 Ok(())
1525 }
1526
1527 pub(crate) fn insert_timeseries_point(
1528 &self,
1529 collection: &str,
1530 fields: Vec<(String, Value)>,
1531 mut metadata: Vec<(String, MetadataValue)>,
1532 ) -> RedDBResult<EntityId> {
1533 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1534
1535 let (columns, values) = pairwise_columns_values(&fields);
1536 validate_timeseries_insert_columns(&columns)?;
1537
1538 let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1547 let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1548 if let Some(event_name) = event_name_opt.as_deref() {
1549 let store_for_schema = self.inner.db.store();
1550 if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1551 .is_some()
1552 {
1553 let payload_json = payload_opt.as_deref().unwrap_or("{}");
1554 super::analytics_schema_registry::validate(
1555 store_for_schema.as_ref(),
1556 event_name,
1557 payload_json,
1558 )
1559 .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1560 }
1561 }
1562
1563 let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1568 Some(m) => m,
1569 None => event_name_opt.clone().ok_or_else(|| {
1570 RedDBError::Query(
1571 "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1572 )
1573 })?,
1574 };
1575 let value = match find_column_value_opt_string(&columns, &values, "value") {
1579 Some(s) => s.parse::<f64>().unwrap_or(1.0),
1580 None => columns
1581 .iter()
1582 .position(|c| c.eq_ignore_ascii_case("value"))
1583 .and_then(|i| match &values[i] {
1584 Value::Float(f) => Some(*f),
1585 Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1586 Value::UnsignedInteger(n) => Some(*n as f64),
1587 _ => None,
1588 })
1589 .unwrap_or(1.0),
1590 };
1591 let timestamp_ns =
1592 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1593 let mut tags = find_timeseries_tags(&columns, &values)?;
1594 if let Some(ref name) = event_name_opt {
1595 tags.entry("event_name".to_string())
1596 .or_insert_with(|| name.clone());
1597 }
1598 if let Some(ref payload) = payload_opt {
1599 tags.entry("payload".to_string())
1600 .or_insert_with(|| payload.clone());
1601 }
1602
1603 let mut entity = UnifiedEntity::new(
1604 EntityId::new(0),
1605 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1606 series: collection.to_string(),
1607 metric: metric.clone(),
1608 })),
1609 EntityData::TimeSeries(crate::storage::TimeSeriesData {
1610 metric,
1611 timestamp_ns,
1612 value,
1613 tags,
1614 }),
1615 );
1616 let writer_xid = match self.current_xid() {
1620 Some(xid) => xid,
1621 None => {
1622 let mgr = self.snapshot_manager();
1623 let xid = mgr.begin();
1624 mgr.commit(xid);
1625 xid
1626 }
1627 };
1628 entity.set_xmin(writer_xid);
1629
1630 let store = self.inner.db.store();
1631 let id = store
1632 .insert_auto(collection, entity)
1633 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1634
1635 if !metadata.is_empty() {
1636 let _ = store.set_metadata(
1637 collection,
1638 id,
1639 Metadata::with_fields(metadata.into_iter().collect()),
1640 );
1641 }
1642
1643 self.cdc_emit(
1644 crate::replication::cdc::ChangeOperation::Insert,
1645 collection,
1646 id.raw(),
1647 "timeseries",
1648 );
1649
1650 Ok(id)
1651 }
1652
1653 pub fn execute_update(
1658 &self,
1659 raw_query: &str,
1660 query: &UpdateQuery,
1661 ) -> RedDBResult<RuntimeQueryResult> {
1662 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1663 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1667 return Err(RedDBError::InvalidOperation(format!(
1668 "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1669 query.table
1670 )));
1671 }
1672 crate::runtime::collection_contract::CollectionContractGate::check(
1678 self,
1679 &query.table,
1680 crate::runtime::collection_contract::MutationKind::Update,
1681 )?;
1682 ensure_update_target_contract(self, &query.table, query.target)?;
1683 ensure_graph_identity_update_target_allowed(query)?;
1684
1685 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1691 let augmented_query: UpdateQuery;
1692 let effective_query: &UpdateQuery = if rls_gated {
1693 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1694 self,
1695 &query.table,
1696 crate::storage::query::ast::PolicyAction::Update,
1697 );
1698 let Some(policy) = rls_filter else {
1699 let mut response = RuntimeQueryResult::dml_result(
1702 raw_query.to_string(),
1703 0,
1704 "update",
1705 "runtime-dml-rls",
1706 );
1707 if let Some(items) = query.returning.clone() {
1708 response.result = build_returning_result(&items, &[], None);
1709 }
1710 return Ok(response);
1711 };
1712 let mut augmented = query.clone();
1713 augmented.filter = Some(match augmented.filter.take() {
1714 Some(existing) => {
1715 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1716 }
1717 None => policy,
1718 });
1719 augmented_query = augmented;
1720 &augmented_query
1721 } else {
1722 query
1723 };
1724
1725 if let Some(items) = effective_query.returning.clone() {
1730 let mut inner_query = effective_query.clone();
1731 inner_query.returning = None;
1732 let (mut response, touched_ids) =
1733 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1734
1735 let snapshots = if matches!(
1736 effective_query.target,
1737 UpdateTarget::Nodes | UpdateTarget::Edges
1738 ) {
1739 graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1740 } else {
1741 super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1742 .row_snapshots(&touched_ids)
1743 };
1744
1745 response.result = build_returning_result(&items, &snapshots, None);
1746 response.engine = "runtime-dml-returning";
1747 return Ok(response);
1748 }
1749
1750 self.execute_update_inner(raw_query, effective_query)
1751 }
1752
1753 fn execute_update_inner(
1755 &self,
1756 raw_query: &str,
1757 query: &UpdateQuery,
1758 ) -> RedDBResult<RuntimeQueryResult> {
1759 self.execute_update_inner_tracked(raw_query, query)
1760 .map(|(res, _)| res)
1761 }
1762
1763 fn execute_update_inner_tracked(
1764 &self,
1765 raw_query: &str,
1766 query: &UpdateQuery,
1767 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1768 let store = self.inner.db.store();
1769 let effective_filter = effective_update_filter(query);
1770 let compiled_plan = self.compile_update_plan(query)?;
1771 let needs_rmw_lock = update_needs_rmw_lock(query);
1772 let table_rmw_lock = if needs_rmw_lock {
1773 Some(
1774 self.inner
1775 .rmw_locks
1776 .lock_for(&query.table, "__table_rmw_update__"),
1777 )
1778 } else {
1779 None
1780 };
1781 let _table_rmw_guard = table_rmw_lock.as_ref().map(|lock| lock.lock());
1782 let mut touched_ids: Vec<EntityId> = Vec::new();
1783 let limit_cap = query.limit.map(|l| l as usize);
1784 let manager = store
1785 .get_collection(&query.table)
1786 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1787 let scan_limit = if query.order_by.is_empty() {
1788 limit_cap
1789 } else {
1790 None
1791 };
1792 let mut target_scan = super::dml_target_scan::DmlTargetScan::with_update_target(
1793 self,
1794 &query.table,
1795 effective_filter.as_ref(),
1796 scan_limit,
1797 query.target,
1798 );
1799 if needs_rmw_lock {
1800 target_scan = target_scan.with_live_table_rows();
1801 }
1802 let ids_to_update = target_scan.find_target_ids()?;
1803 let ids_to_update = if query.order_by.is_empty() {
1804 ids_to_update
1805 } else {
1806 ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1807 };
1808
1809 if needs_rmw_lock {
1810 return self.execute_update_inner_tracked_locked(
1811 raw_query,
1812 query,
1813 &compiled_plan,
1814 &ids_to_update,
1815 effective_filter.as_ref(),
1816 );
1817 }
1818
1819 let mut affected: u64 = 0;
1820 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1821 let mut applied_chunk = Vec::with_capacity(chunk.len());
1822 for entity in manager.get_many(chunk).into_iter().flatten() {
1823 let assignments =
1824 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1825 let applied = self.apply_materialized_update_for_entity(
1826 query.table.clone(),
1827 entity,
1828 &compiled_plan,
1829 assignments,
1830 )?;
1831 touched_ids.push(applied.id);
1832 applied_chunk.push(applied);
1833 }
1834 self.persist_update_chunk(&applied_chunk)?;
1835 affected += applied_chunk.len() as u64;
1836 let lsns = self.flush_update_chunk(&applied_chunk)?;
1837 if !query.suppress_events {
1838 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1839 }
1840 }
1841
1842 if affected > 0 {
1843 self.note_table_write(&query.table);
1844 }
1845
1846 Ok((
1847 RuntimeQueryResult::dml_result(
1848 raw_query.to_string(),
1849 affected,
1850 "update",
1851 "runtime-dml",
1852 ),
1853 touched_ids,
1854 ))
1855 }
1856
1857 fn execute_update_inner_tracked_locked(
1858 &self,
1859 raw_query: &str,
1860 query: &UpdateQuery,
1861 compiled_plan: &CompiledUpdatePlan,
1862 ids_to_update: &[EntityId],
1863 effective_filter: Option<&Filter>,
1864 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1865 let store = self.inner.db.store();
1866 let mut touched_ids = Vec::new();
1867 let mut lock_entries = Vec::new();
1868
1869 for id in ids_to_update {
1870 let Some(candidate) = store.get(&query.table, *id) else {
1871 continue;
1872 };
1873 let logical_id = candidate.logical_id();
1874 let lock_key = format!("row:{}", logical_id.raw());
1875 let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1876 lock_entries.push((lock_key, logical_id, rmw_lock));
1877 }
1878
1879 lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1880 lock_entries.dedup_by(|left, right| left.0 == right.0);
1881 let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1882
1883 let mut applied_chunk = Vec::new();
1884 for (_, logical_id, _) in &lock_entries {
1885 let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1886 else {
1887 continue;
1888 };
1889 if let Some(filter) = effective_filter {
1890 if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1891 Some(self.inner.db.as_ref()),
1892 &entity,
1893 filter,
1894 &query.table,
1895 &query.table,
1896 ) {
1897 continue;
1898 }
1899 }
1900
1901 let assignments =
1902 self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1903 let applied = self.apply_materialized_update_for_entity(
1904 query.table.clone(),
1905 entity,
1906 compiled_plan,
1907 assignments,
1908 )?;
1909 touched_ids.push(applied.id);
1910 applied_chunk.push(applied);
1911 }
1912
1913 let affected = applied_chunk.len() as u64;
1914 if !applied_chunk.is_empty() {
1915 self.persist_update_chunk(&applied_chunk)?;
1916 let lsns = self.flush_update_chunk(&applied_chunk)?;
1917 if !query.suppress_events {
1918 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1919 }
1920 }
1921
1922 if affected > 0 {
1923 self.note_table_write(&query.table);
1924 }
1925
1926 Ok((
1927 RuntimeQueryResult::dml_result(
1928 raw_query.to_string(),
1929 affected,
1930 "update",
1931 "runtime-dml",
1932 ),
1933 touched_ids,
1934 ))
1935 }
1936
1937 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1938 let mut static_field_assignments = Vec::new();
1939 let mut static_metadata_assignments = Vec::new();
1940 let mut dynamic_assignments = Vec::new();
1941 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1942 let mut row_modified_columns = Vec::new();
1943
1944 for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1945 let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1946 let metadata_key = resolve_sql_ttl_metadata_key(column);
1947 if compound_op.is_some() && metadata_key.is_some() {
1948 return Err(RedDBError::Query(format!(
1949 "compound assignment is only supported for row fields: {column}"
1950 )));
1951 }
1952 if compound_op.is_none() {
1953 if let Ok(value) = fold_expr_to_value(expr.clone()) {
1954 if let Some(metadata_key) = metadata_key {
1955 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1956 let (canonical_key, canonical_value) =
1957 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1958 static_metadata_assignments
1959 .push((canonical_key.to_string(), canonical_value));
1960 } else {
1961 let value = self.resolve_crypto_sentinel(value)?;
1962 static_field_assignments.push((
1963 column.clone(),
1964 normalize_row_update_assignment_with_plan(
1965 &query.table,
1966 column,
1967 value,
1968 row_contract_plan.as_ref(),
1969 )?,
1970 ));
1971 row_modified_columns.push(column.clone());
1972 }
1973 continue;
1974 }
1975 }
1976
1977 dynamic_assignments.push(CompiledUpdateAssignment {
1978 column: column.clone(),
1979 expr: expr.clone(),
1980 compound_op,
1981 metadata_key,
1982 row_rule: if metadata_key.is_none() {
1983 if let Some(plan) = row_contract_plan.as_ref() {
1984 if plan.timestamps_enabled
1985 && (column == "created_at" || column == "updated_at")
1986 {
1987 return Err(RedDBError::Query(format!(
1988 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1989 query.table, column
1990 )));
1991 }
1992 if let Some(rule) = plan.declared_rules.get(column) {
1993 Some(rule.clone())
1994 } else if plan.strict_schema {
1995 return Err(RedDBError::Query(format!(
1996 "collection '{}' is strict and does not allow undeclared fields: {}",
1997 query.table, column
1998 )));
1999 } else {
2000 None
2001 }
2002 } else {
2003 None
2004 }
2005 } else {
2006 None
2007 },
2008 });
2009 if metadata_key.is_none() {
2010 row_modified_columns.push(column.clone());
2011 }
2012 }
2013
2014 let row_modified_columns = dedupe_update_columns(row_modified_columns);
2015 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
2016 row_modified_columns.iter().any(|column| {
2017 plan.unique_columns
2018 .keys()
2019 .any(|unique| unique.eq_ignore_ascii_case(column))
2020 })
2021 });
2022
2023 if let Some(ttl_ms) = query.ttl_ms {
2024 static_metadata_assignments
2025 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
2026 }
2027 if let Some(expires_at_ms) = query.expires_at_ms {
2028 static_metadata_assignments.push((
2029 "_expires_at".to_string(),
2030 metadata_u64_to_value(expires_at_ms),
2031 ));
2032 }
2033 for (key, val) in &query.with_metadata {
2034 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
2035 }
2036
2037 Ok(CompiledUpdatePlan {
2038 static_field_assignments,
2039 static_metadata_assignments,
2040 dynamic_assignments,
2041 row_contract_plan,
2042 row_modified_columns,
2043 row_touches_unique_columns,
2044 })
2045 }
2046
2047 fn materialize_update_assignments_for_entity(
2048 &self,
2049 query: &UpdateQuery,
2050 entity: &UnifiedEntity,
2051 compiled_plan: &CompiledUpdatePlan,
2052 ) -> RedDBResult<MaterializedUpdateAssignments> {
2053 let mut assignments = MaterializedUpdateAssignments::default();
2054 let mut record: Option<UnifiedRecord> = None;
2055
2056 for assignment in &compiled_plan.dynamic_assignments {
2057 if assignment.compound_op.is_some()
2058 && !matches!(
2059 entity.data,
2060 EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
2061 )
2062 {
2063 return Err(RedDBError::Query(format!(
2064 "compound assignment is only supported for row or graph UPDATE column '{}'",
2065 assignment.column
2066 )));
2067 }
2068 if record.is_none() {
2069 record = runtime_any_record_from_entity_ref(entity);
2070 }
2071 let Some(record) = record.as_ref() else {
2072 return Err(RedDBError::Query(format!(
2073 "UPDATE could not materialize runtime record for entity {} in '{}'",
2074 entity.id.raw(),
2075 query.table
2076 )));
2077 };
2078 let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
2079 Some(self.inner.db.as_ref()),
2080 &assignment.expr,
2081 record,
2082 Some(query.table.as_str()),
2083 Some(query.table.as_str()),
2084 )
2085 .ok_or_else(|| {
2086 RedDBError::Query(format!(
2087 "failed to evaluate UPDATE expression for column '{}'",
2088 assignment.column
2089 ))
2090 })?;
2091 let value = if let Some(op) = assignment.compound_op {
2092 evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
2093 } else {
2094 rhs
2095 };
2096
2097 if let Some(metadata_key) = assignment.metadata_key {
2098 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
2099 let (canonical_key, canonical_value) =
2100 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2101 assignments
2102 .dynamic_metadata_assignments
2103 .push((canonical_key.to_string(), canonical_value));
2104 } else {
2105 assignments.dynamic_field_assignments.push((
2106 assignment.column.clone(),
2107 normalize_row_update_value_for_rule(
2108 &query.table,
2109 self.resolve_crypto_sentinel(value)?,
2110 assignment.row_rule.as_ref(),
2111 )?,
2112 ));
2113 }
2114 }
2115
2116 Ok(assignments)
2117 }
2118
2119 fn apply_materialized_update_for_entity(
2120 &self,
2121 collection: String,
2122 entity: UnifiedEntity,
2123 compiled_plan: &CompiledUpdatePlan,
2124 assignments: MaterializedUpdateAssignments,
2125 ) -> RedDBResult<AppliedEntityMutation> {
2126 if matches!(entity.data, EntityData::Row(_)) {
2127 return self.apply_loaded_sql_update_row_core(
2128 collection,
2129 entity,
2130 &compiled_plan.static_field_assignments,
2131 assignments.dynamic_field_assignments,
2132 &compiled_plan.static_metadata_assignments,
2133 assignments.dynamic_metadata_assignments,
2134 compiled_plan.row_contract_plan.as_ref(),
2135 &compiled_plan.row_modified_columns,
2136 compiled_plan.row_touches_unique_columns,
2137 );
2138 }
2139
2140 ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2141
2142 let operations = build_patch_operations_from_materialized_assignments(
2143 &entity,
2144 compiled_plan,
2145 assignments,
2146 );
2147 self.apply_loaded_patch_entity_core(
2148 collection,
2149 entity,
2150 crate::json::Value::Null,
2151 operations,
2152 )
2153 }
2154
2155 pub fn execute_delete(
2157 &self,
2158 raw_query: &str,
2159 query: &DeleteQuery,
2160 ) -> RedDBResult<RuntimeQueryResult> {
2161 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2162 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2165 return Err(RedDBError::InvalidOperation(format!(
2166 "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2167 query.table
2168 )));
2169 }
2170 crate::runtime::collection_contract::CollectionContractGate::check(
2174 self,
2175 &query.table,
2176 crate::runtime::collection_contract::MutationKind::Delete,
2177 )?;
2178
2179 if let Some(items) = query.returning.clone() {
2186 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2187 RedDBError::Query(
2188 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2189 )
2190 })?;
2191 let captured = self.execute_query(&select_sql)?;
2192
2193 let mut inner_query = query.clone();
2194 inner_query.returning = None;
2195 let _ = self.execute_delete(raw_query, &inner_query)?;
2196
2197 let snapshots: Vec<Vec<(String, Value)>> = captured
2198 .result
2199 .records
2200 .iter()
2201 .map(|rec| {
2202 rec.iter_fields()
2203 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2204 .collect()
2205 })
2206 .collect();
2207 let affected = snapshots.len() as u64;
2208 let result = build_returning_result(&items, &snapshots, None);
2209
2210 let mut response = RuntimeQueryResult::dml_result(
2211 raw_query.to_string(),
2212 affected,
2213 "delete",
2214 "runtime-dml-returning",
2215 );
2216 response.result = result;
2217 return Ok(response);
2218 }
2219 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2226 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2227 self,
2228 &query.table,
2229 crate::storage::query::ast::PolicyAction::Delete,
2230 );
2231 let Some(policy) = rls_filter else {
2232 return Ok(RuntimeQueryResult::dml_result(
2233 raw_query.to_string(),
2234 0,
2235 "delete",
2236 "runtime-dml-rls",
2237 ));
2238 };
2239 let mut augmented = query.clone();
2244 augmented.filter = Some(match augmented.filter.take() {
2245 Some(existing) => {
2246 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2247 }
2248 None => policy,
2249 });
2250 return self.execute_delete_inner(raw_query, &augmented);
2251 }
2252 self.execute_delete_inner(raw_query, query)
2253 }
2254
2255 fn execute_delete_inner(
2256 &self,
2257 raw_query: &str,
2258 query: &DeleteQuery,
2259 ) -> RedDBResult<RuntimeQueryResult> {
2260 let effective_filter = effective_delete_filter(query);
2261
2262 let scan = super::dml_target_scan::DmlTargetScan::new(
2266 self,
2267 &query.table,
2268 effective_filter.as_ref(),
2269 None,
2270 );
2271 let ids_to_delete = scan.find_target_ids()?;
2272
2273 let needs_delete_events =
2276 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2277 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2278 scan.row_json_pre_images(&ids_to_delete)
2279 } else {
2280 HashMap::new()
2281 };
2282
2283 let mut affected: u64 = 0;
2284 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2285 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2286 affected += count;
2287 if needs_delete_events && !lsns.is_empty() {
2288 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2292 self.emit_delete_events_for_collection(
2293 &query.table,
2294 deleted_chunk,
2295 &lsns,
2296 &pre_images,
2297 )?;
2298 }
2299 }
2300 pre_images.clear();
2301
2302 if affected > 0 {
2303 self.note_table_write(&query.table);
2304 }
2305
2306 Ok(RuntimeQueryResult::dml_result(
2307 raw_query.to_string(),
2308 affected,
2309 "delete",
2310 "runtime-dml",
2311 ))
2312 }
2313}
2314
2315fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2321 if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2322 return Ok(());
2323 }
2324 for (column, _) in &query.assignment_exprs {
2325 if is_immutable_graph_identity_field(column) {
2326 return Err(RedDBError::Query(format!(
2327 "immutable graph field '{column}' cannot be updated"
2328 )));
2329 }
2330 }
2331 Ok(())
2332}
2333
2334fn ensure_graph_identity_update_allowed(
2335 entity: &UnifiedEntity,
2336 compiled_plan: &CompiledUpdatePlan,
2337 assignments: &MaterializedUpdateAssignments,
2338) -> RedDBResult<()> {
2339 if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2340 return Ok(());
2341 }
2342
2343 for (column, _) in compiled_plan
2344 .static_field_assignments
2345 .iter()
2346 .chain(assignments.dynamic_field_assignments.iter())
2347 {
2348 if is_immutable_graph_identity_field(column) {
2349 return Err(RedDBError::Query(format!(
2350 "immutable graph field '{column}' cannot be updated"
2351 )));
2352 }
2353 }
2354
2355 Ok(())
2356}
2357
2358fn is_immutable_graph_identity_field(column: &str) -> bool {
2359 ["rid", "label", "from_rid", "to_rid", "from", "to"]
2360 .iter()
2361 .any(|reserved| column.eq_ignore_ascii_case(reserved))
2362}
2363
2364fn build_patch_operations_from_materialized_assignments(
2365 entity: &UnifiedEntity,
2366 compiled_plan: &CompiledUpdatePlan,
2367 assignments: MaterializedUpdateAssignments,
2368) -> Vec<PatchEntityOperation> {
2369 let mut operations = Vec::with_capacity(
2370 compiled_plan.static_field_assignments.len()
2371 + compiled_plan.static_metadata_assignments.len()
2372 + assignments.dynamic_field_assignments.len()
2373 + assignments.dynamic_metadata_assignments.len(),
2374 );
2375
2376 for (column, value) in &compiled_plan.static_field_assignments {
2377 operations.push(PatchEntityOperation {
2378 op: PatchEntityOperationType::Set,
2379 path: update_patch_path_for_entity(entity, column),
2380 value: Some(storage_value_to_json(value)),
2381 });
2382 }
2383
2384 for (column, value) in assignments.dynamic_field_assignments {
2385 operations.push(PatchEntityOperation {
2386 op: PatchEntityOperationType::Set,
2387 path: update_patch_path_for_entity(entity, &column),
2388 value: Some(storage_value_to_json(&value)),
2389 });
2390 }
2391
2392 for (key, value) in &compiled_plan.static_metadata_assignments {
2393 operations.push(PatchEntityOperation {
2394 op: PatchEntityOperationType::Set,
2395 path: vec!["metadata".to_string(), key.clone()],
2396 value: Some(metadata_value_to_json(value)),
2397 });
2398 }
2399
2400 for (key, value) in assignments.dynamic_metadata_assignments {
2401 operations.push(PatchEntityOperation {
2402 op: PatchEntityOperationType::Set,
2403 path: vec!["metadata".to_string(), key],
2404 value: Some(metadata_value_to_json(&value)),
2405 });
2406 }
2407
2408 operations
2409}
2410
2411fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2412 if matches!(
2413 (&entity.kind, &entity.data),
2414 (
2415 crate::storage::EntityKind::GraphNode(_),
2416 EntityData::Node(_)
2417 )
2418 ) && column.eq_ignore_ascii_case("node_type")
2419 {
2420 return vec!["node_type".to_string()];
2421 }
2422 if matches!(
2423 (&entity.kind, &entity.data),
2424 (
2425 crate::storage::EntityKind::GraphEdge(_),
2426 EntityData::Edge(_)
2427 )
2428 ) && column.eq_ignore_ascii_case("weight")
2429 {
2430 return vec!["weight".to_string()];
2431 }
2432 vec!["fields".to_string(), column.to_string()]
2433}
2434
2435fn delete_to_select_sql(sql: &str) -> Option<String> {
2445 let trimmed = sql.trim_start();
2446 let lowered = trimmed.to_ascii_lowercase();
2447 if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2448 return None;
2449 }
2450 let from_idx = lowered.find(" from ")?;
2452 let after_from = &trimmed[from_idx + " from ".len()..];
2453 let after_from_lc = &lowered[from_idx + " from ".len()..];
2454
2455 let mut body = after_from.to_string();
2460 if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2461 body.truncate(pos);
2462 }
2463 Some(format!("SELECT * FROM {}", body.trim_end()))
2464}
2465
2466fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
2471 let bytes = haystack.as_bytes();
2472 let nlen = needle.len();
2473 let mut i = 0usize;
2474 let mut in_string = false;
2475 while i < bytes.len() {
2476 let c = bytes[i];
2477 if c == b'\'' {
2478 in_string = !in_string;
2479 i += 1;
2480 continue;
2481 }
2482 if !in_string
2483 && i + nlen <= bytes.len()
2484 && &bytes[i..i + nlen] == needle.as_bytes()
2485 && (i == 0 || bytes[i - 1].is_ascii_whitespace())
2486 && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
2487 {
2488 return Some(i);
2489 }
2490 i += 1;
2491 }
2492 None
2493}
2494
2495fn build_returning_result(
2502 items: &[ReturningItem],
2503 snapshots: &[Vec<(String, Value)>],
2504 outputs: Option<&[CreateEntityOutput]>,
2505) -> UnifiedResult {
2506 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2507 let public_item_outputs = outputs.is_some_and(|outs| {
2508 outs.first()
2509 .and_then(|out| out.entity.as_ref())
2510 .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2511 });
2512
2513 let mut columns: Vec<String> = if project_all {
2514 let mut cols: Vec<String> = Vec::new();
2515 if public_item_outputs {
2516 cols.extend(
2517 [
2518 "rid",
2519 "collection",
2520 "kind",
2521 "tenant",
2522 "created_at",
2523 "updated_at",
2524 ]
2525 .into_iter()
2526 .map(str::to_string),
2527 );
2528 } else if outputs.is_some() {
2529 cols.push("rid".to_string());
2530 }
2531 if let Some(first) = snapshots.first() {
2532 for (name, _) in first {
2533 cols.push(name.clone());
2534 }
2535 }
2536 cols
2537 } else {
2538 items
2539 .iter()
2540 .filter_map(|it| match it {
2541 ReturningItem::Column(c) => Some(c.clone()),
2542 ReturningItem::All => None,
2543 })
2544 .collect()
2545 };
2546 {
2548 let mut seen = std::collections::HashSet::new();
2549 columns.retain(|c| seen.insert(c.clone()));
2550 }
2551
2552 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2553 for (idx, snap) in snapshots.iter().enumerate() {
2554 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2555 if let Some(outs) = outputs {
2556 if let Some(out) = outs.get(idx) {
2557 if let Some(entity) = out.entity.as_ref() {
2558 if let Some(kind) = public_returning_item_kind(entity) {
2559 values.insert(
2560 Arc::clone(&sys_key_rid()),
2561 Value::UnsignedInteger(out.id.raw()),
2562 );
2563 values.insert(
2564 Arc::clone(&sys_key_collection()),
2565 Value::text(entity.kind.collection().to_string()),
2566 );
2567 values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2568 values.insert(
2569 Arc::clone(&sys_key_created_at()),
2570 Value::UnsignedInteger(entity.created_at),
2571 );
2572 values.insert(
2573 Arc::clone(&sys_key_updated_at()),
2574 Value::UnsignedInteger(entity.updated_at),
2575 );
2576 } else {
2577 values.insert(
2578 Arc::clone(&sys_key_rid()),
2579 Value::Integer(out.id.raw() as i64),
2580 );
2581 }
2582 } else {
2583 values.insert(
2584 Arc::clone(&sys_key_rid()),
2585 Value::Integer(out.id.raw() as i64),
2586 );
2587 }
2588 }
2589 }
2590 for (name, val) in snap {
2591 values.insert(Arc::from(name.as_str()), val.clone());
2592 }
2593 if !values.contains_key("tenant") {
2594 let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2595 values.insert(Arc::clone(&sys_key_tenant()), tenant);
2596 }
2597 let mut rec = UnifiedRecord::default();
2598 for col in &columns {
2600 if let Some(v) = values.get(col.as_str()) {
2601 rec.set_arc(Arc::from(col.as_str()), v.clone());
2602 }
2603 }
2604 records.push(rec);
2605 }
2606
2607 UnifiedResult {
2608 columns,
2609 records,
2610 stats: Default::default(),
2611 pre_serialized_json: None,
2612 }
2613}
2614
2615fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2616 match (&entity.kind, &entity.data) {
2617 (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2618 Some("node")
2619 }
2620 (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2621 Some("edge")
2622 }
2623 (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2624 (_, crate::storage::EntityData::Vector(_)) => Some("vector"),
2629 _ => None,
2630 }
2631}
2632
2633fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2634 let Some(row) = entity.data.as_row() else {
2635 return "row";
2636 };
2637
2638 let is_kv = row.named.as_ref().is_some_and(|named| {
2639 (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2640 || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2641 });
2642 if is_kv {
2643 return "kv";
2644 }
2645
2646 let is_document = row
2647 .named
2648 .as_ref()
2649 .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2650 || row.columns.iter().any(runtime_returning_documentish_value);
2651 if is_document {
2652 "document"
2653 } else {
2654 "row"
2655 }
2656}
2657
2658fn runtime_returning_documentish_value(value: &Value) -> bool {
2659 matches!(value, Value::Json(_) | Value::Blob(_))
2660}
2661
2662fn row_insert_returning_snapshots(
2663 outputs: &[CreateEntityOutput],
2664 fallback: Vec<Vec<(String, Value)>>,
2665) -> Vec<Vec<(String, Value)>> {
2666 outputs
2667 .iter()
2668 .enumerate()
2669 .map(|(idx, out)| {
2670 out.entity
2671 .as_ref()
2672 .map(entity_row_fields_snapshot)
2673 .filter(|snap| !snap.is_empty())
2674 .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2675 })
2676 .collect()
2677}
2678
2679fn graph_insert_returning_snapshots(
2680 store: &crate::storage::unified::UnifiedStore,
2681 collection: &str,
2682 ids: &[EntityId],
2683) -> Vec<Vec<(String, Value)>> {
2684 let Some(manager) = store.get_collection(collection) else {
2685 return Vec::new();
2686 };
2687
2688 ids.iter()
2689 .filter_map(|id| manager.get(*id))
2690 .filter_map(|entity| {
2691 let mut record = runtime_any_record_from_entity_ref(&entity)?;
2692 record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2693 Some(record)
2694 })
2695 .map(|record| {
2696 record
2697 .iter_fields()
2698 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2699 .collect()
2700 })
2701 .collect()
2702}
2703
2704fn graph_update_returning_snapshots(
2705 runtime: &RedDBRuntime,
2706 collection: &str,
2707 ids: &[EntityId],
2708) -> Vec<Vec<(String, Value)>> {
2709 let store = runtime.db().store();
2710 let Some(manager) = store.get_collection(collection) else {
2711 return Vec::new();
2712 };
2713
2714 manager
2715 .get_many(ids)
2716 .into_iter()
2717 .flatten()
2718 .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2719 .map(|record| {
2720 record
2721 .iter_fields()
2722 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2723 .collect()
2724 })
2725 .collect()
2726}
2727
2728fn ensure_update_target_contract(
2729 runtime: &RedDBRuntime,
2730 collection: &str,
2731 target: UpdateTarget,
2732) -> RedDBResult<()> {
2733 let Some(contract) = runtime.db().collection_contract(collection) else {
2734 return Ok(());
2735 };
2736 if update_target_contract_is_advisory(&contract)
2737 || update_target_allows_model(contract.declared_model, update_target_model(target))
2738 {
2739 return Ok(());
2740 }
2741 Err(RedDBError::InvalidOperation(format!(
2742 "collection '{}' is declared as '{}' and does not allow '{}' updates",
2743 collection,
2744 update_model_name(contract.declared_model),
2745 update_model_name(update_target_model(target))
2746 )))
2747}
2748
2749fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2750 matches!(
2751 (&contract.origin, &contract.schema_mode),
2752 (
2753 crate::physical::ContractOrigin::Implicit,
2754 crate::catalog::SchemaMode::Dynamic,
2755 )
2756 )
2757}
2758
2759fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2760 match target {
2761 UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2762 UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2763 UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2764 UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2765 }
2766}
2767
2768fn update_target_allows_model(
2769 declared_model: crate::catalog::CollectionModel,
2770 requested_model: crate::catalog::CollectionModel,
2771) -> bool {
2772 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2773}
2774
2775fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2776 match model {
2777 crate::catalog::CollectionModel::Table => "table",
2778 crate::catalog::CollectionModel::Document => "document",
2779 crate::catalog::CollectionModel::Graph => "graph",
2780 crate::catalog::CollectionModel::Vector => "vector",
2781 crate::catalog::CollectionModel::Hll => "hll",
2782 crate::catalog::CollectionModel::Sketch => "sketch",
2783 crate::catalog::CollectionModel::Filter => "filter",
2784 crate::catalog::CollectionModel::Kv => "kv",
2785 crate::catalog::CollectionModel::Config => "config",
2786 crate::catalog::CollectionModel::Vault => "vault",
2787 crate::catalog::CollectionModel::Mixed => "mixed",
2788 crate::catalog::CollectionModel::TimeSeries => "timeseries",
2789 crate::catalog::CollectionModel::Queue => "queue",
2790 crate::catalog::CollectionModel::Metrics => "metrics",
2791 }
2792}
2793
2794fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2795 let db = runtime.db();
2796 if let Some(contract) = db.collection_contract(collection) {
2797 let advisory_implicit_dynamic = matches!(
2798 (&contract.origin, &contract.schema_mode),
2799 (
2800 crate::physical::ContractOrigin::Implicit,
2801 crate::catalog::SchemaMode::Dynamic,
2802 )
2803 );
2804 if advisory_implicit_dynamic
2805 || matches!(
2806 contract.declared_model,
2807 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2808 )
2809 {
2810 return Ok(());
2811 }
2812 return Err(RedDBError::InvalidOperation(format!(
2813 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2814 collection, contract.declared_model
2815 )));
2816 }
2817
2818 let now = std::time::SystemTime::now()
2819 .duration_since(std::time::UNIX_EPOCH)
2820 .unwrap_or_default()
2821 .as_millis();
2822 db.save_collection_contract(crate::physical::CollectionContract {
2823 name: collection.to_string(),
2824 declared_model: crate::catalog::CollectionModel::Graph,
2825 schema_mode: crate::catalog::SchemaMode::Dynamic,
2826 origin: crate::physical::ContractOrigin::Implicit,
2827 version: 1,
2828 created_at_unix_ms: now,
2829 updated_at_unix_ms: now,
2830 default_ttl_ms: db.collection_default_ttl_ms(collection),
2831 vector_dimension: None,
2832 vector_metric: None,
2833 context_index_fields: Vec::new(),
2834 declared_columns: Vec::new(),
2835 table_def: None,
2836 timestamps_enabled: false,
2837 context_index_enabled: false,
2838 metrics_raw_retention_ms: None,
2839 metrics_rollup_policies: Vec::new(),
2840 metrics_tenant_identity: None,
2841 metrics_namespace: None,
2842 append_only: false,
2843 subscriptions: Vec::new(),
2844 analytics_config: Vec::new(),
2845 session_key: None,
2846 session_gap_ms: None,
2847 retention_duration_ms: None,
2848 analytical_storage: None,
2849
2850 ai_policy: None,
2851 })
2852 .map(|_| ())
2853 .map_err(|err| RedDBError::Internal(err.to_string()))
2854}
2855
2856fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2857 query
2858 .assignment_exprs
2859 .iter()
2860 .enumerate()
2861 .any(|(idx, (column, expr))| {
2862 query
2863 .compound_assignment_ops
2864 .get(idx)
2865 .is_some_and(|op| op.is_some())
2866 || expr_references_update_column(expr, &query.table, column)
2867 })
2868}
2869
2870fn evaluate_compound_update_assignment(
2871 column: &str,
2872 record: &UnifiedRecord,
2873 op: BinOp,
2874 rhs: Value,
2875) -> RedDBResult<Value> {
2876 let lhs = record.get(column).ok_or_else(|| {
2877 RedDBError::Query(format!(
2878 "compound assignment requires existing numeric field '{column}'"
2879 ))
2880 })?;
2881 if matches!(lhs, Value::Null) {
2882 return Err(RedDBError::Query(format!(
2883 "compound assignment requires non-null numeric field '{column}'"
2884 )));
2885 }
2886 apply_compound_numeric_op(column, op, lhs, &rhs)
2887}
2888
2889fn apply_compound_numeric_op(
2890 column: &str,
2891 op: BinOp,
2892 lhs: &Value,
2893 rhs: &Value,
2894) -> RedDBResult<Value> {
2895 let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2896 return Err(RedDBError::Query(format!(
2897 "compound assignment requires numeric field '{column}'"
2898 )));
2899 };
2900 let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2901 return Err(RedDBError::Query(format!(
2902 "compound assignment requires numeric right-hand value for field '{column}'"
2903 )));
2904 };
2905
2906 if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2907 let a = lhs_number.as_f64();
2908 let b = rhs_number.as_f64();
2909 let out = match op {
2910 BinOp::Add => a + b,
2911 BinOp::Sub => a - b,
2912 BinOp::Mul => a * b,
2913 BinOp::Div => {
2914 if b == 0.0 {
2915 return Err(RedDBError::Query(format!(
2916 "division by zero in compound assignment for field '{column}'"
2917 )));
2918 }
2919 a / b
2920 }
2921 BinOp::Mod => {
2922 if b == 0.0 {
2923 return Err(RedDBError::Query(format!(
2924 "modulo by zero in compound assignment for field '{column}'"
2925 )));
2926 }
2927 a % b
2928 }
2929 _ => {
2930 return Err(RedDBError::Query(format!(
2931 "unsupported compound assignment operator for field '{column}'"
2932 )));
2933 }
2934 };
2935 if !out.is_finite() {
2936 return Err(RedDBError::Query(format!(
2937 "numeric overflow in compound assignment for field '{column}'"
2938 )));
2939 }
2940 return Ok(Value::Float(out));
2941 }
2942
2943 let a = lhs_number.as_i128();
2944 let b = rhs_number.as_i128();
2945 let out = match op {
2946 BinOp::Add => a.checked_add(b),
2947 BinOp::Sub => a.checked_sub(b),
2948 BinOp::Mul => a.checked_mul(b),
2949 BinOp::Mod => {
2950 if b == 0 {
2951 return Err(RedDBError::Query(format!(
2952 "modulo by zero in compound assignment for field '{column}'"
2953 )));
2954 }
2955 a.checked_rem(b)
2956 }
2957 BinOp::Div => unreachable!("integer division is handled by the float branch"),
2958 _ => None,
2959 }
2960 .ok_or_else(|| {
2961 RedDBError::Query(format!(
2962 "numeric overflow in compound assignment for field '{column}'"
2963 ))
2964 })?;
2965
2966 if matches!(lhs, Value::UnsignedInteger(_)) {
2967 let value = u64::try_from(out).map_err(|_| {
2968 RedDBError::Query(format!(
2969 "numeric overflow in compound assignment for field '{column}'"
2970 ))
2971 })?;
2972 Ok(Value::UnsignedInteger(value))
2973 } else {
2974 let value = i64::try_from(out).map_err(|_| {
2975 RedDBError::Query(format!(
2976 "numeric overflow in compound assignment for field '{column}'"
2977 ))
2978 })?;
2979 Ok(Value::Integer(value))
2980 }
2981}
2982
2983#[derive(Clone, Copy)]
2984enum CompoundNumber {
2985 Integer(i128),
2986 Float(f64),
2987}
2988
2989impl CompoundNumber {
2990 fn from_value(value: &Value) -> Option<Self> {
2991 match value {
2992 Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2993 Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2994 Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2995 Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2996 _ => None,
2997 }
2998 }
2999
3000 fn is_float(self) -> bool {
3001 matches!(self, Self::Float(_))
3002 }
3003
3004 fn as_f64(self) -> f64 {
3005 match self {
3006 Self::Integer(value) => value as f64,
3007 Self::Float(value) => value,
3008 }
3009 }
3010
3011 fn as_i128(self) -> i128 {
3012 match self {
3013 Self::Integer(value) => value,
3014 Self::Float(_) => unreachable!("float compound number used as integer"),
3015 }
3016 }
3017}
3018
3019fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
3020 match expr {
3021 Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
3022 Expr::Column { field, .. } => {
3023 field_ref_matches_update_column(field, table_name, target_column)
3024 }
3025 Expr::BinaryOp { lhs, rhs, .. } => {
3026 expr_references_update_column(lhs, table_name, target_column)
3027 || expr_references_update_column(rhs, table_name, target_column)
3028 }
3029 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
3030 expr_references_update_column(operand, table_name, target_column)
3031 }
3032 Expr::FunctionCall { args, .. } => args
3033 .iter()
3034 .any(|arg| expr_references_update_column(arg, table_name, target_column)),
3035 Expr::Case {
3036 branches, else_, ..
3037 } => {
3038 branches.iter().any(|(cond, value)| {
3039 expr_references_update_column(cond, table_name, target_column)
3040 || expr_references_update_column(value, table_name, target_column)
3041 }) || else_
3042 .as_deref()
3043 .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
3044 }
3045 Expr::IsNull { operand, .. } => {
3046 expr_references_update_column(operand, table_name, target_column)
3047 }
3048 Expr::InList { target, values, .. } => {
3049 expr_references_update_column(target, table_name, target_column)
3050 || values
3051 .iter()
3052 .any(|value| expr_references_update_column(value, table_name, target_column))
3053 }
3054 Expr::Between {
3055 target, low, high, ..
3056 } => {
3057 expr_references_update_column(target, table_name, target_column)
3058 || expr_references_update_column(low, table_name, target_column)
3059 || expr_references_update_column(high, table_name, target_column)
3060 }
3061 Expr::WindowFunctionCall { args, window, .. } => {
3062 args.iter()
3063 .any(|arg| expr_references_update_column(arg, table_name, target_column))
3064 || window
3065 .partition_by
3066 .iter()
3067 .any(|e| expr_references_update_column(e, table_name, target_column))
3068 || window
3069 .order_by
3070 .iter()
3071 .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
3072 }
3073 }
3074}
3075
3076fn field_ref_matches_update_column(
3077 field: &FieldRef,
3078 table_name: &str,
3079 target_column: &str,
3080) -> bool {
3081 match field {
3082 FieldRef::TableColumn { table, column } => {
3083 column.eq_ignore_ascii_case(target_column)
3084 && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
3085 }
3086 FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
3087 false
3088 }
3089 }
3090}
3091
3092fn resolve_update_entity_by_logical_id(
3093 runtime: &RedDBRuntime,
3094 table: &str,
3095 logical_id: EntityId,
3096) -> Option<UnifiedEntity> {
3097 let store = runtime.inner.db.store();
3098 if let Some(entity) = store.get_table_row_by_logical_id(table, logical_id) {
3099 return Some(entity);
3100 }
3101 store.get(table, logical_id)
3104}
3105
3106fn update_cdc_item_kind(
3107 runtime: &RedDBRuntime,
3108 collection: &str,
3109 entity: &UnifiedEntity,
3110) -> &'static str {
3111 match &entity.data {
3112 EntityData::Node(_) => return "node",
3113 EntityData::Edge(_) => return "edge",
3114 _ => {}
3115 }
3116
3117 match runtime
3118 .db()
3119 .collection_contract(collection)
3120 .map(|contract| contract.declared_model)
3121 {
3122 Some(crate::catalog::CollectionModel::Document) => "document",
3123 Some(crate::catalog::CollectionModel::Kv)
3124 | Some(crate::catalog::CollectionModel::Vault) => "kv",
3125 _ => "row",
3126 }
3127}
3128
3129fn ordered_update_target_ids(
3130 manager: &Arc<crate::storage::SegmentManager>,
3131 entity_ids: &[EntityId],
3132 order_by: &[OrderByClause],
3133 limit: Option<usize>,
3134) -> Vec<EntityId> {
3135 let mut entities: Vec<UnifiedEntity> =
3136 manager.get_many(entity_ids).into_iter().flatten().collect();
3137 entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3138 if let Some(limit) = limit {
3139 entities.truncate(limit);
3140 }
3141 entities.into_iter().map(|entity| entity.id).collect()
3142}
3143
3144fn compare_update_order(
3145 left: &UnifiedEntity,
3146 right: &UnifiedEntity,
3147 order_by: &[OrderByClause],
3148) -> Ordering {
3149 for clause in order_by {
3150 let left_value = update_order_value(left, &clause.field);
3151 let right_value = update_order_value(right, &clause.field);
3152 let ordering = compare_update_order_values(
3153 left_value.as_ref(),
3154 right_value.as_ref(),
3155 clause.nulls_first,
3156 );
3157 if ordering != Ordering::Equal {
3158 return if clause.ascending {
3159 ordering
3160 } else {
3161 ordering.reverse()
3162 };
3163 }
3164 }
3165 left.logical_id().raw().cmp(&right.logical_id().raw())
3166}
3167
3168fn compare_update_order_values(
3169 left: Option<&Value>,
3170 right: Option<&Value>,
3171 nulls_first: bool,
3172) -> Ordering {
3173 match (left, right) {
3174 (None, None) => Ordering::Equal,
3175 (None, Some(_)) => {
3176 if nulls_first {
3177 Ordering::Less
3178 } else {
3179 Ordering::Greater
3180 }
3181 }
3182 (Some(_), None) => {
3183 if nulls_first {
3184 Ordering::Greater
3185 } else {
3186 Ordering::Less
3187 }
3188 }
3189 (Some(left), Some(right)) => {
3190 crate::storage::query::value_compare::total_compare_values(left, right)
3191 }
3192 }
3193}
3194
3195fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3196 let FieldRef::TableColumn { table, column } = field else {
3197 return None;
3198 };
3199 if !table.is_empty() {
3200 return None;
3201 }
3202 if column.eq_ignore_ascii_case("rid") {
3203 return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3204 }
3205 match &entity.data {
3206 EntityData::Row(row) => row.get_field(column).cloned(),
3207 EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3208 .and_then(|record| record.get(column).cloned()),
3209 _ => None,
3210 }
3211}
3212
3213fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
3214 if columns.is_empty() {
3215 return columns;
3216 }
3217
3218 let mut unique = Vec::with_capacity(columns.len());
3219 for column in columns.drain(..) {
3220 if !unique
3221 .iter()
3222 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
3223 {
3224 unique.push(column);
3225 }
3226 }
3227 unique
3228}
3229
3230const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];
3235
3236fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
3237 if column.eq_ignore_ascii_case("_ttl") {
3238 Some(SQL_TTL_METADATA_COLUMNS[0])
3239 } else if column.eq_ignore_ascii_case("_ttl_ms") {
3240 Some(SQL_TTL_METADATA_COLUMNS[1])
3241 } else if column.eq_ignore_ascii_case("_expires_at") {
3242 Some(SQL_TTL_METADATA_COLUMNS[2])
3243 } else {
3244 None
3245 }
3246}
3247
3248fn canonicalize_sql_ttl_metadata(
3253 key: &'static str,
3254 value: MetadataValue,
3255) -> (&'static str, MetadataValue) {
3256 if key != "_ttl" {
3257 return (key, value);
3258 }
3259 let scaled = match value {
3260 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3261 MetadataValue::Timestamp(ms_or_s) => {
3262 MetadataValue::Timestamp(ms_or_s)
3265 }
3266 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3267 other => other,
3268 };
3269 ("_ttl_ms", scaled)
3270}
3271
3272pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3276
3277impl RedDBRuntime {
3278 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3284 match value {
3285 Value::Password(marked) => {
3286 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3287 Ok(Value::Password(crate::auth::store::hash_password(plain)))
3288 } else {
3289 Ok(Value::Password(marked))
3290 }
3291 }
3292 Value::Secret(bytes) => {
3293 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3294 if !self.secret_auto_encrypt() {
3295 return Err(RedDBError::Query(
3296 "SECRET() literal rejected: red.config.secret.auto_encrypt \
3297 is false. Insert pre-encrypted bytes directly instead."
3298 .to_string(),
3299 ));
3300 }
3301 let key = self.secret_aes_key().ok_or_else(|| {
3302 RedDBError::Query(
3303 "SECRET() column encryption requires a bootstrapped \
3304 vault (red.secret.aes_key is missing). Start the server \
3305 with --vault to enable."
3306 .to_string(),
3307 )
3308 })?;
3309 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3310 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3311 } else {
3312 Ok(Value::Secret(bytes))
3313 }
3314 }
3315 other => Ok(other),
3316 }
3317 }
3318}
3319
3320fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3323 let nonce_bytes = crate::auth::store::random_bytes(12);
3324 let mut nonce = [0u8; 12];
3325 nonce.copy_from_slice(&nonce_bytes[..12]);
3326 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3327 let mut out = Vec::with_capacity(12 + ct.len());
3328 out.extend_from_slice(&nonce);
3329 out.extend_from_slice(&ct);
3330 out
3331}
3332
3333pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3337 if payload.len() < 12 {
3338 return None;
3339 }
3340 let mut nonce = [0u8; 12];
3341 nonce.copy_from_slice(&payload[..12]);
3342 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3343}
3344
3345fn split_insert_metadata(
3346 runtime: &RedDBRuntime,
3347 columns: &[String],
3348 values: &[Value],
3349) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3350 let mut fields = Vec::new();
3351 let mut metadata = Vec::new();
3352
3353 for (column, value) in columns.iter().zip(values.iter()) {
3354 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3356 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3357 let (canonical_key, canonical_value) =
3358 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3359 metadata.push((canonical_key.to_string(), canonical_value));
3360 continue;
3361 }
3362 fields.push((
3363 column.clone(),
3364 runtime.resolve_crypto_sentinel(value.clone())?,
3365 ));
3366 }
3367
3368 Ok((fields, metadata))
3369}
3370
3371fn merge_with_clauses(
3373 metadata: &mut Vec<(String, MetadataValue)>,
3374 ttl_ms: Option<u64>,
3375 expires_at_ms: Option<u64>,
3376 with_metadata: &[(String, Value)],
3377) {
3378 if let Some(ms) = ttl_ms {
3379 metadata.push((
3380 "_ttl_ms".to_string(),
3381 if ms <= i64::MAX as u64 {
3382 MetadataValue::Int(ms as i64)
3383 } else {
3384 MetadataValue::Timestamp(ms)
3385 },
3386 ));
3387 }
3388 if let Some(ms) = expires_at_ms {
3389 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3390 }
3391 for (key, value) in with_metadata {
3392 let meta_value = match value {
3393 Value::Text(s) => MetadataValue::String(s.to_string()),
3394 Value::Integer(n) => MetadataValue::Int(*n),
3395 Value::Float(n) => MetadataValue::Float(*n),
3396 Value::Boolean(b) => MetadataValue::Bool(*b),
3397 _ => MetadataValue::String(value.to_string()),
3398 };
3399 metadata.push((key.clone(), meta_value));
3400 }
3401}
3402
3403fn merge_vector_metadata_column(
3404 metadata: &mut Vec<(String, MetadataValue)>,
3405 columns: &[String],
3406 values: &[Value],
3407) -> RedDBResult<()> {
3408 let Some(value) = columns
3409 .iter()
3410 .position(|column| column.eq_ignore_ascii_case("metadata"))
3411 .map(|index| &values[index])
3412 else {
3413 return Ok(());
3414 };
3415 let json = match value {
3416 Value::Null => return Ok(()),
3417 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3418 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3419 })?,
3420 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3421 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3422 })?,
3423 other => {
3424 return Err(RedDBError::Query(format!(
3425 "column 'metadata' expected JSON object, got {other:?}"
3426 )))
3427 }
3428 };
3429 let parsed = metadata_from_json(&json)?;
3430 for (key, value) in parsed.iter() {
3431 metadata.push((key.clone(), value.clone()));
3432 }
3433 Ok(())
3434}
3435
3436fn apply_collection_default_ttl_metadata(
3437 runtime: &RedDBRuntime,
3438 collection: &str,
3439 metadata: &mut Vec<(String, MetadataValue)>,
3440) {
3441 if has_internal_ttl_metadata(metadata) {
3442 return;
3443 }
3444
3445 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3446 return;
3447 };
3448
3449 metadata.push((
3450 "_ttl_ms".to_string(),
3451 if default_ttl_ms <= i64::MAX as u64 {
3452 MetadataValue::Int(default_ttl_ms as i64)
3453 } else {
3454 MetadataValue::Timestamp(default_ttl_ms)
3455 },
3456 ));
3457}
3458
3459fn ensure_non_tree_reserved_metadata_entries(
3460 metadata: &[(String, MetadataValue)],
3461) -> RedDBResult<()> {
3462 for (key, _) in metadata {
3463 ensure_non_tree_reserved_metadata_key(key)?;
3464 }
3465 Ok(())
3466}
3467
3468fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3469 if key.starts_with(TREE_METADATA_PREFIX) {
3470 return Err(RedDBError::Query(format!(
3471 "metadata key '{}' is reserved for managed trees",
3472 key
3473 )));
3474 }
3475 Ok(())
3476}
3477
3478fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3479 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3480 return Err(RedDBError::Query(format!(
3481 "edge label '{}' is reserved for managed trees",
3482 TREE_CHILD_EDGE_LABEL
3483 )));
3484 }
3485 Ok(())
3486}
3487
3488fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3489 let mut columns = Vec::with_capacity(pairs.len());
3490 let mut values = Vec::with_capacity(pairs.len());
3491
3492 for (column, value) in pairs {
3493 columns.push(column.clone());
3494 values.push(value.clone());
3495 }
3496
3497 (columns, values)
3498}
3499
3500fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3502 for (i, col) in columns.iter().enumerate() {
3503 if col.eq_ignore_ascii_case(name) {
3504 return Ok(values[i].clone());
3505 }
3506 }
3507 Err(RedDBError::Query(format!(
3508 "required column '{name}' not found in INSERT"
3509 )))
3510}
3511
3512fn find_column_value_string(
3514 columns: &[String],
3515 values: &[Value],
3516 name: &str,
3517) -> RedDBResult<String> {
3518 let val = find_column_value(columns, values, name)?;
3519 match val {
3520 Value::Text(s) => Ok(s.to_string()),
3521 Value::Integer(n) => Ok(n.to_string()),
3522 Value::Float(n) => Ok(n.to_string()),
3523 other => Err(RedDBError::Query(format!(
3524 "column '{name}' expected text, got {other:?}"
3525 ))),
3526 }
3527}
3528
3529fn find_document_body_json(
3530 columns: &[String],
3531 values: &[Value],
3532) -> RedDBResult<crate::json::Value> {
3533 let val = find_column_value(columns, values, "body")?;
3534 match val {
3535 Value::Json(bytes) | Value::Blob(bytes) => crate::json::from_slice(&bytes)
3536 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3537 Value::Text(text) => crate::json::from_str(text.as_ref())
3538 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3539 Value::Integer(value) => crate::json::from_str(&value.to_string())
3540 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3541 Value::UnsignedInteger(value) => crate::json::from_str(&value.to_string())
3542 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3543 Value::Float(value) => crate::json::from_str(&value.to_string())
3544 .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3545 other => Err(RedDBError::Query(format!(
3546 "column 'body' expected JSON body, got {other:?}"
3547 ))),
3548 }
3549}
3550
3551fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3552 let val = find_column_value(columns, values, name)?;
3553 match val {
3554 Value::Float(n) => Ok(n),
3555 Value::Integer(n) => Ok(n as f64),
3556 Value::UnsignedInteger(n) => Ok(n as f64),
3557 Value::Text(s) => s
3558 .parse::<f64>()
3559 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3560 other => Err(RedDBError::Query(format!(
3561 "column '{name}' expected number, got {other:?}"
3562 ))),
3563 }
3564}
3565
3566fn find_column_value_opt_string(
3568 columns: &[String],
3569 values: &[Value],
3570 name: &str,
3571) -> Option<String> {
3572 for (i, col) in columns.iter().enumerate() {
3573 if col.eq_ignore_ascii_case(name) {
3574 return match &values[i] {
3575 Value::Null => None,
3576 Value::Text(s) => Some(s.to_string()),
3577 Value::Integer(n) => Some(n.to_string()),
3578 Value::Float(n) => Some(n.to_string()),
3579 _ => None,
3580 };
3581 }
3582 }
3583 None
3584}
3585
3586fn resolve_edge_endpoint(
3593 store: &crate::storage::unified::UnifiedStore,
3594 collection: &str,
3595 columns: &[String],
3596 values: &[Value],
3597 name: &str,
3598) -> RedDBResult<u64> {
3599 let val = find_column_value(columns, values, name)?;
3600 match val {
3601 Value::Integer(n) => Ok(n as u64),
3602 Value::UnsignedInteger(n) => Ok(n),
3603 Value::Text(s) => {
3604 if let Ok(n) = s.parse::<u64>() {
3605 return Ok(n);
3606 }
3607 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3608 match matches.len() {
3609 0 => Err(RedDBError::Query(format!(
3610 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3611 ))),
3612 1 => Ok(matches[0].raw()),
3613 n => Err(RedDBError::Query(format!(
3614 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3615 ))),
3616 }
3617 }
3618 other => Err(RedDBError::Query(format!(
3619 "column '{name}' expected integer or node label, got {other:?}"
3620 ))),
3621 }
3622}
3623
3624fn resolve_edge_endpoint_any(
3625 store: &crate::storage::unified::UnifiedStore,
3626 collection: &str,
3627 columns: &[String],
3628 values: &[Value],
3629 names: &[&str],
3630) -> RedDBResult<u64> {
3631 for name in names {
3632 if columns
3633 .iter()
3634 .any(|column| column.eq_ignore_ascii_case(name))
3635 {
3636 return resolve_edge_endpoint(store, collection, columns, values, name);
3637 }
3638 }
3639
3640 Err(RedDBError::Query(format!(
3641 "required column '{}' not found in INSERT",
3642 names.first().copied().unwrap_or("from_rid")
3643 )))
3644}
3645
3646fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3648 let val = find_column_value(columns, values, name)?;
3649 match val {
3650 Value::Integer(n) => Ok(n as u64),
3651 Value::UnsignedInteger(n) => Ok(n),
3652 Value::Text(s) => s
3653 .parse::<u64>()
3654 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3655 other => Err(RedDBError::Query(format!(
3656 "column '{name}' expected integer, got {other:?}"
3657 ))),
3658 }
3659}
3660
3661fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3663 for (i, col) in columns.iter().enumerate() {
3664 if col.eq_ignore_ascii_case(name) {
3665 return match &values[i] {
3666 Value::Float(n) => Some(*n as f32),
3667 Value::Integer(n) => Some(*n as f32),
3668 Value::Null => None,
3669 _ => None,
3670 };
3671 }
3672 }
3673 None
3674}
3675
3676fn find_column_value_vec_f32(
3678 columns: &[String],
3679 values: &[Value],
3680 name: &str,
3681) -> RedDBResult<Vec<f32>> {
3682 let val = find_column_value(columns, values, name)?;
3683 match val {
3684 Value::Vector(v) => Ok(v),
3685 Value::Json(bytes) => {
3686 let s = std::str::from_utf8(&bytes).map_err(|_| {
3688 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3689 })?;
3690 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3691 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3692 })?;
3693 Ok(arr)
3694 }
3695 other => Err(RedDBError::Query(format!(
3696 "column '{name}' expected vector, got {other:?}"
3697 ))),
3698 }
3699}
3700
3701fn find_column_value_vec_f32_any(
3702 columns: &[String],
3703 values: &[Value],
3704 names: &[&str],
3705) -> RedDBResult<Vec<f32>> {
3706 for name in names {
3707 if columns
3708 .iter()
3709 .any(|column| column.eq_ignore_ascii_case(name))
3710 {
3711 return find_column_value_vec_f32(columns, values, name);
3712 }
3713 }
3714 Err(RedDBError::Query(format!(
3715 "required vector column '{}' not found in INSERT",
3716 names.join("' or '")
3717 )))
3718}
3719
3720fn extract_remaining_properties(
3722 columns: &[String],
3723 values: &[Value],
3724 exclude: &[&str],
3725) -> Vec<(String, Value)> {
3726 columns
3727 .iter()
3728 .zip(values.iter())
3729 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3730 .map(|(col, val)| (col.clone(), val.clone()))
3731 .collect()
3732}
3733
3734fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3735 let mut invalid = Vec::new();
3736 for column in columns {
3737 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3738 invalid.push(column.clone());
3739 }
3740 }
3741
3742 if invalid.is_empty() {
3743 Ok(())
3744 } else {
3745 Err(RedDBError::Query(format!(
3746 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3747 invalid.join(", ")
3748 )))
3749 }
3750}
3751
3752fn is_timeseries_insert_column(column: &str) -> bool {
3753 matches!(
3754 column.to_ascii_lowercase().as_str(),
3755 "metric"
3756 | "value"
3757 | "tags"
3758 | "timestamp"
3759 | "timestamp_ns"
3760 | "time"
3761 | "event_name"
3766 | "payload"
3767 )
3768}
3769
3770fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3771 let mut found = None;
3772
3773 for alias in ["timestamp_ns", "timestamp", "time"] {
3774 for (index, column) in columns.iter().enumerate() {
3775 if !column.eq_ignore_ascii_case(alias) {
3776 continue;
3777 }
3778
3779 if found.is_some() {
3780 return Err(RedDBError::Query(
3781 "timeseries INSERT accepts only one timestamp column".to_string(),
3782 ));
3783 }
3784
3785 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3786 }
3787 }
3788
3789 Ok(found)
3790}
3791
3792fn find_timeseries_tags(
3793 columns: &[String],
3794 values: &[Value],
3795) -> RedDBResult<std::collections::HashMap<String, String>> {
3796 for (index, column) in columns.iter().enumerate() {
3797 if column.eq_ignore_ascii_case("tags") {
3798 return parse_timeseries_tags(&values[index]);
3799 }
3800 }
3801 Ok(std::collections::HashMap::new())
3802}
3803
3804fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3805 match value {
3806 Value::Null => Ok(std::collections::HashMap::new()),
3807 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3808 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3809 other => Err(RedDBError::Query(format!(
3810 "timeseries tags must be a JSON object or JSON text, got {other:?}"
3811 ))),
3812 }
3813}
3814
3815fn parse_timeseries_tags_json(
3816 bytes: &[u8],
3817) -> RedDBResult<std::collections::HashMap<String, String>> {
3818 let json: crate::json::Value = crate::json::from_slice(bytes)
3819 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3820
3821 let object = match json {
3822 crate::json::Value::Object(object) => object,
3823 other => {
3824 return Err(RedDBError::Query(format!(
3825 "timeseries tags must be a JSON object, got {other:?}"
3826 )))
3827 }
3828 };
3829
3830 let mut tags = std::collections::HashMap::with_capacity(object.len());
3831 for (key, value) in object {
3832 tags.insert(key, json_tag_value_to_string(&value));
3833 }
3834 Ok(tags)
3835}
3836
3837fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3853 let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3854 buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3855 buf.push_str(&value.to_string_compact());
3856 buf
3857}
3858
3859fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3860 match value {
3861 Value::UnsignedInteger(value) => Ok(*value),
3862 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3863 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3864 Value::Text(value) => value.parse::<u64>().map_err(|_| {
3865 RedDBError::Query(format!(
3866 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3867 ))
3868 }),
3869 other => Err(RedDBError::Query(format!(
3870 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3871 ))),
3872 }
3873}
3874
3875fn current_unix_ns() -> u64 {
3876 std::time::SystemTime::now()
3877 .duration_since(std::time::UNIX_EPOCH)
3878 .unwrap_or_default()
3879 .as_nanos()
3880 .min(u128::from(u64::MAX)) as u64
3881}
3882
3883fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3884 use crate::json::{Map, Value as JV};
3885 match value {
3886 MetadataValue::Null => JV::Null,
3887 MetadataValue::Bool(value) => JV::Bool(*value),
3888 MetadataValue::Int(value) => JV::Number(*value as f64),
3889 MetadataValue::Float(value) => JV::Number(*value),
3890 MetadataValue::String(value) => JV::String(value.clone()),
3891 MetadataValue::Bytes(value) => JV::Array(
3892 value
3893 .iter()
3894 .map(|value| JV::Number(*value as f64))
3895 .collect(),
3896 ),
3897 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3898 MetadataValue::Array(values) => {
3899 JV::Array(values.iter().map(metadata_value_to_json).collect())
3900 }
3901 MetadataValue::Object(object) => {
3902 let entries = object
3903 .iter()
3904 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3905 .collect();
3906 JV::Object(entries)
3907 }
3908 MetadataValue::Geo { lat, lon } => {
3909 let mut object = Map::new();
3910 object.insert("lat".to_string(), JV::Number(*lat));
3911 object.insert("lon".to_string(), JV::Number(*lon));
3912 JV::Object(object)
3913 }
3914 MetadataValue::Reference(target) => {
3915 let mut object = Map::new();
3916 object.insert(
3917 "collection".to_string(),
3918 JV::String(target.collection().to_string()),
3919 );
3920 object.insert(
3921 "entity_id".to_string(),
3922 JV::Number(target.entity_id().raw() as f64),
3923 );
3924 JV::Object(object)
3925 }
3926 MetadataValue::References(values) => {
3927 let refs = values
3928 .iter()
3929 .map(|target| {
3930 let mut object = Map::new();
3931 object.insert(
3932 "collection".to_string(),
3933 JV::String(target.collection().to_string()),
3934 );
3935 object.insert(
3936 "entity_id".to_string(),
3937 JV::Number(target.entity_id().raw() as f64),
3938 );
3939 JV::Object(object)
3940 })
3941 .collect();
3942 JV::Array(refs)
3943 }
3944 }
3945}
3946
3947fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3948 match value {
3949 Value::Null => MetadataValue::Null,
3950 Value::Boolean(value) => MetadataValue::Bool(*value),
3951 Value::Integer(value) => MetadataValue::Int(*value),
3952 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3953 Value::Float(value) => MetadataValue::Float(*value),
3954 Value::Text(value) => MetadataValue::String(value.to_string()),
3955 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3956 Value::Timestamp(value) => {
3957 if *value >= 0 {
3958 metadata_u64_to_value(*value as u64)
3959 } else {
3960 MetadataValue::Int(*value)
3961 }
3962 }
3963 Value::TimestampMs(value) => {
3964 if *value >= 0 {
3965 metadata_u64_to_value(*value as u64)
3966 } else {
3967 MetadataValue::Int(*value)
3968 }
3969 }
3970 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3971 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3972 Value::Date(value) => MetadataValue::String(value.to_string()),
3973 Value::Time(value) => MetadataValue::String(value.to_string()),
3974 Value::Decimal(value) => MetadataValue::String(value.to_string()),
3975 Value::Ipv4(value) => MetadataValue::String(format!(
3976 "{}.{}.{}.{}",
3977 (value >> 24) & 0xFF,
3978 (value >> 16) & 0xFF,
3979 (value >> 8) & 0xFF,
3980 value & 0xFF
3981 )),
3982 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3983 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3984 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3985 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3986 lat: *lat as f64 / 1_000_000.0,
3987 lon: *lon as f64 / 1_000_000.0,
3988 },
3989 Value::BigInt(value) => MetadataValue::String(value.to_string()),
3990 Value::TableRef(value) => MetadataValue::String(value.clone()),
3991 Value::PageRef(value) => MetadataValue::Int(*value as i64),
3992 Value::Password(value) => MetadataValue::String(value.clone()),
3993 Value::Array(values) => {
3994 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3995 }
3996 _ => MetadataValue::String(value.to_string()),
3997 }
3998}
3999
4000fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
4001 match value {
4002 Value::Null => Ok(MetadataValue::Null),
4003 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
4004 Value::Integer(_) => Err(RedDBError::Query(format!(
4005 "column '{field}' must be non-negative for TTL metadata"
4006 ))),
4007 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
4008 Value::Float(value) if value.is_finite() => {
4009 if value.fract().abs() >= f64::EPSILON {
4010 return Err(RedDBError::Query(format!(
4011 "column '{field}' must be an integer (TTL metadata must be an integer)"
4012 )));
4013 }
4014 if *value < 0.0 {
4015 return Err(RedDBError::Query(format!(
4016 "column '{field}' must be non-negative for TTL metadata"
4017 )));
4018 }
4019 if *value > u64::MAX as f64 {
4020 return Err(RedDBError::Query(format!(
4021 "column '{field}' value is too large"
4022 )));
4023 }
4024 Ok(metadata_u64_to_value(*value as u64))
4025 }
4026 Value::Float(_) => Err(RedDBError::Query(format!(
4027 "column '{field}' must be a finite number"
4028 ))),
4029 Value::Text(value) => {
4030 let value = value.trim();
4031 if let Ok(value) = value.parse::<u64>() {
4032 Ok(metadata_u64_to_value(value))
4033 } else if let Ok(value) = value.parse::<i64>() {
4034 if value < 0 {
4035 return Err(RedDBError::Query(format!(
4036 "column '{field}' must be non-negative for TTL metadata"
4037 )));
4038 }
4039 Ok(metadata_u64_to_value(value as u64))
4040 } else if let Ok(value) = value.parse::<f64>() {
4041 if !value.is_finite() {
4042 return Err(RedDBError::Query(format!(
4043 "column '{field}' must be a finite number"
4044 )));
4045 }
4046 if value.fract().abs() >= f64::EPSILON {
4047 return Err(RedDBError::Query(format!(
4048 "column '{field}' must be an integer (TTL metadata must be an integer)"
4049 )));
4050 }
4051 if value < 0.0 {
4052 return Err(RedDBError::Query(format!(
4053 "column '{field}' must be non-negative for TTL metadata"
4054 )));
4055 }
4056 if value > u64::MAX as f64 {
4057 return Err(RedDBError::Query(format!(
4058 "column '{field}' value is too large"
4059 )));
4060 }
4061 Ok(metadata_u64_to_value(value as u64))
4062 } else {
4063 Err(RedDBError::Query(format!(
4064 "column '{field}' expects a numeric value for TTL metadata"
4065 )))
4066 }
4067 }
4068 _ => Err(RedDBError::Query(format!(
4069 "column '{field}' expects a numeric value for TTL metadata"
4070 ))),
4071 }
4072}
4073
4074fn metadata_u64_to_value(value: u64) -> MetadataValue {
4075 if value <= i64::MAX as u64 {
4076 MetadataValue::Int(value as i64)
4077 } else {
4078 MetadataValue::Timestamp(value)
4079 }
4080}
4081
4082fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
4088 let json = match value {
4089 Value::Null => return false,
4090 Value::Json(bytes) | Value::Blob(bytes) => {
4091 match crate::json::from_slice::<crate::json::Value>(bytes) {
4092 Ok(v) => v,
4093 Err(_) => return false,
4094 }
4095 }
4096 Value::Text(s) => {
4097 let trimmed = s.trim_start();
4098 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
4099 return false;
4100 }
4101 match crate::json::from_str::<crate::json::Value>(s) {
4102 Ok(v) => v,
4103 Err(_) => return false,
4104 }
4105 }
4106 _ => return false,
4107 };
4108 let mut cursor = &json;
4109 for seg in tail.split('.') {
4110 match cursor {
4111 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
4112 Some((_, v)) => cursor = v,
4113 None => return false,
4114 },
4115 _ => return false,
4116 }
4117 }
4118 !matches!(cursor, crate::json::Value::Null)
4119}
4120
4121fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
4132 let mut root = match current {
4133 Value::Null => crate::json::Value::Object(Default::default()),
4134 Value::Json(bytes) | Value::Blob(bytes) => {
4135 crate::json::from_slice(&bytes).map_err(|err| {
4136 RedDBError::Query(format!(
4137 "tenant auto-fill: root column is not valid JSON ({err})"
4138 ))
4139 })?
4140 }
4141 Value::Text(s) => {
4142 if s.trim().is_empty() {
4143 crate::json::Value::Object(Default::default())
4144 } else {
4145 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
4146 RedDBError::Query(format!(
4147 "tenant auto-fill: text root is not valid JSON ({err})"
4148 ))
4149 })?
4150 }
4151 }
4152 other => {
4153 return Err(RedDBError::Query(format!(
4154 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4155 )));
4156 }
4157 };
4158
4159 let segments: Vec<&str> = tail.split('.').collect();
4161 let mut cursor: &mut crate::json::Value = &mut root;
4162 for (i, seg) in segments.iter().enumerate() {
4163 let is_last = i + 1 == segments.len();
4164 let map = match cursor {
4165 crate::json::Value::Object(m) => m,
4166 _ => {
4167 return Err(RedDBError::Query(format!(
4168 "tenant auto-fill: segment '{seg}' is not inside an object"
4169 )));
4170 }
4171 };
4172 if is_last {
4173 map.insert(
4174 seg.to_string(),
4175 crate::json::Value::String(tenant_id.to_string()),
4176 );
4177 break;
4178 }
4179 cursor = map
4180 .entry(seg.to_string())
4181 .or_insert_with(|| crate::json::Value::Object(Default::default()));
4182 }
4183
4184 let bytes = crate::json::to_vec(&root).map_err(|err| {
4185 RedDBError::Query(format!(
4186 "tenant auto-fill: failed to re-serialize JSON ({err})"
4187 ))
4188 })?;
4189 Ok(Value::Json(bytes))
4190}
4191
4192#[cfg(test)]
4193mod tests {
4194 use crate::storage::schema::Value;
4195 use crate::storage::wal::{WalReader, WalRecord};
4196 use crate::storage::{DeployProfile, StoragePackaging, StorageProfileSelection};
4197 use crate::{RedDBOptions, RedDBRuntime};
4198 use std::path::Path;
4199
4200 fn persistent_operational_options(path: &Path) -> RedDBOptions {
4201 RedDBOptions::persistent(path)
4202 .with_storage_profile(StorageProfileSelection {
4203 deploy_profile: DeployProfile::Embedded,
4204 packaging: StoragePackaging::OperationalDirectory,
4205 replica_count: 0,
4206 managed_backup: false,
4207 wal_retention: false,
4208 })
4209 .unwrap()
4210 }
4211
4212 fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4213 WalReader::open(wal_path)
4214 .expect("wal opens")
4215 .iter()
4216 .map(|record| record.expect("wal record decodes").1)
4217 .filter_map(|record| match record {
4218 WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4219 _ => None,
4220 })
4221 .collect()
4222 }
4223
4224 fn action_contains_text(action: &[u8], needle: &str) -> bool {
4225 action
4226 .windows(needle.len())
4227 .any(|window| window == needle.as_bytes())
4228 }
4229
4230 fn assert_statement_writes_collections_in_one_new_wal_batch(
4231 rt: &RedDBRuntime,
4232 wal_path: &Path,
4233 statement: &str,
4234 source: &str,
4235 event_queue: &str,
4236 ) {
4237 let before_batches = store_commit_batches(wal_path).len();
4238
4239 rt.execute_query(statement).unwrap();
4240
4241 let batches = store_commit_batches(wal_path);
4242 let statement_batches = &batches[before_batches..];
4243 let source_batch = statement_batches
4244 .iter()
4245 .position(|actions| {
4246 actions.iter().any(|action| {
4247 action_contains_text(action, source)
4248 && !action_contains_text(action, event_queue)
4249 })
4250 })
4251 .expect("source collection write batch is present");
4252 let event_batch = statement_batches
4253 .iter()
4254 .position(|actions| {
4255 actions
4256 .iter()
4257 .any(|action| action_contains_text(action, event_queue))
4258 })
4259 .expect("event queue write batch is present");
4260
4261 assert_eq!(
4262 source_batch, event_batch,
4263 "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4264 );
4265 }
4266
4267 #[test]
4268 fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4269 let dir = tempfile::tempdir().unwrap();
4270 let db_path = dir.path().join("events_dual_write.rdb");
4271 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4272 let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4273
4274 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4275 .unwrap();
4276 assert_statement_writes_collections_in_one_new_wal_batch(
4277 &rt,
4278 &wal_path,
4279 "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4280 "users",
4281 "users_events",
4282 );
4283 }
4284
4285 #[test]
4286 fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4287 let dir = tempfile::tempdir().unwrap();
4288 let db_path = dir.path().join("events_update_atomic.rdb");
4289 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4290 let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4291
4292 rt.execute_query(
4293 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4294 )
4295 .unwrap();
4296 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4297 .unwrap();
4298
4299 assert_statement_writes_collections_in_one_new_wal_batch(
4300 &rt,
4301 &wal_path,
4302 "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4303 "users",
4304 "user_updates",
4305 );
4306 }
4307
4308 #[test]
4309 fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4310 let dir = tempfile::tempdir().unwrap();
4311 let db_path = dir.path().join("events_delete_atomic.rdb");
4312 let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4313 let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4314
4315 rt.execute_query(
4316 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4317 )
4318 .unwrap();
4319 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4320 .unwrap();
4321
4322 assert_statement_writes_collections_in_one_new_wal_batch(
4323 &rt,
4324 &wal_path,
4325 "DELETE FROM users WHERE id = 1",
4326 "users",
4327 "user_deletes",
4328 );
4329 }
4330
4331 #[test]
4332 fn update_where_id_in_with_hash_index_updates_expected_rows() {
4333 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4334 rt.execute_query("CREATE TABLE users (id INT, score INT)")
4335 .unwrap();
4336 for id in 0..5 {
4337 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4338 .unwrap();
4339 }
4340 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4341 .unwrap();
4342
4343 let updated = rt
4344 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4345 .unwrap();
4346 assert_eq!(updated.affected_rows, 3);
4347
4348 let selected = rt
4349 .execute_query("SELECT id, score FROM users ORDER BY id")
4350 .unwrap();
4351 let scores: Vec<(i64, i64)> = selected
4352 .result
4353 .records
4354 .iter()
4355 .map(|record| {
4356 let id = match record.get("id").unwrap() {
4357 Value::Integer(value) => *value,
4358 other => panic!("expected integer id, got {other:?}"),
4359 };
4360 let score = match record.get("score").unwrap() {
4361 Value::Integer(value) => *value,
4362 other => panic!("expected integer score, got {other:?}"),
4363 };
4364 (id, score)
4365 })
4366 .collect();
4367 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4368 }
4369
4370 #[test]
4377 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4378 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4379 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4380 .unwrap();
4381 for id in 0..5 {
4382 rt.execute_query(&format!(
4383 "INSERT INTO items (id, score) VALUES ({id}, {})",
4384 id * 10
4385 ))
4386 .unwrap();
4387 }
4388 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4389 .unwrap();
4390
4391 let updated_one = rt
4395 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4396 .unwrap();
4397 assert_eq!(updated_one.affected_rows, 1);
4398
4399 let updated_many = rt
4403 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4404 .unwrap();
4405 assert_eq!(updated_many.affected_rows, 2);
4406
4407 let snapshot = rt
4408 .execute_query("SELECT id, score FROM items ORDER BY id")
4409 .unwrap();
4410 let pairs: Vec<(i64, i64)> = snapshot
4411 .result
4412 .records
4413 .iter()
4414 .map(|record| {
4415 let id = match record.get("id").unwrap() {
4416 Value::Integer(value) => *value,
4417 other => panic!("expected integer id, got {other:?}"),
4418 };
4419 let score = match record.get("score").unwrap() {
4420 Value::Integer(value) => *value,
4421 other => panic!("expected integer score, got {other:?}"),
4422 };
4423 (id, score)
4424 })
4425 .collect();
4426 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4427
4428 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4430 assert_eq!(updated_all.affected_rows, 5);
4431 let after = rt
4432 .execute_query("SELECT score FROM items ORDER BY id")
4433 .unwrap();
4434 let scores: Vec<i64> = after
4435 .result
4436 .records
4437 .iter()
4438 .map(|record| match record.get("score").unwrap() {
4439 Value::Integer(value) => *value,
4440 other => panic!("expected integer score, got {other:?}"),
4441 })
4442 .collect();
4443 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4444 }
4445
4446 #[test]
4452 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4453 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4454 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4455 .unwrap();
4456 for id in 0..5 {
4457 rt.execute_query(&format!(
4458 "INSERT INTO items (id, score) VALUES ({id}, {})",
4459 id * 10
4460 ))
4461 .unwrap();
4462 }
4463 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4464 .unwrap();
4465
4466 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4469 assert_eq!(deleted_one.affected_rows, 1);
4470
4471 let deleted_many = rt
4475 .execute_query("DELETE FROM items WHERE score > 25")
4476 .unwrap();
4477 assert_eq!(deleted_many.affected_rows, 2);
4478
4479 let surviving = rt
4480 .execute_query("SELECT id FROM items ORDER BY id")
4481 .unwrap();
4482 let ids: Vec<i64> = surviving
4483 .result
4484 .records
4485 .iter()
4486 .map(|record| match record.get("id").unwrap() {
4487 Value::Integer(value) => *value,
4488 other => panic!("expected integer id, got {other:?}"),
4489 })
4490 .collect();
4491 assert_eq!(ids, vec![0, 1]);
4492
4493 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4495 assert_eq!(deleted_rest.affected_rows, 2);
4496 let empty = rt.execute_query("SELECT id FROM items").unwrap();
4497 assert!(empty.result.records.is_empty());
4498 }
4499
4500 #[test]
4505 fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4506 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4507 rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4508 .unwrap();
4509
4510 let inserted = rt
4513 .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4514 .unwrap();
4515 assert_eq!(inserted.affected_rows, 1);
4516
4517 let update_err = rt
4519 .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4520 .unwrap_err();
4521 let msg = format!("{update_err}");
4522 assert!(
4523 msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4524 "expected UPDATE rejection message, got: {msg}"
4525 );
4526
4527 let delete_err = rt
4529 .execute_query("DELETE FROM events WHERE id = 1")
4530 .unwrap_err();
4531 let msg = format!("{delete_err}");
4532 assert!(
4533 msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4534 "expected DELETE rejection message, got: {msg}"
4535 );
4536
4537 let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4540 assert_eq!(surviving.result.records.len(), 1);
4541 }
4542
4543 #[test]
4547 fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4548 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4549 rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4550 .unwrap();
4551
4552 rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4553 .unwrap();
4554 let updated = rt
4555 .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4556 .unwrap();
4557 assert_eq!(updated.affected_rows, 1);
4558 let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4559 assert_eq!(deleted.affected_rows, 1);
4560 }
4561
4562 #[test]
4563 fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4564 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4565 rt.execute_query(
4566 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4567 )
4568 .unwrap();
4569
4570 let inserted = rt
4571 .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4572 .unwrap();
4573 assert_eq!(inserted.affected_rows, 1);
4574
4575 let events = queue_payloads(&rt, "audit_log");
4576 assert_eq!(events.len(), 1);
4577 let event = events[0].as_object().expect("event payload object");
4578 assert!(event
4579 .get("event_id")
4580 .and_then(crate::json::Value::as_str)
4581 .is_some_and(|value| !value.is_empty()));
4582 assert_eq!(
4583 event.get("op").and_then(crate::json::Value::as_str),
4584 Some("insert")
4585 );
4586 assert_eq!(
4587 event.get("collection").and_then(crate::json::Value::as_str),
4588 Some("users")
4589 );
4590 assert_eq!(
4591 event.get("id").and_then(crate::json::Value::as_u64),
4592 Some(7)
4593 );
4594 assert!(event
4595 .get("ts")
4596 .and_then(crate::json::Value::as_u64)
4597 .is_some());
4598 assert!(event
4599 .get("lsn")
4600 .and_then(crate::json::Value::as_u64)
4601 .is_some());
4602 assert!(matches!(
4603 event.get("tenant"),
4604 Some(crate::json::Value::Null)
4605 ));
4606 assert!(matches!(
4607 event.get("before"),
4608 Some(crate::json::Value::Null)
4609 ));
4610 let after = event
4611 .get("after")
4612 .and_then(crate::json::Value::as_object)
4613 .expect("after object");
4614 assert_eq!(
4615 after.get("id").and_then(crate::json::Value::as_u64),
4616 Some(7)
4617 );
4618 assert_eq!(
4619 after.get("email").and_then(crate::json::Value::as_str),
4620 Some("a@example.com")
4621 );
4622 }
4623
4624 #[test]
4625 fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4626 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4627 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4628 .unwrap();
4629
4630 rt.execute_query(
4631 "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4632 )
4633 .unwrap();
4634
4635 let events = queue_payloads(&rt, "users_events");
4636 assert_eq!(events.len(), 2);
4637 let mut previous_lsn = 0;
4638 for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4639 let object = event.as_object().expect("event payload object");
4640 assert_eq!(
4641 object.get("op").and_then(crate::json::Value::as_str),
4642 Some("insert")
4643 );
4644 assert_eq!(
4645 object.get("id").and_then(crate::json::Value::as_u64),
4646 Some(expected_id)
4647 );
4648 let lsn = object
4649 .get("lsn")
4650 .and_then(crate::json::Value::as_u64)
4651 .expect("event lsn");
4652 assert!(
4653 lsn > previous_lsn,
4654 "event LSNs should increase in row order"
4655 );
4656 previous_lsn = lsn;
4657 let after = object
4658 .get("after")
4659 .and_then(crate::json::Value::as_object)
4660 .expect("after object");
4661 assert_eq!(
4662 after.get("id").and_then(crate::json::Value::as_u64),
4663 Some(expected_id)
4664 );
4665 }
4666 }
4667
4668 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4669 let result = rt
4670 .execute_query(&format!("QUEUE PEEK {queue} 10"))
4671 .expect("peek queue");
4672 result
4673 .result
4674 .records
4675 .iter()
4676 .map(
4677 |record| match record.get("payload").expect("payload column") {
4678 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4679 other => panic!("expected JSON queue payload, got {other:?}"),
4680 },
4681 )
4682 .collect()
4683 }
4684
4685 #[test]
4697 fn auto_index_id_fires_on_first_insert() {
4698 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4699 rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4700 .unwrap();
4701
4702 assert!(
4704 rt.index_store_ref()
4705 .find_index_for_column("bench_users", "id")
4706 .is_none(),
4707 "freshly created collection should not have an `id` index"
4708 );
4709
4710 rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4712 .unwrap();
4713
4714 let registered = rt
4716 .index_store_ref()
4717 .find_index_for_column("bench_users", "id")
4718 .expect("auto-index hook should have registered idx_id on first insert");
4719 assert_eq!(registered.name, "idx_id");
4720 assert_eq!(registered.collection, "bench_users");
4721 assert_eq!(registered.columns, vec!["id".to_string()]);
4722 assert!(matches!(
4723 registered.method,
4724 super::super::index_store::IndexMethodKind::Hash
4725 ));
4726
4727 for id in 2..=5 {
4730 rt.execute_query(&format!(
4731 "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4732 id * 10
4733 ))
4734 .unwrap();
4735 }
4736 for id in 1..=5 {
4737 let result = rt
4738 .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4739 .unwrap();
4740 assert_eq!(
4741 result.result.records.len(),
4742 1,
4743 "id={id} should match one row"
4744 );
4745 }
4746
4747 let deleted = rt
4752 .execute_query("DELETE FROM bench_users WHERE id = 3")
4753 .unwrap();
4754 assert_eq!(deleted.affected_rows, 1);
4755 }
4756
4757 #[test]
4762 fn auto_index_id_fires_on_first_bulk_insert() {
4763 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4764 rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4765 .unwrap();
4766
4767 rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4768 .unwrap();
4769
4770 let registered = rt
4771 .index_store_ref()
4772 .find_index_for_column("bench_bulk", "id")
4773 .expect("auto-index hook should fire on first bulk insert");
4774 assert_eq!(registered.name, "idx_id");
4775
4776 for id in 1..=3 {
4778 let result = rt
4779 .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4780 .unwrap();
4781 assert_eq!(result.result.records.len(), 1);
4782 }
4783 }
4784
4785 #[test]
4789 fn auto_index_id_skips_when_no_id_column() {
4790 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4791 rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4792 .unwrap();
4793 rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4794 .unwrap();
4795
4796 assert!(rt
4797 .index_store_ref()
4798 .find_index_for_column("plain", "id")
4799 .is_none());
4800 assert!(rt
4801 .index_store_ref()
4802 .find_index_for_column("plain", "uid")
4803 .is_none());
4804 }
4805
4806 #[test]
4811 fn auto_index_id_skips_when_index_already_exists() {
4812 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4813 rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4814 .unwrap();
4815 rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4817 .unwrap();
4818 rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4819 .unwrap();
4820
4821 let registered = rt
4822 .index_store_ref()
4823 .find_index_for_column("pre", "id")
4824 .expect("user index should still be there");
4825 assert_eq!(
4826 registered.name, "user_idx",
4827 "auto-index hook must not overwrite an existing index"
4828 );
4829 }
4830
4831 #[test]
4835 fn auto_index_id_dropped_with_collection() {
4836 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4837 rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4838 .unwrap();
4839 rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4840 .unwrap();
4841 assert!(rt
4842 .index_store_ref()
4843 .find_index_for_column("ephemeral", "id")
4844 .is_some());
4845
4846 rt.execute_query("DROP TABLE ephemeral").unwrap();
4847
4848 assert!(
4849 rt.index_store_ref()
4850 .find_index_for_column("ephemeral", "id")
4851 .is_none(),
4852 "implicit `idx_id` must be reaped when its collection drops"
4853 );
4854 }
4855
4856 #[test]
4861 fn auto_index_id_disabled_by_config() {
4862 let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4863 let rt = RedDBRuntime::with_options(opts).unwrap();
4864
4865 rt.execute_query("CREATE TABLE off (id INT, score INT)")
4866 .unwrap();
4867 rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4868 .unwrap();
4869
4870 assert!(
4871 rt.index_store_ref()
4872 .find_index_for_column("off", "id")
4873 .is_none(),
4874 "with auto_index_id=false, no implicit index should be created"
4875 );
4876 }
4877
4878 #[test]
4881 fn update_single_row_emits_update_event() {
4882 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4883 rt.execute_query(
4884 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4885 )
4886 .unwrap();
4887 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4888 .unwrap();
4889
4890 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4891 .unwrap();
4892
4893 let events = queue_payloads(&rt, "audit_log");
4894 assert_eq!(events.len(), 1, "expected exactly 1 update event");
4895 let event = events[0].as_object().expect("event payload object");
4896 assert_eq!(
4897 event.get("op").and_then(crate::json::Value::as_str),
4898 Some("update")
4899 );
4900 assert_eq!(
4901 event.get("collection").and_then(crate::json::Value::as_str),
4902 Some("users")
4903 );
4904 assert!(event
4905 .get("event_id")
4906 .and_then(crate::json::Value::as_str)
4907 .is_some_and(|v| !v.is_empty()));
4908 let before = event
4909 .get("before")
4910 .and_then(crate::json::Value::as_object)
4911 .expect("before must be an object");
4912 let after = event
4913 .get("after")
4914 .and_then(crate::json::Value::as_object)
4915 .expect("after must be an object");
4916 assert_eq!(
4917 before.get("name").and_then(crate::json::Value::as_str),
4918 Some("Alice"),
4919 "before.name should be the old value"
4920 );
4921 assert_eq!(
4922 after.get("name").and_then(crate::json::Value::as_str),
4923 Some("Bob"),
4924 "after.name should be the new value"
4925 );
4926 }
4927
4928 #[test]
4929 fn update_event_only_includes_changed_fields() {
4930 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4931 rt.execute_query(
4932 "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4933 )
4934 .unwrap();
4935 rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4936 .unwrap();
4937
4938 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4939 .unwrap();
4940
4941 let events = queue_payloads(&rt, "evts");
4942 assert_eq!(events.len(), 1);
4943 let event = events[0].as_object().unwrap();
4944 let before = event
4945 .get("before")
4946 .and_then(crate::json::Value::as_object)
4947 .unwrap();
4948 let after = event
4949 .get("after")
4950 .and_then(crate::json::Value::as_object)
4951 .unwrap();
4952 assert!(
4954 before.contains_key("name"),
4955 "before must include changed field"
4956 );
4957 assert!(
4958 after.contains_key("name"),
4959 "after must include changed field"
4960 );
4961 assert!(
4963 !before.contains_key("email"),
4964 "before must not include unchanged email"
4965 );
4966 assert!(
4967 !after.contains_key("email"),
4968 "after must not include unchanged email"
4969 );
4970 }
4971
4972 #[test]
4973 fn multi_row_update_emits_one_event_per_row() {
4974 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4975 rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4976 .unwrap();
4977 rt.execute_query(
4978 "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4979 )
4980 .unwrap();
4981
4982 rt.execute_query("UPDATE items SET status = 'done'")
4983 .unwrap();
4984
4985 let events = queue_payloads(&rt, "evts");
4986 assert_eq!(events.len(), 3, "expected one update event per row");
4987 for event in &events {
4988 let obj = event.as_object().unwrap();
4989 assert_eq!(
4990 obj.get("op").and_then(crate::json::Value::as_str),
4991 Some("update")
4992 );
4993 }
4994 }
4995
4996 #[test]
4997 fn delete_single_row_emits_delete_event() {
4998 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4999 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
5000 .unwrap();
5001 rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
5002 .unwrap();
5003
5004 rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
5005
5006 let events = queue_payloads(&rt, "del_log");
5007 assert_eq!(events.len(), 1);
5008 let event = events[0].as_object().expect("event payload object");
5009 assert_eq!(
5010 event.get("op").and_then(crate::json::Value::as_str),
5011 Some("delete")
5012 );
5013 assert_eq!(
5014 event.get("collection").and_then(crate::json::Value::as_str),
5015 Some("users")
5016 );
5017 assert!(event
5018 .get("event_id")
5019 .and_then(crate::json::Value::as_str)
5020 .is_some_and(|v| !v.is_empty()));
5021 let before = event
5022 .get("before")
5023 .and_then(crate::json::Value::as_object)
5024 .expect("before must be an object for delete");
5025 assert_eq!(
5026 before.get("id").and_then(crate::json::Value::as_u64),
5027 Some(42)
5028 );
5029 assert_eq!(
5030 before.get("name").and_then(crate::json::Value::as_str),
5031 Some("Alice")
5032 );
5033 assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
5034 }
5035
5036 #[test]
5037 fn multi_row_delete_emits_one_event_per_row() {
5038 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5039 rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
5040 .unwrap();
5041 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
5042 .unwrap();
5043
5044 rt.execute_query("DELETE FROM items").unwrap();
5045
5046 let events = queue_payloads(&rt, "del_log");
5047 assert_eq!(events.len(), 3, "expected one delete event per deleted row");
5048 for event in &events {
5049 let obj = event.as_object().unwrap();
5050 assert_eq!(
5051 obj.get("op").and_then(crate::json::Value::as_str),
5052 Some("delete")
5053 );
5054 assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
5055 }
5056 }
5057
5058 #[test]
5059 fn ops_filter_update_does_not_emit_on_insert_or_delete() {
5060 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5061 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
5062 .unwrap();
5063
5064 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5065 .unwrap();
5066 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
5067
5068 let events = queue_payloads(&rt, "evts");
5069 assert!(
5070 events.is_empty(),
5071 "UPDATE-only filter must not emit INSERT or DELETE events"
5072 );
5073 }
5074
5075 #[test]
5078 fn suppress_events_on_insert_emits_no_events() {
5079 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5080 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5081 .unwrap();
5082
5083 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5084 .unwrap();
5085
5086 let events = queue_payloads(&rt, "evts");
5087 assert!(
5088 events.is_empty(),
5089 "SUPPRESS EVENTS must prevent INSERT events"
5090 );
5091 }
5092
5093 #[test]
5094 fn suppress_events_on_update_emits_no_events() {
5095 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5096 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5097 .unwrap();
5098 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5099 .unwrap();
5100 let _ = queue_payloads(&rt, "evts");
5102 rt.execute_query("QUEUE PURGE evts").unwrap();
5104
5105 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
5106 .unwrap();
5107
5108 let events = queue_payloads(&rt, "evts");
5109 assert!(
5110 events.is_empty(),
5111 "SUPPRESS EVENTS must prevent UPDATE events"
5112 );
5113 }
5114
5115 #[test]
5116 fn suppress_events_on_delete_emits_no_events() {
5117 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5118 rt.execute_query(
5119 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
5120 )
5121 .unwrap();
5122 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5123 .unwrap();
5124
5125 rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
5126 .unwrap();
5127
5128 let events = queue_payloads(&rt, "evts");
5129 assert!(
5130 events.is_empty(),
5131 "SUPPRESS EVENTS must prevent DELETE events"
5132 );
5133 }
5134
5135 #[test]
5136 fn normal_insert_after_suppress_still_emits() {
5137 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5138 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5139 .unwrap();
5140
5141 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5142 .unwrap();
5143 rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
5144 .unwrap();
5145
5146 let events = queue_payloads(&rt, "evts");
5147 assert_eq!(
5148 events.len(),
5149 1,
5150 "only the non-suppressed INSERT should emit"
5151 );
5152 assert_eq!(
5153 events[0].get("id").and_then(crate::json::Value::as_u64),
5154 Some(2)
5155 );
5156 }
5157}