1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11 CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12 CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13 RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16 build_row_update_contract_plan, entity_row_fields_snapshot,
17 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18 RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27 sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28 sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
43 column: String,
44 expr: Expr,
45 compound_op: Option<BinOp>,
46 metadata_key: Option<&'static str>,
47 row_rule: Option<RowUpdateColumnRule>,
48}
49
50struct CompiledUpdatePlan {
51 static_field_assignments: Vec<(String, Value)>,
52 static_metadata_assignments: Vec<(String, MetadataValue)>,
53 dynamic_assignments: Vec<CompiledUpdateAssignment>,
54 row_contract_plan: Option<RowUpdateContractPlan>,
55 row_modified_columns: Vec<String>,
56 row_touches_unique_columns: bool,
57}
58
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
61 dynamic_field_assignments: Vec<(String, Value)>,
62 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66 pub fn chain_tip_for_collection(
72 &self,
73 collection: &str,
74 ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75 let store = self.inner.db.store();
76 if !crate::runtime::blockchain_kind::is_chain(&*store, collection) {
77 return None;
78 }
79 let mut cache = self.inner.chain_tip_cache.lock();
80 if let Some(existing) = cache.get(collection) {
81 return Some(existing.clone());
82 }
83 let scanned = crate::runtime::blockchain_kind::chain_tip_full(&*store, collection)?;
84 cache.insert(collection.to_string(), scanned.clone());
85 Some(scanned)
86 }
87
88 pub fn verify_chain_for_collection(
95 &self,
96 collection: &str,
97 ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98 let store = self.inner.db.store();
99 let outcome =
100 crate::runtime::blockchain_kind::verify_chain_outcome(&*store, collection)?;
101 if !outcome.ok {
102 crate::runtime::blockchain_kind::persist_integrity_flag(&*store, collection, true);
103 self.inner
104 .chain_integrity_broken
105 .lock()
106 .insert(collection.to_string(), true);
107 }
108 Some(outcome)
109 }
110
111 pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
115 let store = self.inner.db.store();
116 if !crate::runtime::blockchain_kind::is_chain(&*store, collection) {
117 return false;
118 }
119 crate::runtime::blockchain_kind::persist_integrity_flag(&*store, collection, false);
120 self.inner
121 .chain_integrity_broken
122 .lock()
123 .insert(collection.to_string(), false);
124 true
125 }
126
127 fn is_chain_integrity_broken(&self, collection: &str) -> bool {
131 {
132 let cache = self.inner.chain_integrity_broken.lock();
133 if let Some(v) = cache.get(collection) {
134 return *v;
135 }
136 }
137 let store = self.inner.db.store();
138 let persisted =
139 crate::runtime::blockchain_kind::is_integrity_broken_persisted(&*store, collection)
140 .unwrap_or(false);
141 self.inner
142 .chain_integrity_broken
143 .lock()
144 .insert(collection.to_string(), persisted);
145 persisted
146 }
147
148 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
162 let Some(tenant_col) = self.tenant_column(&query.table) else {
163 return Ok(None);
164 };
165 if query
167 .columns
168 .iter()
169 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
170 {
171 return Ok(None);
172 }
173
174 if let Some(dot_pos) = tenant_col.find('.') {
180 let (root, tail) = tenant_col.split_at(dot_pos);
181 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
183 }
184
185 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
186 return Err(RedDBError::Query(format!(
187 "INSERT into tenant-scoped table '{}' requires an active tenant — \
188 run SET TENANT '<id>' first or name column '{}' explicitly",
189 query.table, tenant_col
190 )));
191 };
192
193 let mut augmented = query.clone();
194 augmented.columns.push(tenant_col);
195 let lit = Value::text(tenant_id.clone());
196 for row in augmented.values.iter_mut() {
197 row.push(lit.clone());
198 }
199 for row in augmented.value_exprs.iter_mut() {
200 row.push(crate::storage::query::ast::Expr::Literal {
201 value: lit.clone(),
202 span: crate::storage::query::ast::Span::synthetic(),
203 });
204 }
205 Ok(Some(augmented))
206 }
207
208 fn inject_dotted_tenant(
218 &self,
219 query: &InsertQuery,
220 root: &str,
221 tail: &str,
222 ) -> RedDBResult<Option<InsertQuery>> {
223 let active_tenant = crate::runtime::impl_core::current_tenant();
224 let mut augmented = query.clone();
225 let root_idx = augmented
226 .columns
227 .iter()
228 .position(|c| c.eq_ignore_ascii_case(root));
229
230 if let Some(idx) = root_idx {
231 for row in augmented.values.iter_mut() {
237 let Some(slot) = row.get_mut(idx) else {
238 continue;
239 };
240 if dotted_tail_already_set(slot, tail) {
241 continue;
242 }
243 let Some(tenant_id) = &active_tenant else {
244 return Err(RedDBError::Query(format!(
245 "INSERT into tenant-scoped table '{}' requires an active tenant — \
246 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
247 query.table, root, tail
248 )));
249 };
250 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
251 }
252 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
256 if let Some(slot) = row.get_mut(idx) {
257 let new_value = augmented
258 .values
259 .get(row_idx)
260 .and_then(|v| v.get(idx))
261 .cloned()
262 .unwrap_or(Value::Null);
263 *slot = crate::storage::query::ast::Expr::Literal {
264 value: new_value,
265 span: crate::storage::query::ast::Span::synthetic(),
266 };
267 }
268 }
269 } else {
270 let Some(tenant_id) = &active_tenant else {
274 return Err(RedDBError::Query(format!(
275 "INSERT into tenant-scoped table '{}' requires an active tenant — \
276 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
277 query.table, root, tail
278 )));
279 };
280 augmented.columns.push(root.to_string());
282 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
283 for row in augmented.values.iter_mut() {
284 row.push(fresh.clone());
285 }
286 for row in augmented.value_exprs.iter_mut() {
287 row.push(crate::storage::query::ast::Expr::Literal {
288 value: fresh.clone(),
289 span: crate::storage::query::ast::Span::synthetic(),
290 });
291 }
292 }
293
294 Ok(Some(augmented))
295 }
296
297 fn delete_entities_batch(
300 &self,
301 collection: &str,
302 ids: &[EntityId],
303 ) -> RedDBResult<(u64, Vec<u64>)> {
304 if ids.is_empty() {
305 return Ok((0, vec![]));
306 }
307
308 let store = self.db().store();
309 let Some(manager) = store.get_collection(collection) else {
310 return Ok((0, vec![]));
311 };
312
313 let active_xid = self.current_xid();
314 let conn_id = crate::runtime::impl_core::current_connection_id();
315 let mut autocommit_xid = None;
316 let mut tombstoned_ids = Vec::new();
317 let mut tombstoned_entities = Vec::new();
318 let mut physical_delete_ids = Vec::new();
319 let table_row_resolver =
320 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
321
322 for &id in ids {
323 let Some(mut entity) = manager.get(id) else {
324 continue;
325 };
326 if matches!(entity.data, EntityData::Row(_)) {
327 let previous_xmax = entity.xmax;
328 if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
329 if table_row_resolver.resolve_candidate(&entity).is_none() {
330 continue;
331 }
332 } else if entity.xmax != 0 {
333 continue;
334 }
335
336 let xid = match active_xid {
337 Some(xid) => xid,
338 None => match autocommit_xid {
339 Some(xid) => xid,
340 None => {
341 let mgr = self.snapshot_manager();
342 let xid = mgr.begin();
343 autocommit_xid = Some(xid);
344 xid
345 }
346 },
347 };
348 entity.set_xmax(xid);
349 if manager.update(entity.clone()).is_ok() {
350 if active_xid.is_some() {
351 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
352 }
353 tombstoned_entities.push(entity);
354 tombstoned_ids.push(id);
355 }
356 } else {
357 physical_delete_ids.push(id);
358 }
359 }
360
361 if let Some(xid) = autocommit_xid {
362 self.snapshot_manager().commit(xid);
363 }
364
365 let mut affected = tombstoned_ids.len() as u64;
366 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
367 if active_xid.is_some() {
368 store
369 .persist_entities_to_pager(collection, &tombstoned_entities)
370 .map_err(|err| RedDBError::Internal(err.to_string()))?;
371 } else {
372 store
373 .persist_entities_to_pager(collection, &tombstoned_entities)
374 .map_err(|err| RedDBError::Internal(err.to_string()))?;
375 for id in &tombstoned_ids {
376 store.context_index().remove_entity(*id);
377 let lsn = self.cdc_emit(
378 crate::replication::cdc::ChangeOperation::Delete,
379 collection,
380 id.raw(),
381 "entity",
382 );
383 lsns.push(lsn);
384 }
385 }
386
387 let deleted_ids = store
388 .delete_batch(collection, &physical_delete_ids)
389 .map_err(|err| RedDBError::Internal(err.to_string()))?;
390 affected += deleted_ids.len() as u64;
391 for id in &deleted_ids {
392 store.context_index().remove_entity(*id);
393 let lsn = self.cdc_emit(
394 crate::replication::cdc::ChangeOperation::Delete,
395 collection,
396 id.raw(),
397 "entity",
398 );
399 lsns.push(lsn);
400 }
401
402 Ok((affected, lsns))
403 }
404
405 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
408 if applied.is_empty() {
409 return Ok(Vec::new());
410 }
411
412 let store = self.db().store();
413 if applied.iter().any(|item| item.context_index_dirty) {
414 store.context_index().index_entities(
415 &applied[0].collection,
416 applied
417 .iter()
418 .filter(|item| item.context_index_dirty)
419 .map(|item| &item.entity),
420 );
421 }
422
423 for item in applied {
424 self.refresh_update_secondary_indexes(item)?;
425 }
426
427 let mut lsns = Vec::with_capacity(applied.len());
428 for item in applied {
429 let lsn = self.cdc_emit_prebuilt(
430 crate::replication::cdc::ChangeOperation::Update,
431 &item.collection,
432 &item.entity,
433 update_cdc_item_kind(self, &item.collection, &item.entity),
434 item.metadata.as_ref(),
435 false,
436 );
437 lsns.push(lsn);
438 }
439 Ok(lsns)
440 }
441
442 fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
443 self.persist_applied_entity_mutations(applied)
444 }
445
446 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
447 if applied.pre_mutation_fields.is_empty() {
448 return Ok(());
449 }
450 let post = entity_row_fields_snapshot(&applied.entity);
451 if post.is_empty() {
452 return Ok(());
453 }
454
455 let indexed_cols = self
456 .index_store_ref()
457 .indexed_columns_set(&applied.collection);
458 if indexed_cols.is_empty() {
459 return Ok(());
460 }
461
462 if let Some(old_version) = applied.replaced_entity.as_ref() {
463 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
464 .pre_mutation_fields
465 .iter()
466 .filter(|(col, _)| indexed_cols.contains(col))
467 .cloned()
468 .collect();
469 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
470 .iter()
471 .filter(|(col, _)| indexed_cols.contains(col))
472 .cloned()
473 .collect();
474 if !old_index_fields.is_empty() {
475 self.index_store_ref()
476 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
477 .map_err(crate::RedDBError::Internal)?;
478 }
479 if !new_index_fields.is_empty() {
480 self.index_store_ref()
481 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
482 .map_err(crate::RedDBError::Internal)?;
483 }
484 return Ok(());
485 }
486
487 let damage =
488 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
489 if damage
490 .touched_columns()
491 .into_iter()
492 .any(|col| indexed_cols.contains(col))
493 {
494 self.index_store_ref()
495 .index_entity_update(
496 &applied.collection,
497 applied.id,
498 &applied.pre_mutation_fields,
499 &post,
500 )
501 .map_err(crate::RedDBError::Internal)?;
502 }
503 Ok(())
504 }
505
506 pub fn execute_insert(
511 &self,
512 raw_query: &str,
513 query: &InsertQuery,
514 ) -> RedDBResult<RuntimeQueryResult> {
515 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
516 crate::runtime::collection_contract::CollectionContractGate::check(
522 self,
523 &query.table,
524 crate::runtime::collection_contract::MutationKind::Insert,
525 )?;
526 let augmented_owned;
535 let query = match self.maybe_inject_tenant_column(query)? {
536 Some(new_q) => {
537 augmented_owned = new_q;
538 &augmented_owned
539 }
540 None => query,
541 };
542 self.check_insert_column_policy(query)?;
543
544 let mut inserted_count: u64 = 0;
545 let effective_rows =
546 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
547
548 let store = self.inner.db.store();
550 let _ = store.get_or_create_collection(&query.table);
551 let declared_model = self
552 .db()
553 .collection_contract_arc(&query.table)
554 .map(|contract| contract.declared_model);
555
556 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
557 if query.returning.is_some() {
558 Some(Vec::with_capacity(effective_rows.len()))
559 } else {
560 None
561 };
562 let mut returning_result: Option<UnifiedResult> = None;
563
564 if matches!(query.entity_type, InsertEntityType::Row)
565 && !matches!(
566 declared_model,
567 Some(crate::catalog::CollectionModel::TimeSeries)
568 )
569 {
570 let chain_mode = crate::runtime::blockchain_kind::is_chain(&*store, &query.table);
581 let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
582 Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
583 } else {
584 None
585 };
586 let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
587
588 if chain_mode && self.is_chain_integrity_broken(&query.table) {
591 return Err(RedDBError::InvalidOperation(format!(
592 "ChainIntegrityBroken: collection '{}' is locked until \
593 POST /collections/{}/clear-integrity-flag is called by an admin",
594 query.table, query.table
595 )));
596 }
597
598 let mut chain_tip_full: Option<
602 crate::runtime::blockchain_kind::ChainTipFull,
603 > = if chain_mode {
604 let mut cache = self.inner.chain_tip_cache.lock();
605 if let Some(existing) = cache.get(&query.table) {
606 Some(existing.clone())
607 } else if let Some(scanned) =
608 crate::runtime::blockchain_kind::chain_tip_full(&*store, &query.table)
609 {
610 cache.insert(query.table.clone(), scanned.clone());
611 Some(scanned)
612 } else {
613 None
614 }
615 } else {
616 None
617 };
618
619 let mut rows = Vec::with_capacity(effective_rows.len());
620 for row_values in &effective_rows {
621 if row_values.len() != query.columns.len() {
622 return Err(RedDBError::Query(format!(
623 "INSERT column count ({}) does not match value count ({})",
624 query.columns.len(),
625 row_values.len()
626 )));
627 }
628 let (mut fields, mut metadata) =
629 split_insert_metadata(self, &query.columns, row_values)?;
630 if chain_mode {
631 use crate::runtime::blockchain_kind::{
632 chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
633 COL_TIMESTAMP, RESERVED_COLUMNS,
634 };
635 let supplied_height = fields
636 .iter()
637 .find(|(k, _)| k == COL_BLOCK_HEIGHT)
638 .map(|(_, v)| v.clone());
639 let supplied_prev = fields
640 .iter()
641 .find(|(k, _)| k == COL_PREV_HASH)
642 .map(|(_, v)| v.clone());
643 let supplied_ts = fields
644 .iter()
645 .find(|(k, _)| k == COL_TIMESTAMP)
646 .map(|(_, v)| v.clone());
647 let supplied_hash =
648 fields.iter().any(|(k, _)| k == COL_HASH);
649 let user_supplied_any = supplied_height.is_some()
650 || supplied_prev.is_some()
651 || supplied_ts.is_some()
652 || supplied_hash;
653
654 fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
655 let payload =
656 crate::runtime::blockchain_kind::canonical_payload(&fields);
657
658 let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
659 Some(t) => (t.hash, t.height + 1),
660 None => (
661 crate::storage::blockchain::GENESIS_PREV_HASH,
662 0u64,
663 ),
664 };
665 let server_now = crate::runtime::blockchain_kind::now_ms();
666
667 let (use_prev, use_height, use_ts) = if user_supplied_any {
668 if supplied_hash {
671 return Err(chain_conflict_error(
672 tip_next_height.saturating_sub(1),
673 tip_prev_hash,
674 chain_tip_full
675 .as_ref()
676 .map(|t| t.timestamp_ms)
677 .unwrap_or(0),
678 server_now,
679 "hash column is engine-computed and cannot be supplied",
680 ));
681 }
682 let caller_prev = match &supplied_prev {
683 Some(Value::Blob(b)) if b.len() == 32 => {
684 let mut a = [0u8; 32];
685 a.copy_from_slice(b);
686 a
687 }
688 Some(Value::Text(s)) if s.len() == 64 => {
689 let mut a = [0u8; 32];
693 let mut ok = true;
694 for i in 0..32 {
695 let pair = &s.as_ref()[i * 2..i * 2 + 2];
696 match u8::from_str_radix(pair, 16) {
697 Ok(byte) => a[i] = byte,
698 Err(_) => {
699 ok = false;
700 break;
701 }
702 }
703 }
704 if !ok {
705 return Err(chain_conflict_error(
706 tip_next_height.saturating_sub(1),
707 tip_prev_hash,
708 chain_tip_full
709 .as_ref()
710 .map(|t| t.timestamp_ms)
711 .unwrap_or(0),
712 server_now,
713 "prev_hash is not valid hex",
714 ));
715 }
716 a
717 }
718 _ => {
719 return Err(chain_conflict_error(
720 tip_next_height.saturating_sub(1),
721 tip_prev_hash,
722 chain_tip_full
723 .as_ref()
724 .map(|t| t.timestamp_ms)
725 .unwrap_or(0),
726 server_now,
727 "prev_hash missing or not a 32-byte Blob",
728 ));
729 }
730 };
731 if caller_prev != tip_prev_hash {
732 return Err(chain_conflict_error(
733 tip_next_height.saturating_sub(1),
734 tip_prev_hash,
735 chain_tip_full
736 .as_ref()
737 .map(|t| t.timestamp_ms)
738 .unwrap_or(0),
739 server_now,
740 "prev_hash does not match current tip",
741 ));
742 }
743 let caller_height = match &supplied_height {
744 Some(Value::UnsignedInteger(v)) => *v,
745 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
746 _ => {
747 return Err(chain_conflict_error(
748 tip_next_height.saturating_sub(1),
749 tip_prev_hash,
750 chain_tip_full
751 .as_ref()
752 .map(|t| t.timestamp_ms)
753 .unwrap_or(0),
754 server_now,
755 "block_height missing or not an unsigned integer",
756 ));
757 }
758 };
759 if caller_height != tip_next_height {
760 return Err(chain_conflict_error(
761 tip_next_height.saturating_sub(1),
762 tip_prev_hash,
763 chain_tip_full
764 .as_ref()
765 .map(|t| t.timestamp_ms)
766 .unwrap_or(0),
767 server_now,
768 "block_height does not match tip+1",
769 ));
770 }
771 let caller_ts = match &supplied_ts {
772 Some(Value::UnsignedInteger(v)) => *v,
773 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
774 _ => {
775 return Err(chain_conflict_error(
776 tip_next_height.saturating_sub(1),
777 tip_prev_hash,
778 chain_tip_full
779 .as_ref()
780 .map(|t| t.timestamp_ms)
781 .unwrap_or(0),
782 server_now,
783 "timestamp missing or not an unsigned integer",
784 ));
785 }
786 };
787 let drift = (caller_ts as i128) - (server_now as i128);
788 if drift.abs() > 60_000 {
789 return Err(chain_conflict_error(
790 tip_next_height.saturating_sub(1),
791 tip_prev_hash,
792 chain_tip_full
793 .as_ref()
794 .map(|t| t.timestamp_ms)
795 .unwrap_or(0),
796 server_now,
797 "timestamp outside ±60s of server_time",
798 ));
799 }
800 (caller_prev, caller_height, caller_ts)
801 } else {
802 (tip_prev_hash, tip_next_height, server_now)
803 };
804
805 let (reserved, new_hash) =
806 crate::runtime::blockchain_kind::make_block_reserved_fields(
807 use_prev, use_height, use_ts, &payload,
808 );
809 fields.extend(reserved);
810 chain_tip_full =
811 Some(crate::runtime::blockchain_kind::ChainTipFull {
812 height: use_height,
813 hash: new_hash,
814 timestamp_ms: use_ts,
815 });
816 }
817 if crate::runtime::signed_writes_kind::is_signed(&*store, &query.table)
829 {
830 let (pk_col, sig_col, residual) =
831 crate::runtime::signed_writes_kind::split_signature_fields(fields);
832 let payload =
833 crate::runtime::blockchain_kind::canonical_payload(&residual);
834 let reg = crate::runtime::signed_writes_kind::registry(
835 &*store,
836 &query.table,
837 );
838 crate::runtime::signed_writes_kind::verify_row(
839 ®,
840 pk_col.as_ref().map(|c| c.bytes.as_slice()),
841 sig_col.as_ref().map(|c| c.bytes.as_slice()),
842 &payload,
843 )
844 .map_err(crate::runtime::signed_writes_kind::map_error)?;
845 fields = residual;
846 if let Some(col) = pk_col {
851 fields.push((
852 crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL
853 .to_string(),
854 col.raw_value,
855 ));
856 }
857 if let Some(col) = sig_col {
858 fields.push((
859 crate::storage::signed_writes::RESERVED_SIGNATURE_COL
860 .to_string(),
861 col.raw_value,
862 ));
863 }
864 }
865 merge_with_clauses(
866 &mut metadata,
867 query.ttl_ms,
868 query.expires_at_ms,
869 &query.with_metadata,
870 );
871 if let Some(snaps) = returning_snapshots.as_mut() {
872 snaps.push(fields.clone());
873 }
874 rows.push(CreateRowInput {
875 collection: query.table.clone(),
876 fields,
877 metadata,
878 node_links: Vec::new(),
879 vector_links: Vec::new(),
880 });
881 }
882 let outputs = self.create_rows_batch(CreateRowsBatchInput {
883 collection: query.table.clone(),
884 rows,
885 suppress_events: query.suppress_events,
886 })?;
887 inserted_count = outputs.len() as u64;
888
889 if chain_mode {
893 if let Some(new_tip) = chain_tip_full.as_ref() {
894 self.inner
895 .chain_tip_cache
896 .lock()
897 .insert(query.table.clone(), new_tip.clone());
898 }
899 }
900
901 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
909 let time_col = &spec.time_column;
910 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
912 for row in &effective_rows {
913 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
914 if *n >= 0 {
915 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
916 }
917 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
918 let _ = self.inner.db.hypertables().route(&query.table, *n);
919 }
920 }
921 }
922 }
923
924 if let (Some(items), Some(snaps)) =
925 (query.returning.as_ref(), returning_snapshots.take())
926 {
927 let snaps = row_insert_returning_snapshots(&outputs, snaps);
928 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
929 }
930 } else {
931 let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
938 Vec::with_capacity(effective_rows.len());
939 let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
940 {
941 Vec::with_capacity(effective_rows.len())
942 } else {
943 Vec::new()
944 };
945 if matches!(
946 query.entity_type,
947 InsertEntityType::Node | InsertEntityType::Edge
948 ) {
949 enum PreparedGraphInsert {
950 Node {
951 fields: Vec<(String, Value)>,
952 input: CreateNodeInput,
953 },
954 Edge {
955 fields: Vec<(String, Value)>,
956 input: CreateEdgeInput,
957 },
958 }
959
960 let mut prepared = Vec::with_capacity(effective_rows.len());
961 for row_values in &effective_rows {
962 if row_values.len() != query.columns.len() {
963 return Err(RedDBError::Query(format!(
964 "INSERT column count ({}) does not match value count ({})",
965 query.columns.len(),
966 row_values.len()
967 )));
968 }
969
970 match query.entity_type {
971 InsertEntityType::Node => {
972 let (node_values, mut metadata) =
973 split_insert_metadata(self, &query.columns, row_values)?;
974 merge_with_clauses(
975 &mut metadata,
976 query.ttl_ms,
977 query.expires_at_ms,
978 &query.with_metadata,
979 );
980 ensure_non_tree_reserved_metadata_entries(&metadata)?;
981 apply_collection_default_ttl_metadata(
982 self,
983 &query.table,
984 &mut metadata,
985 );
986 let (columns, values) = pairwise_columns_values(&node_values);
987 let label = find_column_value_string(&columns, &values, "label")?;
988 let node_type =
989 find_column_value_opt_string(&columns, &values, "node_type");
990 let properties = extract_remaining_properties(
991 &columns,
992 &values,
993 &["label", "node_type"],
994 );
995 crate::reserved_fields::ensure_no_reserved_public_item_fields(
996 properties.iter().map(|(key, _)| key.as_str()),
997 &format!("node '{}'", query.table),
998 )?;
999 prepared.push(PreparedGraphInsert::Node {
1000 fields: node_values,
1001 input: CreateNodeInput {
1002 collection: query.table.clone(),
1003 label,
1004 node_type,
1005 properties,
1006 metadata,
1007 embeddings: Vec::new(),
1008 table_links: Vec::new(),
1009 node_links: Vec::new(),
1010 },
1011 });
1012 }
1013 InsertEntityType::Edge => {
1014 let (edge_values, mut metadata) =
1015 split_insert_metadata(self, &query.columns, row_values)?;
1016 merge_with_clauses(
1017 &mut metadata,
1018 query.ttl_ms,
1019 query.expires_at_ms,
1020 &query.with_metadata,
1021 );
1022 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1023 apply_collection_default_ttl_metadata(
1024 self,
1025 &query.table,
1026 &mut metadata,
1027 );
1028 let (columns, values) = pairwise_columns_values(&edge_values);
1029 let label = find_column_value_string(&columns, &values, "label")?;
1030 ensure_non_tree_structural_edge_label(&label)?;
1031 let from_id = resolve_edge_endpoint_any(
1032 self.inner.db.store().as_ref(),
1033 &query.table,
1034 &columns,
1035 &values,
1036 &["from_rid", "from"],
1037 )?;
1038 let to_id = resolve_edge_endpoint_any(
1039 self.inner.db.store().as_ref(),
1040 &query.table,
1041 &columns,
1042 &values,
1043 &["to_rid", "to"],
1044 )?;
1045 let weight = find_column_value_f32_opt(&columns, &values, "weight");
1046 let properties = extract_remaining_properties(
1047 &columns,
1048 &values,
1049 &["label", "from_rid", "to_rid", "from", "to", "weight"],
1050 );
1051 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1052 properties.iter().map(|(key, _)| key.as_str()),
1053 &format!("edge '{}'", query.table),
1054 )?;
1055 prepared.push(PreparedGraphInsert::Edge {
1056 fields: edge_values,
1057 input: CreateEdgeInput {
1058 collection: query.table.clone(),
1059 label,
1060 from: EntityId::new(from_id),
1061 to: EntityId::new(to_id),
1062 weight,
1063 properties,
1064 metadata,
1065 },
1066 });
1067 }
1068 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1069 }
1070 }
1071
1072 ensure_graph_insert_contract(self, &query.table)?;
1073 let mut batch = self.inner.db.batch();
1074 for item in prepared {
1075 match item {
1076 PreparedGraphInsert::Node { fields, input } => {
1077 if query.returning.is_some() {
1078 returning_field_snaps.push(fields);
1079 }
1080 let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1081 batch = batch.add_node_with_type(
1082 input.collection,
1083 input.label,
1084 node_type,
1085 input.properties.into_iter().collect(),
1086 input.metadata.into_iter().collect(),
1087 );
1088 }
1089 PreparedGraphInsert::Edge { fields, input } => {
1090 if query.returning.is_some() {
1091 returning_field_snaps.push(fields);
1092 }
1093 batch = batch.add_edge(
1094 input.collection,
1095 input.label,
1096 input.from,
1097 input.to,
1098 input.weight.unwrap_or(1.0),
1099 input.properties.into_iter().collect(),
1100 input.metadata.into_iter().collect(),
1101 );
1102 }
1103 }
1104 }
1105 let batch_result = batch
1106 .execute()
1107 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1108 let (ids, entity_kind) = match query.entity_type {
1109 InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1110 InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1111 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1112 };
1113 for id in &ids {
1114 self.stamp_xmin_if_in_txn(&query.table, *id);
1115 }
1116 if query.returning.is_some() {
1117 returning_field_snaps = graph_insert_returning_snapshots(
1118 self.inner.db.store().as_ref(),
1119 &query.table,
1120 &ids,
1121 );
1122 }
1123 self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1124 let store = self.inner.db.store();
1125 entity_outputs.extend(ids.iter().map(|id| {
1126 crate::application::entity::CreateEntityOutput {
1127 id: *id,
1128 entity: store.get(&query.table, *id),
1129 }
1130 }));
1131 inserted_count = ids.len() as u64;
1132 } else {
1133 for row_values in &effective_rows {
1134 if row_values.len() != query.columns.len() {
1135 return Err(RedDBError::Query(format!(
1136 "INSERT column count ({}) does not match value count ({})",
1137 query.columns.len(),
1138 row_values.len()
1139 )));
1140 }
1141
1142 match query.entity_type {
1143 InsertEntityType::Row => {
1144 if query.returning.is_some() {
1145 return Err(RedDBError::Query(
1146 "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1147 .to_string(),
1148 ));
1149 }
1150 let (fields, mut metadata) =
1151 split_insert_metadata(self, &query.columns, row_values)?;
1152 merge_with_clauses(
1153 &mut metadata,
1154 query.ttl_ms,
1155 query.expires_at_ms,
1156 &query.with_metadata,
1157 );
1158 self.insert_timeseries_point(&query.table, fields, metadata)?;
1159 }
1160 InsertEntityType::Node | InsertEntityType::Edge => {
1161 unreachable!("NODE and EDGE are handled by the prepared graph path")
1162 }
1163 InsertEntityType::Vector => {
1164 let (vector_values, mut metadata) =
1165 split_insert_metadata(self, &query.columns, row_values)?;
1166 merge_with_clauses(
1167 &mut metadata,
1168 query.ttl_ms,
1169 query.expires_at_ms,
1170 &query.with_metadata,
1171 );
1172 let (columns, values) = pairwise_columns_values(&vector_values);
1173 let dense = find_column_value_vec_f32_any(
1174 &columns,
1175 &values,
1176 &["dense", "embedding"],
1177 )?;
1178 merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1179 let content =
1180 find_column_value_opt_string(&columns, &values, "content");
1181 if query.returning.is_some() {
1182 returning_field_snaps.push(vector_values.clone());
1183 }
1184 let input = CreateVectorInput {
1185 collection: query.table.clone(),
1186 dense,
1187 content,
1188 metadata,
1189 link_row: None,
1190 link_node: None,
1191 };
1192 entity_outputs.push(self.create_vector(input)?);
1193 }
1194 InsertEntityType::Document => {
1195 let (document_values, mut metadata) =
1196 split_insert_metadata(self, &query.columns, row_values)?;
1197 merge_with_clauses(
1198 &mut metadata,
1199 query.ttl_ms,
1200 query.expires_at_ms,
1201 &query.with_metadata,
1202 );
1203 let (columns, values) = pairwise_columns_values(&document_values);
1204 let body_str = find_column_value_string(&columns, &values, "body")?;
1205 let body: crate::json::Value = crate::json::from_str(&body_str)
1206 .map_err(|e| {
1207 RedDBError::Query(format!("invalid JSON body: {e}"))
1208 })?;
1209 let input = CreateDocumentInput {
1210 collection: query.table.clone(),
1211 body,
1212 metadata,
1213 node_links: Vec::new(),
1214 vector_links: Vec::new(),
1215 };
1216 let output = self.create_document(input)?;
1217 if query.returning.is_some() {
1218 let fields = output
1219 .entity
1220 .as_ref()
1221 .map(entity_row_fields_snapshot)
1222 .filter(|fields| !fields.is_empty())
1223 .unwrap_or(document_values);
1224 returning_field_snaps.push(fields);
1225 }
1226 entity_outputs.push(output);
1227 }
1228 InsertEntityType::Kv => {
1229 let (kv_values, mut metadata) =
1230 split_insert_metadata(self, &query.columns, row_values)?;
1231 merge_with_clauses(
1232 &mut metadata,
1233 query.ttl_ms,
1234 query.expires_at_ms,
1235 &query.with_metadata,
1236 );
1237 let (columns, values) = pairwise_columns_values(&kv_values);
1238 let key = find_column_value_string(&columns, &values, "key")?;
1239 let value = find_column_value(&columns, &values, "value")?;
1240 if query.returning.is_some() {
1241 returning_field_snaps.push(kv_values.clone());
1242 }
1243 let input = CreateKvInput {
1244 collection: query.table.clone(),
1245 key,
1246 value,
1247 metadata,
1248 };
1249 entity_outputs.push(self.create_kv(input)?);
1250 }
1251 }
1252
1253 inserted_count += 1;
1254 }
1255 }
1256
1257 if let Some(items) = query.returning.as_ref() {
1258 if !entity_outputs.is_empty() {
1259 returning_result = Some(build_returning_result(
1260 items,
1261 &returning_field_snaps,
1262 Some(&entity_outputs),
1263 ));
1264 }
1265 }
1266 }
1267
1268 if let Some(ref embed_config) = query.auto_embed {
1270 let store = self.inner.db.store();
1271 let provider = crate::ai::parse_provider(&embed_config.provider)?;
1272 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
1273 let model = embed_config.model.clone().unwrap_or_else(|| {
1274 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1275 .ok()
1276 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1277 });
1278
1279 let manager = store
1281 .get_collection(&query.table)
1282 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1283 let entities = manager.query_all(|_| true);
1284 let recent: Vec<_> = entities
1285 .into_iter()
1286 .rev()
1287 .take(effective_rows.len())
1288 .collect();
1289
1290 let entity_combos: Vec<(usize, String)> = recent
1292 .iter()
1293 .enumerate()
1294 .filter_map(|(i, entity)| {
1295 if let EntityData::Row(ref row) = entity.data {
1296 if let Some(ref named) = row.named {
1297 let texts: Vec<String> = embed_config
1298 .fields
1299 .iter()
1300 .filter_map(|field| match named.get(field) {
1301 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1302 _ => None,
1303 })
1304 .collect();
1305 if !texts.is_empty() {
1306 return Some((i, texts.join(" ")));
1307 }
1308 }
1309 }
1310 None
1311 })
1312 .collect();
1313
1314 if !entity_combos.is_empty() {
1315 let batch_texts: Vec<String> =
1317 entity_combos.iter().map(|(_, t)| t.clone()).collect();
1318
1319 let batch_client =
1320 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1321
1322 let embeddings = match tokio::runtime::Handle::try_current() {
1323 Ok(handle) => tokio::task::block_in_place(|| {
1324 handle.block_on(batch_client.embed_batch(
1325 &provider,
1326 &model,
1327 &api_key,
1328 batch_texts,
1329 ))
1330 }),
1331 Err(_) => {
1332 return Err(RedDBError::Query(
1333 "AUTO EMBED requires a Tokio runtime context".to_string(),
1334 ));
1335 }
1336 }
1337 .map_err(|e| RedDBError::Query(e.to_string()))?;
1338
1339 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1341 if dense.is_empty() {
1342 continue;
1343 }
1344 self.create_vector(CreateVectorInput {
1345 collection: query.table.clone(),
1346 dense,
1347 content: Some(combined.clone()),
1348 metadata: Vec::new(),
1349 link_row: None,
1350 link_node: None,
1351 })?;
1352 }
1353 }
1354 }
1355
1356 if inserted_count > 0 {
1357 self.note_table_write(&query.table);
1358 }
1359
1360 let mut result = RuntimeQueryResult::dml_result(
1361 raw_query.to_string(),
1362 inserted_count,
1363 "insert",
1364 "runtime-dml",
1365 );
1366 if let Some(returning) = returning_result {
1367 result.result = returning;
1368 }
1369 Ok(result)
1370 }
1371
1372 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1373 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1374 return Ok(());
1375 };
1376 if !auth_store.iam_authorization_enabled() {
1377 return Ok(());
1378 }
1379 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1380 return Ok(());
1381 };
1382
1383 let tenant = crate::runtime::impl_core::current_tenant();
1384 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1385 let request = crate::auth::ColumnAccessRequest {
1386 action: "insert".to_string(),
1387 schema: None,
1388 table: query.table.clone(),
1389 columns: query.columns.clone(),
1390 };
1391 let ctx = crate::auth::policies::EvalContext {
1392 principal_tenant: tenant.clone(),
1393 current_tenant: tenant,
1394 peer_ip: None,
1395 mfa_present: false,
1396 now_ms: crate::auth::now_ms(),
1397 principal_is_admin_role: role == crate::auth::Role::Admin,
1398 };
1399
1400 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1401 let table_allowed = matches!(
1402 outcome.table_decision,
1403 crate::auth::policies::Decision::Allow { .. }
1404 | crate::auth::policies::Decision::AdminBypass
1405 );
1406 if !table_allowed {
1407 return Err(RedDBError::Query(format!(
1408 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1409 outcome.table_resource.kind, outcome.table_resource.name
1410 )));
1411 }
1412 if let Some(denied) = outcome.first_denied_column() {
1413 return Err(RedDBError::Query(format!(
1414 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1415 denied.resource.kind, denied.resource.name
1416 )));
1417 }
1418
1419 Ok(())
1420 }
1421
1422 pub(crate) fn insert_timeseries_point(
1423 &self,
1424 collection: &str,
1425 fields: Vec<(String, Value)>,
1426 mut metadata: Vec<(String, MetadataValue)>,
1427 ) -> RedDBResult<EntityId> {
1428 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1429
1430 let (columns, values) = pairwise_columns_values(&fields);
1431 validate_timeseries_insert_columns(&columns)?;
1432
1433 let event_name_opt =
1442 find_column_value_opt_string(&columns, &values, "event_name");
1443 let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1444 if let Some(event_name) = event_name_opt.as_deref() {
1445 let store_for_schema = self.inner.db.store();
1446 if super::analytics_schema_registry::latest(
1447 store_for_schema.as_ref(),
1448 event_name,
1449 )
1450 .is_some()
1451 {
1452 let payload_json = payload_opt.as_deref().unwrap_or("{}");
1453 super::analytics_schema_registry::validate(
1454 store_for_schema.as_ref(),
1455 event_name,
1456 payload_json,
1457 )
1458 .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1459 }
1460 }
1461
1462 let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1467 Some(m) => m,
1468 None => event_name_opt.clone().ok_or_else(|| {
1469 RedDBError::Query(
1470 "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1471 )
1472 })?,
1473 };
1474 let value = match find_column_value_opt_string(&columns, &values, "value") {
1478 Some(s) => s.parse::<f64>().unwrap_or(1.0),
1479 None => columns
1480 .iter()
1481 .position(|c| c.eq_ignore_ascii_case("value"))
1482 .and_then(|i| match &values[i] {
1483 Value::Float(f) => Some(*f),
1484 Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1485 Value::UnsignedInteger(n) => Some(*n as f64),
1486 _ => None,
1487 })
1488 .unwrap_or(1.0),
1489 };
1490 let timestamp_ns =
1491 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1492 let mut tags = find_timeseries_tags(&columns, &values)?;
1493 if let Some(ref name) = event_name_opt {
1494 tags.entry("event_name".to_string())
1495 .or_insert_with(|| name.clone());
1496 }
1497 if let Some(ref payload) = payload_opt {
1498 tags.entry("payload".to_string())
1499 .or_insert_with(|| payload.clone());
1500 }
1501
1502 let mut entity = UnifiedEntity::new(
1503 EntityId::new(0),
1504 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1505 series: collection.to_string(),
1506 metric: metric.clone(),
1507 })),
1508 EntityData::TimeSeries(crate::storage::TimeSeriesData {
1509 metric,
1510 timestamp_ns,
1511 value,
1512 tags,
1513 }),
1514 );
1515 let writer_xid = match self.current_xid() {
1519 Some(xid) => xid,
1520 None => {
1521 let mgr = self.snapshot_manager();
1522 let xid = mgr.begin();
1523 mgr.commit(xid);
1524 xid
1525 }
1526 };
1527 entity.set_xmin(writer_xid);
1528
1529 let store = self.inner.db.store();
1530 let id = store
1531 .insert_auto(collection, entity)
1532 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1533
1534 if !metadata.is_empty() {
1535 let _ = store.set_metadata(
1536 collection,
1537 id,
1538 Metadata::with_fields(metadata.into_iter().collect()),
1539 );
1540 }
1541
1542 self.cdc_emit(
1543 crate::replication::cdc::ChangeOperation::Insert,
1544 collection,
1545 id.raw(),
1546 "timeseries",
1547 );
1548
1549 Ok(id)
1550 }
1551
1552 pub fn execute_update(
1557 &self,
1558 raw_query: &str,
1559 query: &UpdateQuery,
1560 ) -> RedDBResult<RuntimeQueryResult> {
1561 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1562 if crate::runtime::blockchain_kind::is_chain(
1566 self.inner.db.store().as_ref(),
1567 &query.table,
1568 ) {
1569 return Err(RedDBError::InvalidOperation(format!(
1570 "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1571 query.table
1572 )));
1573 }
1574 crate::runtime::collection_contract::CollectionContractGate::check(
1580 self,
1581 &query.table,
1582 crate::runtime::collection_contract::MutationKind::Update,
1583 )?;
1584 ensure_update_target_contract(self, &query.table, query.target)?;
1585 ensure_graph_identity_update_target_allowed(query)?;
1586
1587 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1593 let augmented_query: UpdateQuery;
1594 let effective_query: &UpdateQuery = if rls_gated {
1595 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1596 self,
1597 &query.table,
1598 crate::storage::query::ast::PolicyAction::Update,
1599 );
1600 let Some(policy) = rls_filter else {
1601 let mut response = RuntimeQueryResult::dml_result(
1604 raw_query.to_string(),
1605 0,
1606 "update",
1607 "runtime-dml-rls",
1608 );
1609 if let Some(items) = query.returning.clone() {
1610 response.result = build_returning_result(&items, &[], None);
1611 }
1612 return Ok(response);
1613 };
1614 let mut augmented = query.clone();
1615 augmented.filter = Some(match augmented.filter.take() {
1616 Some(existing) => {
1617 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1618 }
1619 None => policy,
1620 });
1621 augmented_query = augmented;
1622 &augmented_query
1623 } else {
1624 query
1625 };
1626
1627 if let Some(items) = effective_query.returning.clone() {
1632 let mut inner_query = effective_query.clone();
1633 inner_query.returning = None;
1634 let (mut response, touched_ids) =
1635 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1636
1637 let snapshots = if matches!(
1638 effective_query.target,
1639 UpdateTarget::Nodes | UpdateTarget::Edges
1640 ) {
1641 graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1642 } else {
1643 super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1644 .row_snapshots(&touched_ids)
1645 };
1646
1647 response.result = build_returning_result(&items, &snapshots, None);
1648 response.engine = "runtime-dml-returning";
1649 return Ok(response);
1650 }
1651
1652 self.execute_update_inner(raw_query, effective_query)
1653 }
1654
1655 fn execute_update_inner(
1657 &self,
1658 raw_query: &str,
1659 query: &UpdateQuery,
1660 ) -> RedDBResult<RuntimeQueryResult> {
1661 self.execute_update_inner_tracked(raw_query, query)
1662 .map(|(res, _)| res)
1663 }
1664
1665 fn execute_update_inner_tracked(
1666 &self,
1667 raw_query: &str,
1668 query: &UpdateQuery,
1669 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1670 let store = self.inner.db.store();
1671 let effective_filter = effective_update_filter(query);
1672 let compiled_plan = self.compile_update_plan(query)?;
1673 let mut touched_ids: Vec<EntityId> = Vec::new();
1674 let limit_cap = query.limit.map(|l| l as usize);
1675 let manager = store
1676 .get_collection(&query.table)
1677 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1678 let scan_limit = if query.order_by.is_empty() {
1679 limit_cap
1680 } else {
1681 None
1682 };
1683 let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1684 self,
1685 &query.table,
1686 effective_filter.as_ref(),
1687 scan_limit,
1688 query.target,
1689 )
1690 .find_target_ids()?;
1691 let ids_to_update = if query.order_by.is_empty() {
1692 ids_to_update
1693 } else {
1694 ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1695 };
1696
1697 if update_needs_rmw_lock(query) {
1698 return self.execute_update_inner_tracked_locked(
1699 raw_query,
1700 query,
1701 &compiled_plan,
1702 &ids_to_update,
1703 effective_filter.as_ref(),
1704 );
1705 }
1706
1707 let mut affected: u64 = 0;
1708 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1709 let mut applied_chunk = Vec::with_capacity(chunk.len());
1710 for entity in manager.get_many(chunk).into_iter().flatten() {
1711 let assignments =
1712 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1713 let applied = self.apply_materialized_update_for_entity(
1714 query.table.clone(),
1715 entity,
1716 &compiled_plan,
1717 assignments,
1718 )?;
1719 touched_ids.push(applied.id);
1720 applied_chunk.push(applied);
1721 }
1722 self.persist_update_chunk(&applied_chunk)?;
1723 affected += applied_chunk.len() as u64;
1724 let lsns = self.flush_update_chunk(&applied_chunk)?;
1725 if !query.suppress_events {
1726 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1727 }
1728 }
1729
1730 if affected > 0 {
1731 self.note_table_write(&query.table);
1732 }
1733
1734 Ok((
1735 RuntimeQueryResult::dml_result(
1736 raw_query.to_string(),
1737 affected,
1738 "update",
1739 "runtime-dml",
1740 ),
1741 touched_ids,
1742 ))
1743 }
1744
1745 fn execute_update_inner_tracked_locked(
1746 &self,
1747 raw_query: &str,
1748 query: &UpdateQuery,
1749 compiled_plan: &CompiledUpdatePlan,
1750 ids_to_update: &[EntityId],
1751 effective_filter: Option<&Filter>,
1752 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1753 let store = self.inner.db.store();
1754 let mut touched_ids = Vec::new();
1755 let mut lock_entries = Vec::new();
1756
1757 for id in ids_to_update {
1758 let Some(candidate) = store.get(&query.table, *id) else {
1759 continue;
1760 };
1761 let logical_id = candidate.logical_id();
1762 let lock_key = format!("row:{}", logical_id.raw());
1763 let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1764 lock_entries.push((lock_key, logical_id, rmw_lock));
1765 }
1766
1767 lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1768 lock_entries.dedup_by(|left, right| left.0 == right.0);
1769 let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1770
1771 let mut applied_chunk = Vec::new();
1772 for (_, logical_id, _) in &lock_entries {
1773 let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1774 else {
1775 continue;
1776 };
1777 if let Some(filter) = effective_filter {
1778 if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1779 Some(self.inner.db.as_ref()),
1780 &entity,
1781 filter,
1782 &query.table,
1783 &query.table,
1784 ) {
1785 continue;
1786 }
1787 }
1788
1789 let assignments =
1790 self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1791 let applied = self.apply_materialized_update_for_entity(
1792 query.table.clone(),
1793 entity,
1794 compiled_plan,
1795 assignments,
1796 )?;
1797 touched_ids.push(applied.id);
1798 applied_chunk.push(applied);
1799 }
1800
1801 let affected = applied_chunk.len() as u64;
1802 if !applied_chunk.is_empty() {
1803 self.persist_update_chunk(&applied_chunk)?;
1804 let lsns = self.flush_update_chunk(&applied_chunk)?;
1805 if !query.suppress_events {
1806 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1807 }
1808 }
1809
1810 if affected > 0 {
1811 self.note_table_write(&query.table);
1812 }
1813
1814 Ok((
1815 RuntimeQueryResult::dml_result(
1816 raw_query.to_string(),
1817 affected,
1818 "update",
1819 "runtime-dml",
1820 ),
1821 touched_ids,
1822 ))
1823 }
1824
1825 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1826 let mut static_field_assignments = Vec::new();
1827 let mut static_metadata_assignments = Vec::new();
1828 let mut dynamic_assignments = Vec::new();
1829 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1830 let mut row_modified_columns = Vec::new();
1831
1832 for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1833 let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1834 let metadata_key = resolve_sql_ttl_metadata_key(column);
1835 if compound_op.is_some() && metadata_key.is_some() {
1836 return Err(RedDBError::Query(format!(
1837 "compound assignment is only supported for row fields: {column}"
1838 )));
1839 }
1840 if compound_op.is_none() {
1841 if let Ok(value) = fold_expr_to_value(expr.clone()) {
1842 if let Some(metadata_key) = metadata_key {
1843 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1844 let (canonical_key, canonical_value) =
1845 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1846 static_metadata_assignments
1847 .push((canonical_key.to_string(), canonical_value));
1848 } else {
1849 let value = self.resolve_crypto_sentinel(value)?;
1850 static_field_assignments.push((
1851 column.clone(),
1852 normalize_row_update_assignment_with_plan(
1853 &query.table,
1854 column,
1855 value,
1856 row_contract_plan.as_ref(),
1857 )?,
1858 ));
1859 row_modified_columns.push(column.clone());
1860 }
1861 continue;
1862 }
1863 }
1864
1865 dynamic_assignments.push(CompiledUpdateAssignment {
1866 column: column.clone(),
1867 expr: expr.clone(),
1868 compound_op,
1869 metadata_key,
1870 row_rule: if metadata_key.is_none() {
1871 if let Some(plan) = row_contract_plan.as_ref() {
1872 if plan.timestamps_enabled
1873 && (column == "created_at" || column == "updated_at")
1874 {
1875 return Err(RedDBError::Query(format!(
1876 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1877 query.table, column
1878 )));
1879 }
1880 if let Some(rule) = plan.declared_rules.get(column) {
1881 Some(rule.clone())
1882 } else if plan.strict_schema {
1883 return Err(RedDBError::Query(format!(
1884 "collection '{}' is strict and does not allow undeclared fields: {}",
1885 query.table, column
1886 )));
1887 } else {
1888 None
1889 }
1890 } else {
1891 None
1892 }
1893 } else {
1894 None
1895 },
1896 });
1897 if metadata_key.is_none() {
1898 row_modified_columns.push(column.clone());
1899 }
1900 }
1901
1902 let row_modified_columns = dedupe_update_columns(row_modified_columns);
1903 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1904 row_modified_columns.iter().any(|column| {
1905 plan.unique_columns
1906 .keys()
1907 .any(|unique| unique.eq_ignore_ascii_case(column))
1908 })
1909 });
1910
1911 if let Some(ttl_ms) = query.ttl_ms {
1912 static_metadata_assignments
1913 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1914 }
1915 if let Some(expires_at_ms) = query.expires_at_ms {
1916 static_metadata_assignments.push((
1917 "_expires_at".to_string(),
1918 metadata_u64_to_value(expires_at_ms),
1919 ));
1920 }
1921 for (key, val) in &query.with_metadata {
1922 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1923 }
1924
1925 Ok(CompiledUpdatePlan {
1926 static_field_assignments,
1927 static_metadata_assignments,
1928 dynamic_assignments,
1929 row_contract_plan,
1930 row_modified_columns,
1931 row_touches_unique_columns,
1932 })
1933 }
1934
1935 fn materialize_update_assignments_for_entity(
1936 &self,
1937 query: &UpdateQuery,
1938 entity: &UnifiedEntity,
1939 compiled_plan: &CompiledUpdatePlan,
1940 ) -> RedDBResult<MaterializedUpdateAssignments> {
1941 let mut assignments = MaterializedUpdateAssignments::default();
1942 let mut record: Option<UnifiedRecord> = None;
1943
1944 for assignment in &compiled_plan.dynamic_assignments {
1945 if assignment.compound_op.is_some()
1946 && !matches!(
1947 entity.data,
1948 EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1949 )
1950 {
1951 return Err(RedDBError::Query(format!(
1952 "compound assignment is only supported for row or graph UPDATE column '{}'",
1953 assignment.column
1954 )));
1955 }
1956 if record.is_none() {
1957 record = runtime_any_record_from_entity_ref(entity);
1958 }
1959 let Some(record) = record.as_ref() else {
1960 return Err(RedDBError::Query(format!(
1961 "UPDATE could not materialize runtime record for entity {} in '{}'",
1962 entity.id.raw(),
1963 query.table
1964 )));
1965 };
1966 let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1967 Some(self.inner.db.as_ref()),
1968 &assignment.expr,
1969 record,
1970 Some(query.table.as_str()),
1971 Some(query.table.as_str()),
1972 )
1973 .ok_or_else(|| {
1974 RedDBError::Query(format!(
1975 "failed to evaluate UPDATE expression for column '{}'",
1976 assignment.column
1977 ))
1978 })?;
1979 let value = if let Some(op) = assignment.compound_op {
1980 evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1981 } else {
1982 rhs
1983 };
1984
1985 if let Some(metadata_key) = assignment.metadata_key {
1986 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1987 let (canonical_key, canonical_value) =
1988 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1989 assignments
1990 .dynamic_metadata_assignments
1991 .push((canonical_key.to_string(), canonical_value));
1992 } else {
1993 assignments.dynamic_field_assignments.push((
1994 assignment.column.clone(),
1995 normalize_row_update_value_for_rule(
1996 &query.table,
1997 self.resolve_crypto_sentinel(value)?,
1998 assignment.row_rule.as_ref(),
1999 )?,
2000 ));
2001 }
2002 }
2003
2004 Ok(assignments)
2005 }
2006
2007 fn apply_materialized_update_for_entity(
2008 &self,
2009 collection: String,
2010 entity: UnifiedEntity,
2011 compiled_plan: &CompiledUpdatePlan,
2012 assignments: MaterializedUpdateAssignments,
2013 ) -> RedDBResult<AppliedEntityMutation> {
2014 if matches!(entity.data, EntityData::Row(_)) {
2015 return self.apply_loaded_sql_update_row_core(
2016 collection,
2017 entity,
2018 &compiled_plan.static_field_assignments,
2019 assignments.dynamic_field_assignments,
2020 &compiled_plan.static_metadata_assignments,
2021 assignments.dynamic_metadata_assignments,
2022 compiled_plan.row_contract_plan.as_ref(),
2023 &compiled_plan.row_modified_columns,
2024 compiled_plan.row_touches_unique_columns,
2025 );
2026 }
2027
2028 ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2029
2030 let operations = build_patch_operations_from_materialized_assignments(
2031 &entity,
2032 compiled_plan,
2033 assignments,
2034 );
2035 self.apply_loaded_patch_entity_core(
2036 collection,
2037 entity,
2038 crate::json::Value::Null,
2039 operations,
2040 )
2041 }
2042
2043 pub fn execute_delete(
2045 &self,
2046 raw_query: &str,
2047 query: &DeleteQuery,
2048 ) -> RedDBResult<RuntimeQueryResult> {
2049 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2050 if crate::runtime::blockchain_kind::is_chain(
2053 self.inner.db.store().as_ref(),
2054 &query.table,
2055 ) {
2056 return Err(RedDBError::InvalidOperation(format!(
2057 "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2058 query.table
2059 )));
2060 }
2061 crate::runtime::collection_contract::CollectionContractGate::check(
2065 self,
2066 &query.table,
2067 crate::runtime::collection_contract::MutationKind::Delete,
2068 )?;
2069
2070 if let Some(items) = query.returning.clone() {
2077 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2078 RedDBError::Query(
2079 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2080 )
2081 })?;
2082 let captured = self.execute_query(&select_sql)?;
2083
2084 let mut inner_query = query.clone();
2085 inner_query.returning = None;
2086 let _ = self.execute_delete(raw_query, &inner_query)?;
2087
2088 let snapshots: Vec<Vec<(String, Value)>> = captured
2089 .result
2090 .records
2091 .iter()
2092 .map(|rec| {
2093 rec.iter_fields()
2094 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2095 .collect()
2096 })
2097 .collect();
2098 let affected = snapshots.len() as u64;
2099 let result = build_returning_result(&items, &snapshots, None);
2100
2101 let mut response = RuntimeQueryResult::dml_result(
2102 raw_query.to_string(),
2103 affected,
2104 "delete",
2105 "runtime-dml-returning",
2106 );
2107 response.result = result;
2108 return Ok(response);
2109 }
2110 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2117 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2118 self,
2119 &query.table,
2120 crate::storage::query::ast::PolicyAction::Delete,
2121 );
2122 let Some(policy) = rls_filter else {
2123 return Ok(RuntimeQueryResult::dml_result(
2124 raw_query.to_string(),
2125 0,
2126 "delete",
2127 "runtime-dml-rls",
2128 ));
2129 };
2130 let mut augmented = query.clone();
2135 augmented.filter = Some(match augmented.filter.take() {
2136 Some(existing) => {
2137 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2138 }
2139 None => policy,
2140 });
2141 return self.execute_delete_inner(raw_query, &augmented);
2142 }
2143 self.execute_delete_inner(raw_query, query)
2144 }
2145
2146 fn execute_delete_inner(
2147 &self,
2148 raw_query: &str,
2149 query: &DeleteQuery,
2150 ) -> RedDBResult<RuntimeQueryResult> {
2151 let effective_filter = effective_delete_filter(query);
2152
2153 let scan = super::dml_target_scan::DmlTargetScan::new(
2157 self,
2158 &query.table,
2159 effective_filter.as_ref(),
2160 None,
2161 );
2162 let ids_to_delete = scan.find_target_ids()?;
2163
2164 let needs_delete_events =
2167 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2168 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2169 scan.row_json_pre_images(&ids_to_delete)
2170 } else {
2171 HashMap::new()
2172 };
2173
2174 let mut affected: u64 = 0;
2175 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2176 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2177 affected += count;
2178 if needs_delete_events && !lsns.is_empty() {
2179 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2183 self.emit_delete_events_for_collection(
2184 &query.table,
2185 deleted_chunk,
2186 &lsns,
2187 &pre_images,
2188 )?;
2189 }
2190 }
2191 pre_images.clear();
2192
2193 if affected > 0 {
2194 self.note_table_write(&query.table);
2195 }
2196
2197 Ok(RuntimeQueryResult::dml_result(
2198 raw_query.to_string(),
2199 affected,
2200 "delete",
2201 "runtime-dml",
2202 ))
2203 }
2204}
2205
2206fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2212 if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2213 return Ok(());
2214 }
2215 for (column, _) in &query.assignment_exprs {
2216 if is_immutable_graph_identity_field(column) {
2217 return Err(RedDBError::Query(format!(
2218 "immutable graph field '{column}' cannot be updated"
2219 )));
2220 }
2221 }
2222 Ok(())
2223}
2224
2225fn ensure_graph_identity_update_allowed(
2226 entity: &UnifiedEntity,
2227 compiled_plan: &CompiledUpdatePlan,
2228 assignments: &MaterializedUpdateAssignments,
2229) -> RedDBResult<()> {
2230 if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2231 return Ok(());
2232 }
2233
2234 for (column, _) in compiled_plan
2235 .static_field_assignments
2236 .iter()
2237 .chain(assignments.dynamic_field_assignments.iter())
2238 {
2239 if is_immutable_graph_identity_field(column) {
2240 return Err(RedDBError::Query(format!(
2241 "immutable graph field '{column}' cannot be updated"
2242 )));
2243 }
2244 }
2245
2246 Ok(())
2247}
2248
2249fn is_immutable_graph_identity_field(column: &str) -> bool {
2250 ["rid", "label", "from_rid", "to_rid", "from", "to"]
2251 .iter()
2252 .any(|reserved| column.eq_ignore_ascii_case(reserved))
2253}
2254
2255fn build_patch_operations_from_materialized_assignments(
2256 entity: &UnifiedEntity,
2257 compiled_plan: &CompiledUpdatePlan,
2258 assignments: MaterializedUpdateAssignments,
2259) -> Vec<PatchEntityOperation> {
2260 let mut operations = Vec::with_capacity(
2261 compiled_plan.static_field_assignments.len()
2262 + compiled_plan.static_metadata_assignments.len()
2263 + assignments.dynamic_field_assignments.len()
2264 + assignments.dynamic_metadata_assignments.len(),
2265 );
2266
2267 for (column, value) in &compiled_plan.static_field_assignments {
2268 operations.push(PatchEntityOperation {
2269 op: PatchEntityOperationType::Set,
2270 path: update_patch_path_for_entity(entity, column),
2271 value: Some(storage_value_to_json(value)),
2272 });
2273 }
2274
2275 for (column, value) in assignments.dynamic_field_assignments {
2276 operations.push(PatchEntityOperation {
2277 op: PatchEntityOperationType::Set,
2278 path: update_patch_path_for_entity(entity, &column),
2279 value: Some(storage_value_to_json(&value)),
2280 });
2281 }
2282
2283 for (key, value) in &compiled_plan.static_metadata_assignments {
2284 operations.push(PatchEntityOperation {
2285 op: PatchEntityOperationType::Set,
2286 path: vec!["metadata".to_string(), key.clone()],
2287 value: Some(metadata_value_to_json(value)),
2288 });
2289 }
2290
2291 for (key, value) in assignments.dynamic_metadata_assignments {
2292 operations.push(PatchEntityOperation {
2293 op: PatchEntityOperationType::Set,
2294 path: vec!["metadata".to_string(), key],
2295 value: Some(metadata_value_to_json(&value)),
2296 });
2297 }
2298
2299 operations
2300}
2301
2302fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2303 if matches!(
2304 (&entity.kind, &entity.data),
2305 (
2306 crate::storage::EntityKind::GraphNode(_),
2307 EntityData::Node(_)
2308 )
2309 ) && column.eq_ignore_ascii_case("node_type")
2310 {
2311 return vec!["node_type".to_string()];
2312 }
2313 if matches!(
2314 (&entity.kind, &entity.data),
2315 (
2316 crate::storage::EntityKind::GraphEdge(_),
2317 EntityData::Edge(_)
2318 )
2319 ) && column.eq_ignore_ascii_case("weight")
2320 {
2321 return vec!["weight".to_string()];
2322 }
2323 vec!["fields".to_string(), column.to_string()]
2324}
2325
2326fn delete_to_select_sql(sql: &str) -> Option<String> {
2336 let trimmed = sql.trim_start();
2337 let lowered = trimmed.to_ascii_lowercase();
2338 if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2339 return None;
2340 }
2341 let from_idx = lowered.find(" from ")?;
2343 let after_from = &trimmed[from_idx + " from ".len()..];
2344 let after_from_lc = &lowered[from_idx + " from ".len()..];
2345
2346 let mut body = after_from.to_string();
2351 if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2352 body.truncate(pos);
2353 }
2354 Some(format!("SELECT * FROM {}", body.trim_end()))
2355}
2356
2357fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
2362 let bytes = haystack.as_bytes();
2363 let nlen = needle.len();
2364 let mut i = 0usize;
2365 let mut in_string = false;
2366 while i < bytes.len() {
2367 let c = bytes[i];
2368 if c == b'\'' {
2369 in_string = !in_string;
2370 i += 1;
2371 continue;
2372 }
2373 if !in_string
2374 && i + nlen <= bytes.len()
2375 && &bytes[i..i + nlen] == needle.as_bytes()
2376 && (i == 0 || bytes[i - 1].is_ascii_whitespace())
2377 && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
2378 {
2379 return Some(i);
2380 }
2381 i += 1;
2382 }
2383 None
2384}
2385
2386fn build_returning_result(
2393 items: &[ReturningItem],
2394 snapshots: &[Vec<(String, Value)>],
2395 outputs: Option<&[CreateEntityOutput]>,
2396) -> UnifiedResult {
2397 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2398 let public_item_outputs = outputs.is_some_and(|outs| {
2399 outs.first()
2400 .and_then(|out| out.entity.as_ref())
2401 .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2402 });
2403
2404 let mut columns: Vec<String> = if project_all {
2405 let mut cols: Vec<String> = Vec::new();
2406 if public_item_outputs {
2407 cols.extend(
2408 [
2409 "rid",
2410 "collection",
2411 "kind",
2412 "tenant",
2413 "created_at",
2414 "updated_at",
2415 ]
2416 .into_iter()
2417 .map(str::to_string),
2418 );
2419 } else if outputs.is_some() {
2420 cols.push("red_entity_id".to_string());
2421 }
2422 if let Some(first) = snapshots.first() {
2423 for (name, _) in first {
2424 cols.push(name.clone());
2425 }
2426 }
2427 cols
2428 } else {
2429 items
2430 .iter()
2431 .filter_map(|it| match it {
2432 ReturningItem::Column(c) => Some(c.clone()),
2433 ReturningItem::All => None,
2434 })
2435 .collect()
2436 };
2437 {
2439 let mut seen = std::collections::HashSet::new();
2440 columns.retain(|c| seen.insert(c.clone()));
2441 }
2442
2443 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2444 for (idx, snap) in snapshots.iter().enumerate() {
2445 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2446 if let Some(outs) = outputs {
2447 if let Some(out) = outs.get(idx) {
2448 if let Some(entity) = out.entity.as_ref() {
2449 if let Some(kind) = public_returning_item_kind(entity) {
2450 values.insert(
2451 Arc::clone(&sys_key_rid()),
2452 Value::UnsignedInteger(out.id.raw()),
2453 );
2454 values.insert(
2455 Arc::clone(&sys_key_collection()),
2456 Value::text(entity.kind.collection().to_string()),
2457 );
2458 values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2459 values.insert(
2460 Arc::clone(&sys_key_created_at()),
2461 Value::UnsignedInteger(entity.created_at),
2462 );
2463 values.insert(
2464 Arc::clone(&sys_key_updated_at()),
2465 Value::UnsignedInteger(entity.updated_at),
2466 );
2467 } else {
2468 values.insert(
2469 Arc::clone(&sys_key_red_entity_id()),
2470 Value::Integer(out.id.raw() as i64),
2471 );
2472 }
2473 } else {
2474 values.insert(
2475 Arc::clone(&sys_key_red_entity_id()),
2476 Value::Integer(out.id.raw() as i64),
2477 );
2478 }
2479 }
2480 }
2481 for (name, val) in snap {
2482 values.insert(Arc::from(name.as_str()), val.clone());
2483 }
2484 if !values.contains_key("tenant") {
2485 let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2486 values.insert(Arc::clone(&sys_key_tenant()), tenant);
2487 }
2488 let mut rec = UnifiedRecord::default();
2489 for col in &columns {
2491 if let Some(v) = values.get(col.as_str()) {
2492 rec.set_arc(Arc::from(col.as_str()), v.clone());
2493 }
2494 }
2495 records.push(rec);
2496 }
2497
2498 UnifiedResult {
2499 columns,
2500 records,
2501 stats: Default::default(),
2502 pre_serialized_json: None,
2503 }
2504}
2505
2506fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2507 match (&entity.kind, &entity.data) {
2508 (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2509 Some("node")
2510 }
2511 (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2512 Some("edge")
2513 }
2514 (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2515 _ => None,
2516 }
2517}
2518
2519fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2520 let Some(row) = entity.data.as_row() else {
2521 return "row";
2522 };
2523
2524 let is_kv = row.named.as_ref().is_some_and(|named| {
2525 (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2526 || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2527 });
2528 if is_kv {
2529 return "kv";
2530 }
2531
2532 let is_document = row
2533 .named
2534 .as_ref()
2535 .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2536 || row.columns.iter().any(runtime_returning_documentish_value);
2537 if is_document {
2538 "document"
2539 } else {
2540 "row"
2541 }
2542}
2543
2544fn runtime_returning_documentish_value(value: &Value) -> bool {
2545 matches!(value, Value::Json(_) | Value::Blob(_))
2546}
2547
2548fn row_insert_returning_snapshots(
2549 outputs: &[CreateEntityOutput],
2550 fallback: Vec<Vec<(String, Value)>>,
2551) -> Vec<Vec<(String, Value)>> {
2552 outputs
2553 .iter()
2554 .enumerate()
2555 .map(|(idx, out)| {
2556 out.entity
2557 .as_ref()
2558 .map(entity_row_fields_snapshot)
2559 .filter(|snap| !snap.is_empty())
2560 .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2561 })
2562 .collect()
2563}
2564
2565fn graph_insert_returning_snapshots(
2566 store: &crate::storage::unified::UnifiedStore,
2567 collection: &str,
2568 ids: &[EntityId],
2569) -> Vec<Vec<(String, Value)>> {
2570 let Some(manager) = store.get_collection(collection) else {
2571 return Vec::new();
2572 };
2573
2574 ids.iter()
2575 .filter_map(|id| manager.get(*id))
2576 .filter_map(|entity| {
2577 let mut record = runtime_any_record_from_entity_ref(&entity)?;
2578 record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2579 Some(record)
2580 })
2581 .map(|record| {
2582 record
2583 .iter_fields()
2584 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2585 .collect()
2586 })
2587 .collect()
2588}
2589
2590fn graph_update_returning_snapshots(
2591 runtime: &RedDBRuntime,
2592 collection: &str,
2593 ids: &[EntityId],
2594) -> Vec<Vec<(String, Value)>> {
2595 let store = runtime.db().store();
2596 let Some(manager) = store.get_collection(collection) else {
2597 return Vec::new();
2598 };
2599
2600 manager
2601 .get_many(ids)
2602 .into_iter()
2603 .flatten()
2604 .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2605 .map(|record| {
2606 record
2607 .iter_fields()
2608 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2609 .collect()
2610 })
2611 .collect()
2612}
2613
2614fn ensure_update_target_contract(
2615 runtime: &RedDBRuntime,
2616 collection: &str,
2617 target: UpdateTarget,
2618) -> RedDBResult<()> {
2619 let Some(contract) = runtime.db().collection_contract(collection) else {
2620 return Ok(());
2621 };
2622 if update_target_contract_is_advisory(&contract)
2623 || update_target_allows_model(contract.declared_model, update_target_model(target))
2624 {
2625 return Ok(());
2626 }
2627 Err(RedDBError::InvalidOperation(format!(
2628 "collection '{}' is declared as '{}' and does not allow '{}' updates",
2629 collection,
2630 update_model_name(contract.declared_model),
2631 update_model_name(update_target_model(target))
2632 )))
2633}
2634
2635fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2636 matches!(
2637 (&contract.origin, &contract.schema_mode),
2638 (
2639 crate::physical::ContractOrigin::Implicit,
2640 crate::catalog::SchemaMode::Dynamic,
2641 )
2642 )
2643}
2644
2645fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2646 match target {
2647 UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2648 UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2649 UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2650 UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2651 }
2652}
2653
2654fn update_target_allows_model(
2655 declared_model: crate::catalog::CollectionModel,
2656 requested_model: crate::catalog::CollectionModel,
2657) -> bool {
2658 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2659}
2660
2661fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2662 match model {
2663 crate::catalog::CollectionModel::Table => "table",
2664 crate::catalog::CollectionModel::Document => "document",
2665 crate::catalog::CollectionModel::Graph => "graph",
2666 crate::catalog::CollectionModel::Vector => "vector",
2667 crate::catalog::CollectionModel::Hll => "hll",
2668 crate::catalog::CollectionModel::Sketch => "sketch",
2669 crate::catalog::CollectionModel::Filter => "filter",
2670 crate::catalog::CollectionModel::Kv => "kv",
2671 crate::catalog::CollectionModel::Config => "config",
2672 crate::catalog::CollectionModel::Vault => "vault",
2673 crate::catalog::CollectionModel::Mixed => "mixed",
2674 crate::catalog::CollectionModel::TimeSeries => "timeseries",
2675 crate::catalog::CollectionModel::Queue => "queue",
2676 crate::catalog::CollectionModel::Metrics => "metrics",
2677 }
2678}
2679
2680fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2681 let db = runtime.db();
2682 if let Some(contract) = db.collection_contract(collection) {
2683 let advisory_implicit_dynamic = matches!(
2684 (&contract.origin, &contract.schema_mode),
2685 (
2686 crate::physical::ContractOrigin::Implicit,
2687 crate::catalog::SchemaMode::Dynamic,
2688 )
2689 );
2690 if advisory_implicit_dynamic
2691 || matches!(
2692 contract.declared_model,
2693 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2694 )
2695 {
2696 return Ok(());
2697 }
2698 return Err(RedDBError::InvalidOperation(format!(
2699 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2700 collection, contract.declared_model
2701 )));
2702 }
2703
2704 let now = std::time::SystemTime::now()
2705 .duration_since(std::time::UNIX_EPOCH)
2706 .unwrap_or_default()
2707 .as_millis();
2708 db.save_collection_contract(crate::physical::CollectionContract {
2709 name: collection.to_string(),
2710 declared_model: crate::catalog::CollectionModel::Graph,
2711 schema_mode: crate::catalog::SchemaMode::Dynamic,
2712 origin: crate::physical::ContractOrigin::Implicit,
2713 version: 1,
2714 created_at_unix_ms: now,
2715 updated_at_unix_ms: now,
2716 default_ttl_ms: db.collection_default_ttl_ms(collection),
2717 vector_dimension: None,
2718 vector_metric: None,
2719 context_index_fields: Vec::new(),
2720 declared_columns: Vec::new(),
2721 table_def: None,
2722 timestamps_enabled: false,
2723 context_index_enabled: false,
2724 metrics_raw_retention_ms: None,
2725 metrics_rollup_policies: Vec::new(),
2726 metrics_tenant_identity: None,
2727 metrics_namespace: None,
2728 append_only: false,
2729 subscriptions: Vec::new(),
2730 session_key: None,
2731 session_gap_ms: None,
2732 retention_duration_ms: None,
2733 })
2734 .map(|_| ())
2735 .map_err(|err| RedDBError::Internal(err.to_string()))
2736}
2737
2738fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2739 query
2740 .assignment_exprs
2741 .iter()
2742 .enumerate()
2743 .any(|(idx, (column, expr))| {
2744 query
2745 .compound_assignment_ops
2746 .get(idx)
2747 .is_some_and(|op| op.is_some())
2748 || expr_references_update_column(expr, &query.table, column)
2749 })
2750}
2751
2752fn evaluate_compound_update_assignment(
2753 column: &str,
2754 record: &UnifiedRecord,
2755 op: BinOp,
2756 rhs: Value,
2757) -> RedDBResult<Value> {
2758 let lhs = record.get(column).ok_or_else(|| {
2759 RedDBError::Query(format!(
2760 "compound assignment requires existing numeric field '{column}'"
2761 ))
2762 })?;
2763 if matches!(lhs, Value::Null) {
2764 return Err(RedDBError::Query(format!(
2765 "compound assignment requires non-null numeric field '{column}'"
2766 )));
2767 }
2768 apply_compound_numeric_op(column, op, lhs, &rhs)
2769}
2770
2771fn apply_compound_numeric_op(
2772 column: &str,
2773 op: BinOp,
2774 lhs: &Value,
2775 rhs: &Value,
2776) -> RedDBResult<Value> {
2777 let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2778 return Err(RedDBError::Query(format!(
2779 "compound assignment requires numeric field '{column}'"
2780 )));
2781 };
2782 let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2783 return Err(RedDBError::Query(format!(
2784 "compound assignment requires numeric right-hand value for field '{column}'"
2785 )));
2786 };
2787
2788 if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2789 let a = lhs_number.as_f64();
2790 let b = rhs_number.as_f64();
2791 let out = match op {
2792 BinOp::Add => a + b,
2793 BinOp::Sub => a - b,
2794 BinOp::Mul => a * b,
2795 BinOp::Div => {
2796 if b == 0.0 {
2797 return Err(RedDBError::Query(format!(
2798 "division by zero in compound assignment for field '{column}'"
2799 )));
2800 }
2801 a / b
2802 }
2803 BinOp::Mod => {
2804 if b == 0.0 {
2805 return Err(RedDBError::Query(format!(
2806 "modulo by zero in compound assignment for field '{column}'"
2807 )));
2808 }
2809 a % b
2810 }
2811 _ => {
2812 return Err(RedDBError::Query(format!(
2813 "unsupported compound assignment operator for field '{column}'"
2814 )));
2815 }
2816 };
2817 if !out.is_finite() {
2818 return Err(RedDBError::Query(format!(
2819 "numeric overflow in compound assignment for field '{column}'"
2820 )));
2821 }
2822 return Ok(Value::Float(out));
2823 }
2824
2825 let a = lhs_number.as_i128();
2826 let b = rhs_number.as_i128();
2827 let out = match op {
2828 BinOp::Add => a.checked_add(b),
2829 BinOp::Sub => a.checked_sub(b),
2830 BinOp::Mul => a.checked_mul(b),
2831 BinOp::Mod => {
2832 if b == 0 {
2833 return Err(RedDBError::Query(format!(
2834 "modulo by zero in compound assignment for field '{column}'"
2835 )));
2836 }
2837 a.checked_rem(b)
2838 }
2839 BinOp::Div => unreachable!("integer division is handled by the float branch"),
2840 _ => None,
2841 }
2842 .ok_or_else(|| {
2843 RedDBError::Query(format!(
2844 "numeric overflow in compound assignment for field '{column}'"
2845 ))
2846 })?;
2847
2848 if matches!(lhs, Value::UnsignedInteger(_)) {
2849 let value = u64::try_from(out).map_err(|_| {
2850 RedDBError::Query(format!(
2851 "numeric overflow in compound assignment for field '{column}'"
2852 ))
2853 })?;
2854 Ok(Value::UnsignedInteger(value))
2855 } else {
2856 let value = i64::try_from(out).map_err(|_| {
2857 RedDBError::Query(format!(
2858 "numeric overflow in compound assignment for field '{column}'"
2859 ))
2860 })?;
2861 Ok(Value::Integer(value))
2862 }
2863}
2864
2865#[derive(Clone, Copy)]
2866enum CompoundNumber {
2867 Integer(i128),
2868 Float(f64),
2869}
2870
2871impl CompoundNumber {
2872 fn from_value(value: &Value) -> Option<Self> {
2873 match value {
2874 Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2875 Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2876 Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2877 Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2878 _ => None,
2879 }
2880 }
2881
2882 fn is_float(self) -> bool {
2883 matches!(self, Self::Float(_))
2884 }
2885
2886 fn as_f64(self) -> f64 {
2887 match self {
2888 Self::Integer(value) => value as f64,
2889 Self::Float(value) => value,
2890 }
2891 }
2892
2893 fn as_i128(self) -> i128 {
2894 match self {
2895 Self::Integer(value) => value,
2896 Self::Float(_) => unreachable!("float compound number used as integer"),
2897 }
2898 }
2899}
2900
2901fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2902 match expr {
2903 Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2904 Expr::Column { field, .. } => {
2905 field_ref_matches_update_column(field, table_name, target_column)
2906 }
2907 Expr::BinaryOp { lhs, rhs, .. } => {
2908 expr_references_update_column(lhs, table_name, target_column)
2909 || expr_references_update_column(rhs, table_name, target_column)
2910 }
2911 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2912 expr_references_update_column(operand, table_name, target_column)
2913 }
2914 Expr::FunctionCall { args, .. } => args
2915 .iter()
2916 .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2917 Expr::Case {
2918 branches, else_, ..
2919 } => {
2920 branches.iter().any(|(cond, value)| {
2921 expr_references_update_column(cond, table_name, target_column)
2922 || expr_references_update_column(value, table_name, target_column)
2923 }) || else_
2924 .as_deref()
2925 .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2926 }
2927 Expr::IsNull { operand, .. } => {
2928 expr_references_update_column(operand, table_name, target_column)
2929 }
2930 Expr::InList { target, values, .. } => {
2931 expr_references_update_column(target, table_name, target_column)
2932 || values
2933 .iter()
2934 .any(|value| expr_references_update_column(value, table_name, target_column))
2935 }
2936 Expr::Between {
2937 target, low, high, ..
2938 } => {
2939 expr_references_update_column(target, table_name, target_column)
2940 || expr_references_update_column(low, table_name, target_column)
2941 || expr_references_update_column(high, table_name, target_column)
2942 }
2943 }
2944}
2945
2946fn field_ref_matches_update_column(
2947 field: &FieldRef,
2948 table_name: &str,
2949 target_column: &str,
2950) -> bool {
2951 match field {
2952 FieldRef::TableColumn { table, column } => {
2953 column.eq_ignore_ascii_case(target_column)
2954 && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2955 }
2956 FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2957 false
2958 }
2959 }
2960}
2961
2962fn resolve_update_entity_by_logical_id(
2963 runtime: &RedDBRuntime,
2964 table: &str,
2965 logical_id: EntityId,
2966) -> Option<UnifiedEntity> {
2967 let store = runtime.inner.db.store();
2968 if let Some(entity) = crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2969 .resolve_logical_id(&store, table, logical_id)
2970 {
2971 return Some(entity);
2972 }
2973 store.get(table, logical_id)
2976}
2977
2978fn update_cdc_item_kind(
2979 runtime: &RedDBRuntime,
2980 collection: &str,
2981 entity: &UnifiedEntity,
2982) -> &'static str {
2983 match &entity.data {
2984 EntityData::Node(_) => return "node",
2985 EntityData::Edge(_) => return "edge",
2986 _ => {}
2987 }
2988
2989 match runtime
2990 .db()
2991 .collection_contract(collection)
2992 .map(|contract| contract.declared_model)
2993 {
2994 Some(crate::catalog::CollectionModel::Document) => "document",
2995 Some(crate::catalog::CollectionModel::Kv)
2996 | Some(crate::catalog::CollectionModel::Vault) => "kv",
2997 _ => "row",
2998 }
2999}
3000
3001fn ordered_update_target_ids(
3002 manager: &Arc<crate::storage::SegmentManager>,
3003 entity_ids: &[EntityId],
3004 order_by: &[OrderByClause],
3005 limit: Option<usize>,
3006) -> Vec<EntityId> {
3007 let mut entities: Vec<UnifiedEntity> =
3008 manager.get_many(entity_ids).into_iter().flatten().collect();
3009 entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3010 if let Some(limit) = limit {
3011 entities.truncate(limit);
3012 }
3013 entities.into_iter().map(|entity| entity.id).collect()
3014}
3015
3016fn compare_update_order(
3017 left: &UnifiedEntity,
3018 right: &UnifiedEntity,
3019 order_by: &[OrderByClause],
3020) -> Ordering {
3021 for clause in order_by {
3022 let left_value = update_order_value(left, &clause.field);
3023 let right_value = update_order_value(right, &clause.field);
3024 let ordering = compare_update_order_values(
3025 left_value.as_ref(),
3026 right_value.as_ref(),
3027 clause.nulls_first,
3028 );
3029 if ordering != Ordering::Equal {
3030 return if clause.ascending {
3031 ordering
3032 } else {
3033 ordering.reverse()
3034 };
3035 }
3036 }
3037 left.logical_id().raw().cmp(&right.logical_id().raw())
3038}
3039
3040fn compare_update_order_values(
3041 left: Option<&Value>,
3042 right: Option<&Value>,
3043 nulls_first: bool,
3044) -> Ordering {
3045 match (left, right) {
3046 (None, None) => Ordering::Equal,
3047 (None, Some(_)) => {
3048 if nulls_first {
3049 Ordering::Less
3050 } else {
3051 Ordering::Greater
3052 }
3053 }
3054 (Some(_), None) => {
3055 if nulls_first {
3056 Ordering::Greater
3057 } else {
3058 Ordering::Less
3059 }
3060 }
3061 (Some(left), Some(right)) => {
3062 crate::storage::query::value_compare::total_compare_values(left, right)
3063 }
3064 }
3065}
3066
3067fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3068 let FieldRef::TableColumn { table, column } = field else {
3069 return None;
3070 };
3071 if !table.is_empty() {
3072 return None;
3073 }
3074 if column.eq_ignore_ascii_case("rid") {
3075 return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3076 }
3077 match &entity.data {
3078 EntityData::Row(row) => row.get_field(column).cloned(),
3079 EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3080 .and_then(|record| record.get(column).cloned()),
3081 _ => None,
3082 }
3083}
3084
3085fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
3086 if columns.is_empty() {
3087 return columns;
3088 }
3089
3090 let mut unique = Vec::with_capacity(columns.len());
3091 for column in columns.drain(..) {
3092 if !unique
3093 .iter()
3094 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
3095 {
3096 unique.push(column);
3097 }
3098 }
3099 unique
3100}
3101
3102const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];
3107
3108fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
3109 if column.eq_ignore_ascii_case("_ttl") {
3110 Some(SQL_TTL_METADATA_COLUMNS[0])
3111 } else if column.eq_ignore_ascii_case("_ttl_ms") {
3112 Some(SQL_TTL_METADATA_COLUMNS[1])
3113 } else if column.eq_ignore_ascii_case("_expires_at") {
3114 Some(SQL_TTL_METADATA_COLUMNS[2])
3115 } else {
3116 None
3117 }
3118}
3119
3120fn canonicalize_sql_ttl_metadata(
3125 key: &'static str,
3126 value: MetadataValue,
3127) -> (&'static str, MetadataValue) {
3128 if key != "_ttl" {
3129 return (key, value);
3130 }
3131 let scaled = match value {
3132 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3133 MetadataValue::Timestamp(ms_or_s) => {
3134 MetadataValue::Timestamp(ms_or_s)
3137 }
3138 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3139 other => other,
3140 };
3141 ("_ttl_ms", scaled)
3142}
3143
3144pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3148
3149impl RedDBRuntime {
3150 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3156 match value {
3157 Value::Password(marked) => {
3158 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3159 Ok(Value::Password(crate::auth::store::hash_password(plain)))
3160 } else {
3161 Ok(Value::Password(marked))
3162 }
3163 }
3164 Value::Secret(bytes) => {
3165 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3166 if !self.secret_auto_encrypt() {
3167 return Err(RedDBError::Query(
3168 "SECRET() literal rejected: red.config.secret.auto_encrypt \
3169 is false. Insert pre-encrypted bytes directly instead."
3170 .to_string(),
3171 ));
3172 }
3173 let key = self.secret_aes_key().ok_or_else(|| {
3174 RedDBError::Query(
3175 "SECRET() column encryption requires a bootstrapped \
3176 vault (red.secret.aes_key is missing). Start the server \
3177 with --vault to enable."
3178 .to_string(),
3179 )
3180 })?;
3181 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3182 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3183 } else {
3184 Ok(Value::Secret(bytes))
3185 }
3186 }
3187 other => Ok(other),
3188 }
3189 }
3190}
3191
3192fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3195 let nonce_bytes = crate::auth::store::random_bytes(12);
3196 let mut nonce = [0u8; 12];
3197 nonce.copy_from_slice(&nonce_bytes[..12]);
3198 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3199 let mut out = Vec::with_capacity(12 + ct.len());
3200 out.extend_from_slice(&nonce);
3201 out.extend_from_slice(&ct);
3202 out
3203}
3204
3205pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3209 if payload.len() < 12 {
3210 return None;
3211 }
3212 let mut nonce = [0u8; 12];
3213 nonce.copy_from_slice(&payload[..12]);
3214 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3215}
3216
3217fn split_insert_metadata(
3218 runtime: &RedDBRuntime,
3219 columns: &[String],
3220 values: &[Value],
3221) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3222 let mut fields = Vec::new();
3223 let mut metadata = Vec::new();
3224
3225 for (column, value) in columns.iter().zip(values.iter()) {
3226 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3228 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3229 let (canonical_key, canonical_value) =
3230 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3231 metadata.push((canonical_key.to_string(), canonical_value));
3232 continue;
3233 }
3234 fields.push((
3235 column.clone(),
3236 runtime.resolve_crypto_sentinel(value.clone())?,
3237 ));
3238 }
3239
3240 Ok((fields, metadata))
3241}
3242
3243fn merge_with_clauses(
3245 metadata: &mut Vec<(String, MetadataValue)>,
3246 ttl_ms: Option<u64>,
3247 expires_at_ms: Option<u64>,
3248 with_metadata: &[(String, Value)],
3249) {
3250 if let Some(ms) = ttl_ms {
3251 metadata.push((
3252 "_ttl_ms".to_string(),
3253 if ms <= i64::MAX as u64 {
3254 MetadataValue::Int(ms as i64)
3255 } else {
3256 MetadataValue::Timestamp(ms)
3257 },
3258 ));
3259 }
3260 if let Some(ms) = expires_at_ms {
3261 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3262 }
3263 for (key, value) in with_metadata {
3264 let meta_value = match value {
3265 Value::Text(s) => MetadataValue::String(s.to_string()),
3266 Value::Integer(n) => MetadataValue::Int(*n),
3267 Value::Float(n) => MetadataValue::Float(*n),
3268 Value::Boolean(b) => MetadataValue::Bool(*b),
3269 _ => MetadataValue::String(value.to_string()),
3270 };
3271 metadata.push((key.clone(), meta_value));
3272 }
3273}
3274
3275fn merge_vector_metadata_column(
3276 metadata: &mut Vec<(String, MetadataValue)>,
3277 columns: &[String],
3278 values: &[Value],
3279) -> RedDBResult<()> {
3280 let Some(value) = columns
3281 .iter()
3282 .position(|column| column.eq_ignore_ascii_case("metadata"))
3283 .map(|index| &values[index])
3284 else {
3285 return Ok(());
3286 };
3287 let json = match value {
3288 Value::Null => return Ok(()),
3289 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3290 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3291 })?,
3292 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3293 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3294 })?,
3295 other => {
3296 return Err(RedDBError::Query(format!(
3297 "column 'metadata' expected JSON object, got {other:?}"
3298 )))
3299 }
3300 };
3301 let parsed = metadata_from_json(&json)?;
3302 for (key, value) in parsed.iter() {
3303 metadata.push((key.clone(), value.clone()));
3304 }
3305 Ok(())
3306}
3307
3308fn apply_collection_default_ttl_metadata(
3309 runtime: &RedDBRuntime,
3310 collection: &str,
3311 metadata: &mut Vec<(String, MetadataValue)>,
3312) {
3313 if has_internal_ttl_metadata(metadata) {
3314 return;
3315 }
3316
3317 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3318 return;
3319 };
3320
3321 metadata.push((
3322 "_ttl_ms".to_string(),
3323 if default_ttl_ms <= i64::MAX as u64 {
3324 MetadataValue::Int(default_ttl_ms as i64)
3325 } else {
3326 MetadataValue::Timestamp(default_ttl_ms)
3327 },
3328 ));
3329}
3330
3331fn ensure_non_tree_reserved_metadata_entries(
3332 metadata: &[(String, MetadataValue)],
3333) -> RedDBResult<()> {
3334 for (key, _) in metadata {
3335 ensure_non_tree_reserved_metadata_key(key)?;
3336 }
3337 Ok(())
3338}
3339
3340fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3341 if key.starts_with(TREE_METADATA_PREFIX) {
3342 return Err(RedDBError::Query(format!(
3343 "metadata key '{}' is reserved for managed trees",
3344 key
3345 )));
3346 }
3347 Ok(())
3348}
3349
3350fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3351 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3352 return Err(RedDBError::Query(format!(
3353 "edge label '{}' is reserved for managed trees",
3354 TREE_CHILD_EDGE_LABEL
3355 )));
3356 }
3357 Ok(())
3358}
3359
3360fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3361 let mut columns = Vec::with_capacity(pairs.len());
3362 let mut values = Vec::with_capacity(pairs.len());
3363
3364 for (column, value) in pairs {
3365 columns.push(column.clone());
3366 values.push(value.clone());
3367 }
3368
3369 (columns, values)
3370}
3371
3372fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3374 for (i, col) in columns.iter().enumerate() {
3375 if col.eq_ignore_ascii_case(name) {
3376 return Ok(values[i].clone());
3377 }
3378 }
3379 Err(RedDBError::Query(format!(
3380 "required column '{name}' not found in INSERT"
3381 )))
3382}
3383
3384fn find_column_value_string(
3386 columns: &[String],
3387 values: &[Value],
3388 name: &str,
3389) -> RedDBResult<String> {
3390 let val = find_column_value(columns, values, name)?;
3391 match val {
3392 Value::Text(s) => Ok(s.to_string()),
3393 Value::Integer(n) => Ok(n.to_string()),
3394 Value::Float(n) => Ok(n.to_string()),
3395 other => Err(RedDBError::Query(format!(
3396 "column '{name}' expected text, got {other:?}"
3397 ))),
3398 }
3399}
3400
3401fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3402 let val = find_column_value(columns, values, name)?;
3403 match val {
3404 Value::Float(n) => Ok(n),
3405 Value::Integer(n) => Ok(n as f64),
3406 Value::UnsignedInteger(n) => Ok(n as f64),
3407 Value::Text(s) => s
3408 .parse::<f64>()
3409 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3410 other => Err(RedDBError::Query(format!(
3411 "column '{name}' expected number, got {other:?}"
3412 ))),
3413 }
3414}
3415
3416fn find_column_value_opt_string(
3418 columns: &[String],
3419 values: &[Value],
3420 name: &str,
3421) -> Option<String> {
3422 for (i, col) in columns.iter().enumerate() {
3423 if col.eq_ignore_ascii_case(name) {
3424 return match &values[i] {
3425 Value::Null => None,
3426 Value::Text(s) => Some(s.to_string()),
3427 Value::Integer(n) => Some(n.to_string()),
3428 Value::Float(n) => Some(n.to_string()),
3429 _ => None,
3430 };
3431 }
3432 }
3433 None
3434}
3435
3436fn resolve_edge_endpoint(
3443 store: &crate::storage::unified::UnifiedStore,
3444 collection: &str,
3445 columns: &[String],
3446 values: &[Value],
3447 name: &str,
3448) -> RedDBResult<u64> {
3449 let val = find_column_value(columns, values, name)?;
3450 match val {
3451 Value::Integer(n) => Ok(n as u64),
3452 Value::UnsignedInteger(n) => Ok(n),
3453 Value::Text(s) => {
3454 if let Ok(n) = s.parse::<u64>() {
3455 return Ok(n);
3456 }
3457 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3458 match matches.len() {
3459 0 => Err(RedDBError::Query(format!(
3460 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3461 ))),
3462 1 => Ok(matches[0].raw()),
3463 n => Err(RedDBError::Query(format!(
3464 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3465 ))),
3466 }
3467 }
3468 other => Err(RedDBError::Query(format!(
3469 "column '{name}' expected integer or node label, got {other:?}"
3470 ))),
3471 }
3472}
3473
3474fn resolve_edge_endpoint_any(
3475 store: &crate::storage::unified::UnifiedStore,
3476 collection: &str,
3477 columns: &[String],
3478 values: &[Value],
3479 names: &[&str],
3480) -> RedDBResult<u64> {
3481 for name in names {
3482 if columns
3483 .iter()
3484 .any(|column| column.eq_ignore_ascii_case(name))
3485 {
3486 return resolve_edge_endpoint(store, collection, columns, values, name);
3487 }
3488 }
3489
3490 Err(RedDBError::Query(format!(
3491 "required column '{}' not found in INSERT",
3492 names.first().copied().unwrap_or("from_rid")
3493 )))
3494}
3495
3496fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3498 let val = find_column_value(columns, values, name)?;
3499 match val {
3500 Value::Integer(n) => Ok(n as u64),
3501 Value::UnsignedInteger(n) => Ok(n),
3502 Value::Text(s) => s
3503 .parse::<u64>()
3504 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3505 other => Err(RedDBError::Query(format!(
3506 "column '{name}' expected integer, got {other:?}"
3507 ))),
3508 }
3509}
3510
3511fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3513 for (i, col) in columns.iter().enumerate() {
3514 if col.eq_ignore_ascii_case(name) {
3515 return match &values[i] {
3516 Value::Float(n) => Some(*n as f32),
3517 Value::Integer(n) => Some(*n as f32),
3518 Value::Null => None,
3519 _ => None,
3520 };
3521 }
3522 }
3523 None
3524}
3525
3526fn find_column_value_vec_f32(
3528 columns: &[String],
3529 values: &[Value],
3530 name: &str,
3531) -> RedDBResult<Vec<f32>> {
3532 let val = find_column_value(columns, values, name)?;
3533 match val {
3534 Value::Vector(v) => Ok(v),
3535 Value::Json(bytes) => {
3536 let s = std::str::from_utf8(&bytes).map_err(|_| {
3538 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3539 })?;
3540 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3541 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3542 })?;
3543 Ok(arr)
3544 }
3545 other => Err(RedDBError::Query(format!(
3546 "column '{name}' expected vector, got {other:?}"
3547 ))),
3548 }
3549}
3550
3551fn find_column_value_vec_f32_any(
3552 columns: &[String],
3553 values: &[Value],
3554 names: &[&str],
3555) -> RedDBResult<Vec<f32>> {
3556 for name in names {
3557 if columns
3558 .iter()
3559 .any(|column| column.eq_ignore_ascii_case(name))
3560 {
3561 return find_column_value_vec_f32(columns, values, name);
3562 }
3563 }
3564 Err(RedDBError::Query(format!(
3565 "required vector column '{}' not found in INSERT",
3566 names.join("' or '")
3567 )))
3568}
3569
3570fn extract_remaining_properties(
3572 columns: &[String],
3573 values: &[Value],
3574 exclude: &[&str],
3575) -> Vec<(String, Value)> {
3576 columns
3577 .iter()
3578 .zip(values.iter())
3579 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3580 .map(|(col, val)| (col.clone(), val.clone()))
3581 .collect()
3582}
3583
3584fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3585 let mut invalid = Vec::new();
3586 for column in columns {
3587 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3588 invalid.push(column.clone());
3589 }
3590 }
3591
3592 if invalid.is_empty() {
3593 Ok(())
3594 } else {
3595 Err(RedDBError::Query(format!(
3596 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3597 invalid.join(", ")
3598 )))
3599 }
3600}
3601
3602fn is_timeseries_insert_column(column: &str) -> bool {
3603 matches!(
3604 column.to_ascii_lowercase().as_str(),
3605 "metric"
3606 | "value"
3607 | "tags"
3608 | "timestamp"
3609 | "timestamp_ns"
3610 | "time"
3611 | "event_name"
3616 | "payload"
3617 )
3618}
3619
3620fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3621 let mut found = None;
3622
3623 for alias in ["timestamp_ns", "timestamp", "time"] {
3624 for (index, column) in columns.iter().enumerate() {
3625 if !column.eq_ignore_ascii_case(alias) {
3626 continue;
3627 }
3628
3629 if found.is_some() {
3630 return Err(RedDBError::Query(
3631 "timeseries INSERT accepts only one timestamp column".to_string(),
3632 ));
3633 }
3634
3635 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3636 }
3637 }
3638
3639 Ok(found)
3640}
3641
3642fn find_timeseries_tags(
3643 columns: &[String],
3644 values: &[Value],
3645) -> RedDBResult<std::collections::HashMap<String, String>> {
3646 for (index, column) in columns.iter().enumerate() {
3647 if column.eq_ignore_ascii_case("tags") {
3648 return parse_timeseries_tags(&values[index]);
3649 }
3650 }
3651 Ok(std::collections::HashMap::new())
3652}
3653
3654fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3655 match value {
3656 Value::Null => Ok(std::collections::HashMap::new()),
3657 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3658 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3659 other => Err(RedDBError::Query(format!(
3660 "timeseries tags must be a JSON object or JSON text, got {other:?}"
3661 ))),
3662 }
3663}
3664
3665fn parse_timeseries_tags_json(
3666 bytes: &[u8],
3667) -> RedDBResult<std::collections::HashMap<String, String>> {
3668 let json: crate::json::Value = crate::json::from_slice(bytes)
3669 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3670
3671 let object = match json {
3672 crate::json::Value::Object(object) => object,
3673 other => {
3674 return Err(RedDBError::Query(format!(
3675 "timeseries tags must be a JSON object, got {other:?}"
3676 )))
3677 }
3678 };
3679
3680 let mut tags = std::collections::HashMap::with_capacity(object.len());
3681 for (key, value) in object {
3682 tags.insert(key, json_tag_value_to_string(&value));
3683 }
3684 Ok(tags)
3685}
3686
3687fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3703 let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3704 buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3705 buf.push_str(&value.to_string_compact());
3706 buf
3707}
3708
3709fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3710 match value {
3711 Value::UnsignedInteger(value) => Ok(*value),
3712 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3713 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3714 Value::Text(value) => value.parse::<u64>().map_err(|_| {
3715 RedDBError::Query(format!(
3716 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3717 ))
3718 }),
3719 other => Err(RedDBError::Query(format!(
3720 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3721 ))),
3722 }
3723}
3724
3725fn current_unix_ns() -> u64 {
3726 std::time::SystemTime::now()
3727 .duration_since(std::time::UNIX_EPOCH)
3728 .unwrap_or_default()
3729 .as_nanos()
3730 .min(u128::from(u64::MAX)) as u64
3731}
3732
3733fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3734 use crate::json::{Map, Value as JV};
3735 match value {
3736 MetadataValue::Null => JV::Null,
3737 MetadataValue::Bool(value) => JV::Bool(*value),
3738 MetadataValue::Int(value) => JV::Number(*value as f64),
3739 MetadataValue::Float(value) => JV::Number(*value),
3740 MetadataValue::String(value) => JV::String(value.clone()),
3741 MetadataValue::Bytes(value) => JV::Array(
3742 value
3743 .iter()
3744 .map(|value| JV::Number(*value as f64))
3745 .collect(),
3746 ),
3747 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3748 MetadataValue::Array(values) => {
3749 JV::Array(values.iter().map(metadata_value_to_json).collect())
3750 }
3751 MetadataValue::Object(object) => {
3752 let entries = object
3753 .iter()
3754 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3755 .collect();
3756 JV::Object(entries)
3757 }
3758 MetadataValue::Geo { lat, lon } => {
3759 let mut object = Map::new();
3760 object.insert("lat".to_string(), JV::Number(*lat));
3761 object.insert("lon".to_string(), JV::Number(*lon));
3762 JV::Object(object)
3763 }
3764 MetadataValue::Reference(target) => {
3765 let mut object = Map::new();
3766 object.insert(
3767 "collection".to_string(),
3768 JV::String(target.collection().to_string()),
3769 );
3770 object.insert(
3771 "entity_id".to_string(),
3772 JV::Number(target.entity_id().raw() as f64),
3773 );
3774 JV::Object(object)
3775 }
3776 MetadataValue::References(values) => {
3777 let refs = values
3778 .iter()
3779 .map(|target| {
3780 let mut object = Map::new();
3781 object.insert(
3782 "collection".to_string(),
3783 JV::String(target.collection().to_string()),
3784 );
3785 object.insert(
3786 "entity_id".to_string(),
3787 JV::Number(target.entity_id().raw() as f64),
3788 );
3789 JV::Object(object)
3790 })
3791 .collect();
3792 JV::Array(refs)
3793 }
3794 }
3795}
3796
3797fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3798 match value {
3799 Value::Null => MetadataValue::Null,
3800 Value::Boolean(value) => MetadataValue::Bool(*value),
3801 Value::Integer(value) => MetadataValue::Int(*value),
3802 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3803 Value::Float(value) => MetadataValue::Float(*value),
3804 Value::Text(value) => MetadataValue::String(value.to_string()),
3805 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3806 Value::Timestamp(value) => {
3807 if *value >= 0 {
3808 metadata_u64_to_value(*value as u64)
3809 } else {
3810 MetadataValue::Int(*value)
3811 }
3812 }
3813 Value::TimestampMs(value) => {
3814 if *value >= 0 {
3815 metadata_u64_to_value(*value as u64)
3816 } else {
3817 MetadataValue::Int(*value)
3818 }
3819 }
3820 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3821 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3822 Value::Date(value) => MetadataValue::String(value.to_string()),
3823 Value::Time(value) => MetadataValue::String(value.to_string()),
3824 Value::Decimal(value) => MetadataValue::String(value.to_string()),
3825 Value::Ipv4(value) => MetadataValue::String(format!(
3826 "{}.{}.{}.{}",
3827 (value >> 24) & 0xFF,
3828 (value >> 16) & 0xFF,
3829 (value >> 8) & 0xFF,
3830 value & 0xFF
3831 )),
3832 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3833 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3834 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3835 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3836 lat: *lat as f64 / 1_000_000.0,
3837 lon: *lon as f64 / 1_000_000.0,
3838 },
3839 Value::BigInt(value) => MetadataValue::String(value.to_string()),
3840 Value::TableRef(value) => MetadataValue::String(value.clone()),
3841 Value::PageRef(value) => MetadataValue::Int(*value as i64),
3842 Value::Password(value) => MetadataValue::String(value.clone()),
3843 Value::Array(values) => {
3844 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3845 }
3846 _ => MetadataValue::String(value.to_string()),
3847 }
3848}
3849
3850fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3851 match value {
3852 Value::Null => Ok(MetadataValue::Null),
3853 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3854 Value::Integer(_) => Err(RedDBError::Query(format!(
3855 "column '{field}' must be non-negative for TTL metadata"
3856 ))),
3857 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3858 Value::Float(value) if value.is_finite() => {
3859 if value.fract().abs() >= f64::EPSILON {
3860 return Err(RedDBError::Query(format!(
3861 "column '{field}' must be an integer (TTL metadata must be an integer)"
3862 )));
3863 }
3864 if *value < 0.0 {
3865 return Err(RedDBError::Query(format!(
3866 "column '{field}' must be non-negative for TTL metadata"
3867 )));
3868 }
3869 if *value > u64::MAX as f64 {
3870 return Err(RedDBError::Query(format!(
3871 "column '{field}' value is too large"
3872 )));
3873 }
3874 Ok(metadata_u64_to_value(*value as u64))
3875 }
3876 Value::Float(_) => Err(RedDBError::Query(format!(
3877 "column '{field}' must be a finite number"
3878 ))),
3879 Value::Text(value) => {
3880 let value = value.trim();
3881 if let Ok(value) = value.parse::<u64>() {
3882 Ok(metadata_u64_to_value(value))
3883 } else if let Ok(value) = value.parse::<i64>() {
3884 if value < 0 {
3885 return Err(RedDBError::Query(format!(
3886 "column '{field}' must be non-negative for TTL metadata"
3887 )));
3888 }
3889 Ok(metadata_u64_to_value(value as u64))
3890 } else if let Ok(value) = value.parse::<f64>() {
3891 if !value.is_finite() {
3892 return Err(RedDBError::Query(format!(
3893 "column '{field}' must be a finite number"
3894 )));
3895 }
3896 if value.fract().abs() >= f64::EPSILON {
3897 return Err(RedDBError::Query(format!(
3898 "column '{field}' must be an integer (TTL metadata must be an integer)"
3899 )));
3900 }
3901 if value < 0.0 {
3902 return Err(RedDBError::Query(format!(
3903 "column '{field}' must be non-negative for TTL metadata"
3904 )));
3905 }
3906 if value > u64::MAX as f64 {
3907 return Err(RedDBError::Query(format!(
3908 "column '{field}' value is too large"
3909 )));
3910 }
3911 Ok(metadata_u64_to_value(value as u64))
3912 } else {
3913 Err(RedDBError::Query(format!(
3914 "column '{field}' expects a numeric value for TTL metadata"
3915 )))
3916 }
3917 }
3918 _ => Err(RedDBError::Query(format!(
3919 "column '{field}' expects a numeric value for TTL metadata"
3920 ))),
3921 }
3922}
3923
3924fn metadata_u64_to_value(value: u64) -> MetadataValue {
3925 if value <= i64::MAX as u64 {
3926 MetadataValue::Int(value as i64)
3927 } else {
3928 MetadataValue::Timestamp(value)
3929 }
3930}
3931
3932fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3938 let json = match value {
3939 Value::Null => return false,
3940 Value::Json(bytes) | Value::Blob(bytes) => {
3941 match crate::json::from_slice::<crate::json::Value>(bytes) {
3942 Ok(v) => v,
3943 Err(_) => return false,
3944 }
3945 }
3946 Value::Text(s) => {
3947 let trimmed = s.trim_start();
3948 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3949 return false;
3950 }
3951 match crate::json::from_str::<crate::json::Value>(s) {
3952 Ok(v) => v,
3953 Err(_) => return false,
3954 }
3955 }
3956 _ => return false,
3957 };
3958 let mut cursor = &json;
3959 for seg in tail.split('.') {
3960 match cursor {
3961 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3962 Some((_, v)) => cursor = v,
3963 None => return false,
3964 },
3965 _ => return false,
3966 }
3967 }
3968 !matches!(cursor, crate::json::Value::Null)
3969}
3970
3971fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
3982 let mut root = match current {
3983 Value::Null => crate::json::Value::Object(Default::default()),
3984 Value::Json(bytes) | Value::Blob(bytes) => {
3985 crate::json::from_slice(&bytes).map_err(|err| {
3986 RedDBError::Query(format!(
3987 "tenant auto-fill: root column is not valid JSON ({err})"
3988 ))
3989 })?
3990 }
3991 Value::Text(s) => {
3992 if s.trim().is_empty() {
3993 crate::json::Value::Object(Default::default())
3994 } else {
3995 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
3996 RedDBError::Query(format!(
3997 "tenant auto-fill: text root is not valid JSON ({err})"
3998 ))
3999 })?
4000 }
4001 }
4002 other => {
4003 return Err(RedDBError::Query(format!(
4004 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4005 )));
4006 }
4007 };
4008
4009 let segments: Vec<&str> = tail.split('.').collect();
4011 let mut cursor: &mut crate::json::Value = &mut root;
4012 for (i, seg) in segments.iter().enumerate() {
4013 let is_last = i + 1 == segments.len();
4014 let map = match cursor {
4015 crate::json::Value::Object(m) => m,
4016 _ => {
4017 return Err(RedDBError::Query(format!(
4018 "tenant auto-fill: segment '{seg}' is not inside an object"
4019 )));
4020 }
4021 };
4022 if is_last {
4023 map.insert(
4024 seg.to_string(),
4025 crate::json::Value::String(tenant_id.to_string()),
4026 );
4027 break;
4028 }
4029 cursor = map
4030 .entry(seg.to_string())
4031 .or_insert_with(|| crate::json::Value::Object(Default::default()));
4032 }
4033
4034 let bytes = crate::json::to_vec(&root).map_err(|err| {
4035 RedDBError::Query(format!(
4036 "tenant auto-fill: failed to re-serialize JSON ({err})"
4037 ))
4038 })?;
4039 Ok(Value::Json(bytes))
4040}
4041
4042#[cfg(test)]
4043mod tests {
4044 use crate::storage::schema::Value;
4045 use crate::storage::wal::{WalReader, WalRecord};
4046 use crate::{RedDBOptions, RedDBRuntime};
4047 use std::path::Path;
4048
4049 fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4050 WalReader::open(wal_path)
4051 .expect("wal opens")
4052 .iter()
4053 .map(|record| record.expect("wal record decodes").1)
4054 .filter_map(|record| match record {
4055 WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4056 _ => None,
4057 })
4058 .collect()
4059 }
4060
4061 fn action_contains_text(action: &[u8], needle: &str) -> bool {
4062 action
4063 .windows(needle.len())
4064 .any(|window| window == needle.as_bytes())
4065 }
4066
4067 fn assert_statement_writes_collections_in_one_new_wal_batch(
4068 rt: &RedDBRuntime,
4069 wal_path: &Path,
4070 statement: &str,
4071 source: &str,
4072 event_queue: &str,
4073 ) {
4074 let before_batches = store_commit_batches(wal_path).len();
4075
4076 rt.execute_query(statement).unwrap();
4077
4078 let batches = store_commit_batches(wal_path);
4079 let statement_batches = &batches[before_batches..];
4080 let source_batch = statement_batches
4081 .iter()
4082 .position(|actions| {
4083 actions.iter().any(|action| {
4084 action_contains_text(action, source)
4085 && !action_contains_text(action, event_queue)
4086 })
4087 })
4088 .expect("source collection write batch is present");
4089 let event_batch = statement_batches
4090 .iter()
4091 .position(|actions| {
4092 actions
4093 .iter()
4094 .any(|action| action_contains_text(action, event_queue))
4095 })
4096 .expect("event queue write batch is present");
4097
4098 assert_eq!(
4099 source_batch, event_batch,
4100 "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4101 );
4102 }
4103
4104 #[test]
4105 fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4106 let dir = tempfile::tempdir().unwrap();
4107 let db_path = dir.path().join("events_dual_write.rdb");
4108 let wal_path = db_path.with_extension("rdb-uwal");
4109 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4110
4111 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4112 .unwrap();
4113 assert_statement_writes_collections_in_one_new_wal_batch(
4114 &rt,
4115 &wal_path,
4116 "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4117 "users",
4118 "users_events",
4119 );
4120 }
4121
4122 #[test]
4123 fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4124 let dir = tempfile::tempdir().unwrap();
4125 let db_path = dir.path().join("events_update_atomic.rdb");
4126 let wal_path = db_path.with_extension("rdb-uwal");
4127 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4128
4129 rt.execute_query(
4130 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4131 )
4132 .unwrap();
4133 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4134 .unwrap();
4135
4136 assert_statement_writes_collections_in_one_new_wal_batch(
4137 &rt,
4138 &wal_path,
4139 "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4140 "users",
4141 "user_updates",
4142 );
4143 }
4144
4145 #[test]
4146 fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4147 let dir = tempfile::tempdir().unwrap();
4148 let db_path = dir.path().join("events_delete_atomic.rdb");
4149 let wal_path = db_path.with_extension("rdb-uwal");
4150 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4151
4152 rt.execute_query(
4153 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4154 )
4155 .unwrap();
4156 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4157 .unwrap();
4158
4159 assert_statement_writes_collections_in_one_new_wal_batch(
4160 &rt,
4161 &wal_path,
4162 "DELETE FROM users WHERE id = 1",
4163 "users",
4164 "user_deletes",
4165 );
4166 }
4167
4168 #[test]
4169 fn update_where_id_in_with_hash_index_updates_expected_rows() {
4170 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4171 rt.execute_query("CREATE TABLE users (id INT, score INT)")
4172 .unwrap();
4173 for id in 0..5 {
4174 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4175 .unwrap();
4176 }
4177 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4178 .unwrap();
4179
4180 let updated = rt
4181 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4182 .unwrap();
4183 assert_eq!(updated.affected_rows, 3);
4184
4185 let selected = rt
4186 .execute_query("SELECT id, score FROM users ORDER BY id")
4187 .unwrap();
4188 let scores: Vec<(i64, i64)> = selected
4189 .result
4190 .records
4191 .iter()
4192 .map(|record| {
4193 let id = match record.get("id").unwrap() {
4194 Value::Integer(value) => *value,
4195 other => panic!("expected integer id, got {other:?}"),
4196 };
4197 let score = match record.get("score").unwrap() {
4198 Value::Integer(value) => *value,
4199 other => panic!("expected integer score, got {other:?}"),
4200 };
4201 (id, score)
4202 })
4203 .collect();
4204 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4205 }
4206
4207 #[test]
4214 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4215 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4216 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4217 .unwrap();
4218 for id in 0..5 {
4219 rt.execute_query(&format!(
4220 "INSERT INTO items (id, score) VALUES ({id}, {})",
4221 id * 10
4222 ))
4223 .unwrap();
4224 }
4225 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4226 .unwrap();
4227
4228 let updated_one = rt
4232 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4233 .unwrap();
4234 assert_eq!(updated_one.affected_rows, 1);
4235
4236 let updated_many = rt
4240 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4241 .unwrap();
4242 assert_eq!(updated_many.affected_rows, 2);
4243
4244 let snapshot = rt
4245 .execute_query("SELECT id, score FROM items ORDER BY id")
4246 .unwrap();
4247 let pairs: Vec<(i64, i64)> = snapshot
4248 .result
4249 .records
4250 .iter()
4251 .map(|record| {
4252 let id = match record.get("id").unwrap() {
4253 Value::Integer(value) => *value,
4254 other => panic!("expected integer id, got {other:?}"),
4255 };
4256 let score = match record.get("score").unwrap() {
4257 Value::Integer(value) => *value,
4258 other => panic!("expected integer score, got {other:?}"),
4259 };
4260 (id, score)
4261 })
4262 .collect();
4263 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4264
4265 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4267 assert_eq!(updated_all.affected_rows, 5);
4268 let after = rt
4269 .execute_query("SELECT score FROM items ORDER BY id")
4270 .unwrap();
4271 let scores: Vec<i64> = after
4272 .result
4273 .records
4274 .iter()
4275 .map(|record| match record.get("score").unwrap() {
4276 Value::Integer(value) => *value,
4277 other => panic!("expected integer score, got {other:?}"),
4278 })
4279 .collect();
4280 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4281 }
4282
4283 #[test]
4289 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4290 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4291 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4292 .unwrap();
4293 for id in 0..5 {
4294 rt.execute_query(&format!(
4295 "INSERT INTO items (id, score) VALUES ({id}, {})",
4296 id * 10
4297 ))
4298 .unwrap();
4299 }
4300 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4301 .unwrap();
4302
4303 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4306 assert_eq!(deleted_one.affected_rows, 1);
4307
4308 let deleted_many = rt
4312 .execute_query("DELETE FROM items WHERE score > 25")
4313 .unwrap();
4314 assert_eq!(deleted_many.affected_rows, 2);
4315
4316 let surviving = rt
4317 .execute_query("SELECT id FROM items ORDER BY id")
4318 .unwrap();
4319 let ids: Vec<i64> = surviving
4320 .result
4321 .records
4322 .iter()
4323 .map(|record| match record.get("id").unwrap() {
4324 Value::Integer(value) => *value,
4325 other => panic!("expected integer id, got {other:?}"),
4326 })
4327 .collect();
4328 assert_eq!(ids, vec![0, 1]);
4329
4330 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4332 assert_eq!(deleted_rest.affected_rows, 2);
4333 let empty = rt.execute_query("SELECT id FROM items").unwrap();
4334 assert!(empty.result.records.is_empty());
4335 }
4336
4337 #[test]
4342 fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4343 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4344 rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4345 .unwrap();
4346
4347 let inserted = rt
4350 .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4351 .unwrap();
4352 assert_eq!(inserted.affected_rows, 1);
4353
4354 let update_err = rt
4356 .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4357 .unwrap_err();
4358 let msg = format!("{update_err}");
4359 assert!(
4360 msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4361 "expected UPDATE rejection message, got: {msg}"
4362 );
4363
4364 let delete_err = rt
4366 .execute_query("DELETE FROM events WHERE id = 1")
4367 .unwrap_err();
4368 let msg = format!("{delete_err}");
4369 assert!(
4370 msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4371 "expected DELETE rejection message, got: {msg}"
4372 );
4373
4374 let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4377 assert_eq!(surviving.result.records.len(), 1);
4378 }
4379
4380 #[test]
4384 fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4385 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4386 rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4387 .unwrap();
4388
4389 rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4390 .unwrap();
4391 let updated = rt
4392 .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4393 .unwrap();
4394 assert_eq!(updated.affected_rows, 1);
4395 let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4396 assert_eq!(deleted.affected_rows, 1);
4397 }
4398
4399 #[test]
4400 fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4401 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4402 rt.execute_query(
4403 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4404 )
4405 .unwrap();
4406
4407 let inserted = rt
4408 .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4409 .unwrap();
4410 assert_eq!(inserted.affected_rows, 1);
4411
4412 let events = queue_payloads(&rt, "audit_log");
4413 assert_eq!(events.len(), 1);
4414 let event = events[0].as_object().expect("event payload object");
4415 assert!(event
4416 .get("event_id")
4417 .and_then(crate::json::Value::as_str)
4418 .is_some_and(|value| !value.is_empty()));
4419 assert_eq!(
4420 event.get("op").and_then(crate::json::Value::as_str),
4421 Some("insert")
4422 );
4423 assert_eq!(
4424 event.get("collection").and_then(crate::json::Value::as_str),
4425 Some("users")
4426 );
4427 assert_eq!(
4428 event.get("id").and_then(crate::json::Value::as_u64),
4429 Some(7)
4430 );
4431 assert!(event
4432 .get("ts")
4433 .and_then(crate::json::Value::as_u64)
4434 .is_some());
4435 assert!(event
4436 .get("lsn")
4437 .and_then(crate::json::Value::as_u64)
4438 .is_some());
4439 assert!(matches!(
4440 event.get("tenant"),
4441 Some(crate::json::Value::Null)
4442 ));
4443 assert!(matches!(
4444 event.get("before"),
4445 Some(crate::json::Value::Null)
4446 ));
4447 let after = event
4448 .get("after")
4449 .and_then(crate::json::Value::as_object)
4450 .expect("after object");
4451 assert_eq!(
4452 after.get("id").and_then(crate::json::Value::as_u64),
4453 Some(7)
4454 );
4455 assert_eq!(
4456 after.get("email").and_then(crate::json::Value::as_str),
4457 Some("a@example.com")
4458 );
4459 }
4460
4461 #[test]
4462 fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4463 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4464 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4465 .unwrap();
4466
4467 rt.execute_query(
4468 "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4469 )
4470 .unwrap();
4471
4472 let events = queue_payloads(&rt, "users_events");
4473 assert_eq!(events.len(), 2);
4474 let mut previous_lsn = 0;
4475 for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4476 let object = event.as_object().expect("event payload object");
4477 assert_eq!(
4478 object.get("op").and_then(crate::json::Value::as_str),
4479 Some("insert")
4480 );
4481 assert_eq!(
4482 object.get("id").and_then(crate::json::Value::as_u64),
4483 Some(expected_id)
4484 );
4485 let lsn = object
4486 .get("lsn")
4487 .and_then(crate::json::Value::as_u64)
4488 .expect("event lsn");
4489 assert!(
4490 lsn > previous_lsn,
4491 "event LSNs should increase in row order"
4492 );
4493 previous_lsn = lsn;
4494 let after = object
4495 .get("after")
4496 .and_then(crate::json::Value::as_object)
4497 .expect("after object");
4498 assert_eq!(
4499 after.get("id").and_then(crate::json::Value::as_u64),
4500 Some(expected_id)
4501 );
4502 }
4503 }
4504
4505 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4506 let result = rt
4507 .execute_query(&format!("QUEUE PEEK {queue} 10"))
4508 .expect("peek queue");
4509 result
4510 .result
4511 .records
4512 .iter()
4513 .map(
4514 |record| match record.get("payload").expect("payload column") {
4515 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4516 other => panic!("expected JSON queue payload, got {other:?}"),
4517 },
4518 )
4519 .collect()
4520 }
4521
4522 #[test]
4534 fn auto_index_id_fires_on_first_insert() {
4535 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4536 rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4537 .unwrap();
4538
4539 assert!(
4541 rt.index_store_ref()
4542 .find_index_for_column("bench_users", "id")
4543 .is_none(),
4544 "freshly created collection should not have an `id` index"
4545 );
4546
4547 rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4549 .unwrap();
4550
4551 let registered = rt
4553 .index_store_ref()
4554 .find_index_for_column("bench_users", "id")
4555 .expect("auto-index hook should have registered idx_id on first insert");
4556 assert_eq!(registered.name, "idx_id");
4557 assert_eq!(registered.collection, "bench_users");
4558 assert_eq!(registered.columns, vec!["id".to_string()]);
4559 assert!(matches!(
4560 registered.method,
4561 super::super::index_store::IndexMethodKind::Hash
4562 ));
4563
4564 for id in 2..=5 {
4567 rt.execute_query(&format!(
4568 "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4569 id * 10
4570 ))
4571 .unwrap();
4572 }
4573 for id in 1..=5 {
4574 let result = rt
4575 .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4576 .unwrap();
4577 assert_eq!(
4578 result.result.records.len(),
4579 1,
4580 "id={id} should match one row"
4581 );
4582 }
4583
4584 let deleted = rt
4589 .execute_query("DELETE FROM bench_users WHERE id = 3")
4590 .unwrap();
4591 assert_eq!(deleted.affected_rows, 1);
4592 }
4593
4594 #[test]
4599 fn auto_index_id_fires_on_first_bulk_insert() {
4600 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4601 rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4602 .unwrap();
4603
4604 rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4605 .unwrap();
4606
4607 let registered = rt
4608 .index_store_ref()
4609 .find_index_for_column("bench_bulk", "id")
4610 .expect("auto-index hook should fire on first bulk insert");
4611 assert_eq!(registered.name, "idx_id");
4612
4613 for id in 1..=3 {
4615 let result = rt
4616 .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4617 .unwrap();
4618 assert_eq!(result.result.records.len(), 1);
4619 }
4620 }
4621
4622 #[test]
4626 fn auto_index_id_skips_when_no_id_column() {
4627 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4628 rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4629 .unwrap();
4630 rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4631 .unwrap();
4632
4633 assert!(rt
4634 .index_store_ref()
4635 .find_index_for_column("plain", "id")
4636 .is_none());
4637 assert!(rt
4638 .index_store_ref()
4639 .find_index_for_column("plain", "uid")
4640 .is_none());
4641 }
4642
4643 #[test]
4648 fn auto_index_id_skips_when_index_already_exists() {
4649 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4650 rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4651 .unwrap();
4652 rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4654 .unwrap();
4655 rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4656 .unwrap();
4657
4658 let registered = rt
4659 .index_store_ref()
4660 .find_index_for_column("pre", "id")
4661 .expect("user index should still be there");
4662 assert_eq!(
4663 registered.name, "user_idx",
4664 "auto-index hook must not overwrite an existing index"
4665 );
4666 }
4667
4668 #[test]
4672 fn auto_index_id_dropped_with_collection() {
4673 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4674 rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4675 .unwrap();
4676 rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4677 .unwrap();
4678 assert!(rt
4679 .index_store_ref()
4680 .find_index_for_column("ephemeral", "id")
4681 .is_some());
4682
4683 rt.execute_query("DROP TABLE ephemeral").unwrap();
4684
4685 assert!(
4686 rt.index_store_ref()
4687 .find_index_for_column("ephemeral", "id")
4688 .is_none(),
4689 "implicit `idx_id` must be reaped when its collection drops"
4690 );
4691 }
4692
4693 #[test]
4698 fn auto_index_id_disabled_by_config() {
4699 let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4700 let rt = RedDBRuntime::with_options(opts).unwrap();
4701
4702 rt.execute_query("CREATE TABLE off (id INT, score INT)")
4703 .unwrap();
4704 rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4705 .unwrap();
4706
4707 assert!(
4708 rt.index_store_ref()
4709 .find_index_for_column("off", "id")
4710 .is_none(),
4711 "with auto_index_id=false, no implicit index should be created"
4712 );
4713 }
4714
4715 #[test]
4718 fn update_single_row_emits_update_event() {
4719 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4720 rt.execute_query(
4721 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4722 )
4723 .unwrap();
4724 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4725 .unwrap();
4726
4727 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4728 .unwrap();
4729
4730 let events = queue_payloads(&rt, "audit_log");
4731 assert_eq!(events.len(), 1, "expected exactly 1 update event");
4732 let event = events[0].as_object().expect("event payload object");
4733 assert_eq!(
4734 event.get("op").and_then(crate::json::Value::as_str),
4735 Some("update")
4736 );
4737 assert_eq!(
4738 event.get("collection").and_then(crate::json::Value::as_str),
4739 Some("users")
4740 );
4741 assert!(event
4742 .get("event_id")
4743 .and_then(crate::json::Value::as_str)
4744 .is_some_and(|v| !v.is_empty()));
4745 let before = event
4746 .get("before")
4747 .and_then(crate::json::Value::as_object)
4748 .expect("before must be an object");
4749 let after = event
4750 .get("after")
4751 .and_then(crate::json::Value::as_object)
4752 .expect("after must be an object");
4753 assert_eq!(
4754 before.get("name").and_then(crate::json::Value::as_str),
4755 Some("Alice"),
4756 "before.name should be the old value"
4757 );
4758 assert_eq!(
4759 after.get("name").and_then(crate::json::Value::as_str),
4760 Some("Bob"),
4761 "after.name should be the new value"
4762 );
4763 }
4764
4765 #[test]
4766 fn update_event_only_includes_changed_fields() {
4767 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4768 rt.execute_query(
4769 "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4770 )
4771 .unwrap();
4772 rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4773 .unwrap();
4774
4775 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4776 .unwrap();
4777
4778 let events = queue_payloads(&rt, "evts");
4779 assert_eq!(events.len(), 1);
4780 let event = events[0].as_object().unwrap();
4781 let before = event
4782 .get("before")
4783 .and_then(crate::json::Value::as_object)
4784 .unwrap();
4785 let after = event
4786 .get("after")
4787 .and_then(crate::json::Value::as_object)
4788 .unwrap();
4789 assert!(
4791 before.contains_key("name"),
4792 "before must include changed field"
4793 );
4794 assert!(
4795 after.contains_key("name"),
4796 "after must include changed field"
4797 );
4798 assert!(
4800 !before.contains_key("email"),
4801 "before must not include unchanged email"
4802 );
4803 assert!(
4804 !after.contains_key("email"),
4805 "after must not include unchanged email"
4806 );
4807 }
4808
4809 #[test]
4810 fn multi_row_update_emits_one_event_per_row() {
4811 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4812 rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4813 .unwrap();
4814 rt.execute_query(
4815 "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4816 )
4817 .unwrap();
4818
4819 rt.execute_query("UPDATE items SET status = 'done'")
4820 .unwrap();
4821
4822 let events = queue_payloads(&rt, "evts");
4823 assert_eq!(events.len(), 3, "expected one update event per row");
4824 for event in &events {
4825 let obj = event.as_object().unwrap();
4826 assert_eq!(
4827 obj.get("op").and_then(crate::json::Value::as_str),
4828 Some("update")
4829 );
4830 }
4831 }
4832
4833 #[test]
4834 fn delete_single_row_emits_delete_event() {
4835 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4836 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4837 .unwrap();
4838 rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4839 .unwrap();
4840
4841 rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4842
4843 let events = queue_payloads(&rt, "del_log");
4844 assert_eq!(events.len(), 1);
4845 let event = events[0].as_object().expect("event payload object");
4846 assert_eq!(
4847 event.get("op").and_then(crate::json::Value::as_str),
4848 Some("delete")
4849 );
4850 assert_eq!(
4851 event.get("collection").and_then(crate::json::Value::as_str),
4852 Some("users")
4853 );
4854 assert!(event
4855 .get("event_id")
4856 .and_then(crate::json::Value::as_str)
4857 .is_some_and(|v| !v.is_empty()));
4858 let before = event
4859 .get("before")
4860 .and_then(crate::json::Value::as_object)
4861 .expect("before must be an object for delete");
4862 assert_eq!(
4863 before.get("id").and_then(crate::json::Value::as_u64),
4864 Some(42)
4865 );
4866 assert_eq!(
4867 before.get("name").and_then(crate::json::Value::as_str),
4868 Some("Alice")
4869 );
4870 assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
4871 }
4872
4873 #[test]
4874 fn multi_row_delete_emits_one_event_per_row() {
4875 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4876 rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
4877 .unwrap();
4878 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
4879 .unwrap();
4880
4881 rt.execute_query("DELETE FROM items").unwrap();
4882
4883 let events = queue_payloads(&rt, "del_log");
4884 assert_eq!(events.len(), 3, "expected one delete event per deleted row");
4885 for event in &events {
4886 let obj = event.as_object().unwrap();
4887 assert_eq!(
4888 obj.get("op").and_then(crate::json::Value::as_str),
4889 Some("delete")
4890 );
4891 assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
4892 }
4893 }
4894
4895 #[test]
4896 fn ops_filter_update_does_not_emit_on_insert_or_delete() {
4897 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4898 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
4899 .unwrap();
4900
4901 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4902 .unwrap();
4903 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
4904
4905 let events = queue_payloads(&rt, "evts");
4906 assert!(
4907 events.is_empty(),
4908 "UPDATE-only filter must not emit INSERT or DELETE events"
4909 );
4910 }
4911
4912 #[test]
4915 fn suppress_events_on_insert_emits_no_events() {
4916 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4917 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4918 .unwrap();
4919
4920 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4921 .unwrap();
4922
4923 let events = queue_payloads(&rt, "evts");
4924 assert!(
4925 events.is_empty(),
4926 "SUPPRESS EVENTS must prevent INSERT events"
4927 );
4928 }
4929
4930 #[test]
4931 fn suppress_events_on_update_emits_no_events() {
4932 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4933 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4934 .unwrap();
4935 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4936 .unwrap();
4937 let _ = queue_payloads(&rt, "evts");
4939 rt.execute_query("QUEUE PURGE evts").unwrap();
4941
4942 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
4943 .unwrap();
4944
4945 let events = queue_payloads(&rt, "evts");
4946 assert!(
4947 events.is_empty(),
4948 "SUPPRESS EVENTS must prevent UPDATE events"
4949 );
4950 }
4951
4952 #[test]
4953 fn suppress_events_on_delete_emits_no_events() {
4954 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4955 rt.execute_query(
4956 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
4957 )
4958 .unwrap();
4959 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4960 .unwrap();
4961
4962 rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
4963 .unwrap();
4964
4965 let events = queue_payloads(&rt, "evts");
4966 assert!(
4967 events.is_empty(),
4968 "SUPPRESS EVENTS must prevent DELETE events"
4969 );
4970 }
4971
4972 #[test]
4973 fn normal_insert_after_suppress_still_emits() {
4974 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4975 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4976 .unwrap();
4977
4978 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4979 .unwrap();
4980 rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
4981 .unwrap();
4982
4983 let events = queue_payloads(&rt, "evts");
4984 assert_eq!(
4985 events.len(),
4986 1,
4987 "only the non-suppressed INSERT should emit"
4988 );
4989 assert_eq!(
4990 events[0].get("id").and_then(crate::json::Value::as_u64),
4991 Some(2)
4992 );
4993 }
4994}