1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11 CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12 CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13 RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16 build_row_update_contract_plan, entity_row_fields_snapshot,
17 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18 RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27 sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28 sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
/// Chunk size used by the update path.
/// NOTE(review): presumably the number of mutated entities applied per chunk;
/// the consumer is outside this section — confirm.
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
/// Edge label string `"TREE_CHILD"`.
/// NOTE(review): presumably reserved for tree-structure edges; verify against
/// the helper that validates user-supplied edge labels.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Metadata key prefix `"red.tree."`.
/// NOTE(review): presumably marks tree-internal metadata entries; verify
/// against the helper that rejects reserved metadata keys.
const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// One update assignment in compiled form: a target column together with the
/// expression that produces its value.
///
/// NOTE(review): the code that builds and consumes this type is outside the
/// visible section; the field notes below are inferred from names and types
/// and should be confirmed against the update planner.
#[derive(Clone)]
struct CompiledUpdateAssignment {
    /// Name of the assigned column.
    column: String,
    /// Expression evaluated to obtain the assigned value.
    expr: Expr,
    /// Optional binary operator — presumably set for compound assignments
    /// (e.g. `col = col + expr`); confirm.
    compound_op: Option<BinOp>,
    /// Optional static metadata key — presumably set when the assignment
    /// targets entity metadata rather than a row field; confirm.
    metadata_key: Option<&'static str>,
    /// Optional per-column row-update rule attached to this assignment.
    row_rule: Option<RowUpdateColumnRule>,
}
49
/// Compiled form of an update: assignments split into static field values,
/// static metadata values, and dynamic assignments, plus row-contract
/// bookkeeping.
///
/// NOTE(review): construction and use are outside the visible section; the
/// "static" vs "dynamic" split presumably separates values known up front from
/// expressions evaluated per row — confirm.
struct CompiledUpdatePlan {
    /// `(column, value)` pairs carried as ready-made values.
    static_field_assignments: Vec<(String, Value)>,
    /// `(key, value)` metadata pairs carried as ready-made values.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    /// Assignments kept in expression form.
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    /// Optional row-update contract plan for the target collection.
    row_contract_plan: Option<RowUpdateContractPlan>,
    /// Names of row columns modified by this plan.
    row_modified_columns: Vec<String>,
    /// Flag — presumably true when any modified column participates in a
    /// uniqueness constraint; confirm.
    row_touches_unique_columns: bool,
}
58
/// Concrete field and metadata values for the dynamic part of an update.
///
/// NOTE(review): presumably the per-row result of evaluating
/// `CompiledUpdatePlan::dynamic_assignments`; the producer is outside the
/// visible section — confirm.
#[derive(Default)]
struct MaterializedUpdateAssignments {
    /// `(column, value)` pairs for row fields.
    dynamic_field_assignments: Vec<(String, Value)>,
    /// `(key, value)` pairs for entity metadata.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
64
65impl RedDBRuntime {
66 pub fn chain_tip_for_collection(
72 &self,
73 collection: &str,
74 ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75 let store = self.inner.db.store();
76 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77 return None;
78 }
79 let mut cache = self.inner.chain_tip_cache.lock();
80 if let Some(existing) = cache.get(collection) {
81 return Some(existing.clone());
82 }
83 let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84 cache.insert(collection.to_string(), scanned.clone());
85 Some(scanned)
86 }
87
88 pub fn verify_chain_for_collection(
95 &self,
96 collection: &str,
97 ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98 let store = self.inner.db.store();
99 let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100 if !outcome.ok {
101 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102 self.inner
103 .chain_integrity_broken
104 .lock()
105 .insert(collection.to_string(), true);
106 }
107 Some(outcome)
108 }
109
110 pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114 let store = self.inner.db.store();
115 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116 return false;
117 }
118 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119 self.inner
120 .chain_integrity_broken
121 .lock()
122 .insert(collection.to_string(), false);
123 true
124 }
125
126 fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130 {
131 let cache = self.inner.chain_integrity_broken.lock();
132 if let Some(v) = cache.get(collection) {
133 return *v;
134 }
135 }
136 let store = self.inner.db.store();
137 let persisted =
138 crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139 .unwrap_or(false);
140 self.inner
141 .chain_integrity_broken
142 .lock()
143 .insert(collection.to_string(), persisted);
144 persisted
145 }
146
147 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
161 let Some(tenant_col) = self.tenant_column(&query.table) else {
162 return Ok(None);
163 };
164 if query
166 .columns
167 .iter()
168 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
169 {
170 return Ok(None);
171 }
172
173 if let Some(dot_pos) = tenant_col.find('.') {
179 let (root, tail) = tenant_col.split_at(dot_pos);
180 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
182 }
183
184 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
185 return Err(RedDBError::Query(format!(
186 "INSERT into tenant-scoped table '{}' requires an active tenant — \
187 run SET TENANT '<id>' first or name column '{}' explicitly",
188 query.table, tenant_col
189 )));
190 };
191
192 let mut augmented = query.clone();
193 augmented.columns.push(tenant_col);
194 let lit = Value::text(tenant_id.clone());
195 for row in augmented.values.iter_mut() {
196 row.push(lit.clone());
197 }
198 for row in augmented.value_exprs.iter_mut() {
199 row.push(crate::storage::query::ast::Expr::Literal {
200 value: lit.clone(),
201 span: crate::storage::query::ast::Span::synthetic(),
202 });
203 }
204 Ok(Some(augmented))
205 }
206
207 fn inject_dotted_tenant(
217 &self,
218 query: &InsertQuery,
219 root: &str,
220 tail: &str,
221 ) -> RedDBResult<Option<InsertQuery>> {
222 let active_tenant = crate::runtime::impl_core::current_tenant();
223 let mut augmented = query.clone();
224 let root_idx = augmented
225 .columns
226 .iter()
227 .position(|c| c.eq_ignore_ascii_case(root));
228
229 if let Some(idx) = root_idx {
230 for row in augmented.values.iter_mut() {
236 let Some(slot) = row.get_mut(idx) else {
237 continue;
238 };
239 if dotted_tail_already_set(slot, tail) {
240 continue;
241 }
242 let Some(tenant_id) = &active_tenant else {
243 return Err(RedDBError::Query(format!(
244 "INSERT into tenant-scoped table '{}' requires an active tenant — \
245 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
246 query.table, root, tail
247 )));
248 };
249 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
250 }
251 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
255 if let Some(slot) = row.get_mut(idx) {
256 let new_value = augmented
257 .values
258 .get(row_idx)
259 .and_then(|v| v.get(idx))
260 .cloned()
261 .unwrap_or(Value::Null);
262 *slot = crate::storage::query::ast::Expr::Literal {
263 value: new_value,
264 span: crate::storage::query::ast::Span::synthetic(),
265 };
266 }
267 }
268 } else {
269 let Some(tenant_id) = &active_tenant else {
273 return Err(RedDBError::Query(format!(
274 "INSERT into tenant-scoped table '{}' requires an active tenant — \
275 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
276 query.table, root, tail
277 )));
278 };
279 augmented.columns.push(root.to_string());
281 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
282 for row in augmented.values.iter_mut() {
283 row.push(fresh.clone());
284 }
285 for row in augmented.value_exprs.iter_mut() {
286 row.push(crate::storage::query::ast::Expr::Literal {
287 value: fresh.clone(),
288 span: crate::storage::query::ast::Span::synthetic(),
289 });
290 }
291 }
292
293 Ok(Some(augmented))
294 }
295
296 fn delete_entities_batch(
299 &self,
300 collection: &str,
301 ids: &[EntityId],
302 ) -> RedDBResult<(u64, Vec<u64>)> {
303 if ids.is_empty() {
304 return Ok((0, vec![]));
305 }
306
307 let store = self.db().store();
308 let Some(manager) = store.get_collection(collection) else {
309 return Ok((0, vec![]));
310 };
311
312 let active_xid = self.current_xid();
313 let conn_id = crate::runtime::impl_core::current_connection_id();
314 let mut autocommit_xid = None;
315 let mut tombstoned_ids = Vec::new();
316 let mut tombstoned_entities = Vec::new();
317 let mut physical_delete_ids = Vec::new();
318 let table_row_resolver =
319 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
320
321 for &id in ids {
322 let Some(mut entity) = manager.get(id) else {
323 continue;
324 };
325 if matches!(entity.data, EntityData::Row(_)) {
326 let previous_xmax = entity.xmax;
327 if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
328 if table_row_resolver.resolve_candidate(&entity).is_none() {
329 continue;
330 }
331 } else if entity.xmax != 0 {
332 continue;
333 }
334
335 let xid = match active_xid {
336 Some(xid) => xid,
337 None => match autocommit_xid {
338 Some(xid) => xid,
339 None => {
340 let mgr = self.snapshot_manager();
341 let xid = mgr.begin();
342 autocommit_xid = Some(xid);
343 xid
344 }
345 },
346 };
347 entity.set_xmax(xid);
348 if manager.update(entity.clone()).is_ok() {
349 if active_xid.is_some() {
350 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
351 }
352 tombstoned_entities.push(entity);
353 tombstoned_ids.push(id);
354 }
355 } else {
356 physical_delete_ids.push(id);
357 }
358 }
359
360 if let Some(xid) = autocommit_xid {
361 self.snapshot_manager().commit(xid);
362 }
363
364 let mut affected = tombstoned_ids.len() as u64;
365 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
366 if active_xid.is_some() {
367 store
368 .persist_entities_to_pager(collection, &tombstoned_entities)
369 .map_err(|err| RedDBError::Internal(err.to_string()))?;
370 } else {
371 store
372 .persist_entities_to_pager(collection, &tombstoned_entities)
373 .map_err(|err| RedDBError::Internal(err.to_string()))?;
374 for id in &tombstoned_ids {
375 store.context_index().remove_entity(*id);
376 let lsn = self.cdc_emit(
377 crate::replication::cdc::ChangeOperation::Delete,
378 collection,
379 id.raw(),
380 "entity",
381 );
382 lsns.push(lsn);
383 }
384 }
385
386 let deleted_ids = store
387 .delete_batch(collection, &physical_delete_ids)
388 .map_err(|err| RedDBError::Internal(err.to_string()))?;
389 affected += deleted_ids.len() as u64;
390 for id in &deleted_ids {
391 store.context_index().remove_entity(*id);
392 let lsn = self.cdc_emit(
393 crate::replication::cdc::ChangeOperation::Delete,
394 collection,
395 id.raw(),
396 "entity",
397 );
398 lsns.push(lsn);
399 }
400
401 Ok((affected, lsns))
402 }
403
404 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
407 if applied.is_empty() {
408 return Ok(Vec::new());
409 }
410
411 let store = self.db().store();
412 if applied.iter().any(|item| item.context_index_dirty) {
413 store.context_index().index_entities(
414 &applied[0].collection,
415 applied
416 .iter()
417 .filter(|item| item.context_index_dirty)
418 .map(|item| &item.entity),
419 );
420 }
421
422 for item in applied {
423 self.refresh_update_secondary_indexes(item)?;
424 }
425
426 let mut lsns = Vec::with_capacity(applied.len());
427 for item in applied {
428 let lsn = self.cdc_emit_prebuilt(
429 crate::replication::cdc::ChangeOperation::Update,
430 &item.collection,
431 &item.entity,
432 update_cdc_item_kind(self, &item.collection, &item.entity),
433 item.metadata.as_ref(),
434 false,
435 );
436 lsns.push(lsn);
437 }
438 Ok(lsns)
439 }
440
/// Persists one chunk of applied update mutations by delegating to
/// `persist_applied_entity_mutations`; its result is returned unchanged.
fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
    self.persist_applied_entity_mutations(applied)
}
444
445 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
446 if applied.pre_mutation_fields.is_empty() {
447 return Ok(());
448 }
449 let post = entity_row_fields_snapshot(&applied.entity);
450 if post.is_empty() {
451 return Ok(());
452 }
453
454 let indexed_cols = self
455 .index_store_ref()
456 .indexed_columns_set(&applied.collection);
457 if indexed_cols.is_empty() {
458 return Ok(());
459 }
460
461 if let Some(old_version) = applied.replaced_entity.as_ref() {
462 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
463 .pre_mutation_fields
464 .iter()
465 .filter(|(col, _)| indexed_cols.contains(col))
466 .cloned()
467 .collect();
468 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
469 .iter()
470 .filter(|(col, _)| indexed_cols.contains(col))
471 .cloned()
472 .collect();
473 if !old_index_fields.is_empty() {
474 self.index_store_ref()
475 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
476 .map_err(crate::RedDBError::Internal)?;
477 }
478 if !new_index_fields.is_empty() {
479 self.index_store_ref()
480 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
481 .map_err(crate::RedDBError::Internal)?;
482 }
483 return Ok(());
484 }
485
486 let damage =
487 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
488 if damage
489 .touched_columns()
490 .into_iter()
491 .any(|col| indexed_cols.contains(col))
492 {
493 self.index_store_ref()
494 .index_entity_update(
495 &applied.collection,
496 applied.id,
497 &applied.pre_mutation_fields,
498 &post,
499 )
500 .map_err(crate::RedDBError::Internal)?;
501 }
502 Ok(())
503 }
504
505 pub fn execute_insert(
510 &self,
511 raw_query: &str,
512 query: &InsertQuery,
513 ) -> RedDBResult<RuntimeQueryResult> {
514 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
515 crate::runtime::collection_contract::CollectionContractGate::check(
521 self,
522 &query.table,
523 crate::runtime::collection_contract::MutationKind::Insert,
524 )?;
525 let augmented_owned;
534 let query = match self.maybe_inject_tenant_column(query)? {
535 Some(new_q) => {
536 augmented_owned = new_q;
537 &augmented_owned
538 }
539 None => query,
540 };
541 self.check_insert_column_policy(query)?;
542
543 let mut inserted_count: u64 = 0;
544 let effective_rows =
545 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
546
547 let store = self.inner.db.store();
549 let _ = store.get_or_create_collection(&query.table);
550 let declared_model = self
551 .db()
552 .collection_contract_arc(&query.table)
553 .map(|contract| contract.declared_model);
554
555 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
556 if query.returning.is_some() {
557 Some(Vec::with_capacity(effective_rows.len()))
558 } else {
559 None
560 };
561 let mut returning_result: Option<UnifiedResult> = None;
562
563 if matches!(query.entity_type, InsertEntityType::Row)
564 && !matches!(
565 declared_model,
566 Some(crate::catalog::CollectionModel::TimeSeries)
567 )
568 {
569 let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
580 let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
581 Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
582 } else {
583 None
584 };
585 let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
586
587 if chain_mode && self.is_chain_integrity_broken(&query.table) {
590 return Err(RedDBError::InvalidOperation(format!(
591 "ChainIntegrityBroken: collection '{}' is locked until \
592 POST /collections/{}/clear-integrity-flag is called by an admin",
593 query.table, query.table
594 )));
595 }
596
597 let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
601 if chain_mode {
602 let mut cache = self.inner.chain_tip_cache.lock();
603 if let Some(existing) = cache.get(&query.table) {
604 Some(existing.clone())
605 } else if let Some(scanned) =
606 crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
607 {
608 cache.insert(query.table.clone(), scanned.clone());
609 Some(scanned)
610 } else {
611 None
612 }
613 } else {
614 None
615 };
616
617 let mut rows = Vec::with_capacity(effective_rows.len());
618 for row_values in &effective_rows {
619 if row_values.len() != query.columns.len() {
620 return Err(RedDBError::Query(format!(
621 "INSERT column count ({}) does not match value count ({})",
622 query.columns.len(),
623 row_values.len()
624 )));
625 }
626 let (mut fields, mut metadata) =
627 split_insert_metadata(self, &query.columns, row_values)?;
628 if chain_mode {
629 use crate::runtime::blockchain_kind::{
630 chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
631 COL_TIMESTAMP, RESERVED_COLUMNS,
632 };
633 let supplied_height = fields
634 .iter()
635 .find(|(k, _)| k == COL_BLOCK_HEIGHT)
636 .map(|(_, v)| v.clone());
637 let supplied_prev = fields
638 .iter()
639 .find(|(k, _)| k == COL_PREV_HASH)
640 .map(|(_, v)| v.clone());
641 let supplied_ts = fields
642 .iter()
643 .find(|(k, _)| k == COL_TIMESTAMP)
644 .map(|(_, v)| v.clone());
645 let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
646 let user_supplied_any = supplied_height.is_some()
647 || supplied_prev.is_some()
648 || supplied_ts.is_some()
649 || supplied_hash;
650
651 fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
652 let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
653
654 let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
655 Some(t) => (t.hash, t.height + 1),
656 None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
657 };
658 let server_now = crate::runtime::blockchain_kind::now_ms();
659
660 let (use_prev, use_height, use_ts) = if user_supplied_any {
661 if supplied_hash {
664 return Err(chain_conflict_error(
665 tip_next_height.saturating_sub(1),
666 tip_prev_hash,
667 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
668 server_now,
669 "hash column is engine-computed and cannot be supplied",
670 ));
671 }
672 let caller_prev = match &supplied_prev {
673 Some(Value::Blob(b)) if b.len() == 32 => {
674 let mut a = [0u8; 32];
675 a.copy_from_slice(b);
676 a
677 }
678 Some(Value::Text(s)) if s.len() == 64 => {
679 let mut a = [0u8; 32];
683 let mut ok = true;
684 for (i, slot) in a.iter_mut().enumerate() {
685 let pair = &s.as_ref()[i * 2..i * 2 + 2];
686 match u8::from_str_radix(pair, 16) {
687 Ok(byte) => *slot = byte,
688 Err(_) => {
689 ok = false;
690 break;
691 }
692 }
693 }
694 if !ok {
695 return Err(chain_conflict_error(
696 tip_next_height.saturating_sub(1),
697 tip_prev_hash,
698 chain_tip_full
699 .as_ref()
700 .map(|t| t.timestamp_ms)
701 .unwrap_or(0),
702 server_now,
703 "prev_hash is not valid hex",
704 ));
705 }
706 a
707 }
708 _ => {
709 return Err(chain_conflict_error(
710 tip_next_height.saturating_sub(1),
711 tip_prev_hash,
712 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
713 server_now,
714 "prev_hash missing or not a 32-byte Blob",
715 ));
716 }
717 };
718 if caller_prev != tip_prev_hash {
719 return Err(chain_conflict_error(
720 tip_next_height.saturating_sub(1),
721 tip_prev_hash,
722 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
723 server_now,
724 "prev_hash does not match current tip",
725 ));
726 }
727 let caller_height = match &supplied_height {
728 Some(Value::UnsignedInteger(v)) => *v,
729 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
730 _ => {
731 return Err(chain_conflict_error(
732 tip_next_height.saturating_sub(1),
733 tip_prev_hash,
734 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
735 server_now,
736 "block_height missing or not an unsigned integer",
737 ));
738 }
739 };
740 if caller_height != tip_next_height {
741 return Err(chain_conflict_error(
742 tip_next_height.saturating_sub(1),
743 tip_prev_hash,
744 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
745 server_now,
746 "block_height does not match tip+1",
747 ));
748 }
749 let caller_ts = match &supplied_ts {
750 Some(Value::UnsignedInteger(v)) => *v,
751 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
752 _ => {
753 return Err(chain_conflict_error(
754 tip_next_height.saturating_sub(1),
755 tip_prev_hash,
756 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
757 server_now,
758 "timestamp missing or not an unsigned integer",
759 ));
760 }
761 };
762 let drift = (caller_ts as i128) - (server_now as i128);
763 if drift.abs() > 60_000 {
764 return Err(chain_conflict_error(
765 tip_next_height.saturating_sub(1),
766 tip_prev_hash,
767 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
768 server_now,
769 "timestamp outside ±60s of server_time",
770 ));
771 }
772 (caller_prev, caller_height, caller_ts)
773 } else {
774 (tip_prev_hash, tip_next_height, server_now)
775 };
776
777 let (reserved, new_hash) =
778 crate::runtime::blockchain_kind::make_block_reserved_fields(
779 use_prev, use_height, use_ts, &payload,
780 );
781 fields.extend(reserved);
782 chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
783 height: use_height,
784 hash: new_hash,
785 timestamp_ms: use_ts,
786 });
787 }
788 if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
800 let (pk_col, sig_col, residual) =
801 crate::runtime::signed_writes_kind::split_signature_fields(fields);
802 let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
803 let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
804 crate::runtime::signed_writes_kind::verify_row(
805 ®,
806 pk_col.as_ref().map(|c| c.bytes.as_slice()),
807 sig_col.as_ref().map(|c| c.bytes.as_slice()),
808 &payload,
809 )
810 .map_err(crate::runtime::signed_writes_kind::map_error)?;
811 fields = residual;
812 if let Some(col) = pk_col {
817 fields.push((
818 crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
819 col.raw_value,
820 ));
821 }
822 if let Some(col) = sig_col {
823 fields.push((
824 crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
825 col.raw_value,
826 ));
827 }
828 }
829 merge_with_clauses(
830 &mut metadata,
831 query.ttl_ms,
832 query.expires_at_ms,
833 &query.with_metadata,
834 );
835 if let Some(snaps) = returning_snapshots.as_mut() {
836 snaps.push(fields.clone());
837 }
838 rows.push(CreateRowInput {
839 collection: query.table.clone(),
840 fields,
841 metadata,
842 node_links: Vec::new(),
843 vector_links: Vec::new(),
844 });
845 }
846 let outputs = self.create_rows_batch(CreateRowsBatchInput {
847 collection: query.table.clone(),
848 rows,
849 suppress_events: query.suppress_events,
850 })?;
851 inserted_count = outputs.len() as u64;
852
853 if chain_mode {
857 if let Some(new_tip) = chain_tip_full.as_ref() {
858 self.inner
859 .chain_tip_cache
860 .lock()
861 .insert(query.table.clone(), new_tip.clone());
862 }
863 }
864
865 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
873 let time_col = &spec.time_column;
874 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
876 for row in &effective_rows {
877 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
878 if *n >= 0 {
879 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
880 }
881 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
882 let _ = self.inner.db.hypertables().route(&query.table, *n);
883 }
884 }
885 }
886 }
887
888 if let (Some(items), Some(snaps)) =
889 (query.returning.as_ref(), returning_snapshots.take())
890 {
891 let snaps = row_insert_returning_snapshots(&outputs, snaps);
892 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
893 }
894 } else {
895 let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
902 Vec::with_capacity(effective_rows.len());
903 let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
904 {
905 Vec::with_capacity(effective_rows.len())
906 } else {
907 Vec::new()
908 };
909 if matches!(
910 query.entity_type,
911 InsertEntityType::Node | InsertEntityType::Edge
912 ) {
913 enum PreparedGraphInsert {
914 Node {
915 fields: Vec<(String, Value)>,
916 input: CreateNodeInput,
917 },
918 Edge {
919 fields: Vec<(String, Value)>,
920 input: CreateEdgeInput,
921 },
922 }
923
924 let mut prepared = Vec::with_capacity(effective_rows.len());
925 for row_values in &effective_rows {
926 if row_values.len() != query.columns.len() {
927 return Err(RedDBError::Query(format!(
928 "INSERT column count ({}) does not match value count ({})",
929 query.columns.len(),
930 row_values.len()
931 )));
932 }
933
934 match query.entity_type {
935 InsertEntityType::Node => {
936 let (node_values, mut metadata) =
937 split_insert_metadata(self, &query.columns, row_values)?;
938 merge_with_clauses(
939 &mut metadata,
940 query.ttl_ms,
941 query.expires_at_ms,
942 &query.with_metadata,
943 );
944 ensure_non_tree_reserved_metadata_entries(&metadata)?;
945 apply_collection_default_ttl_metadata(
946 self,
947 &query.table,
948 &mut metadata,
949 );
950 let (columns, values) = pairwise_columns_values(&node_values);
951 let label = find_column_value_string(&columns, &values, "label")?;
952 let node_type =
953 find_column_value_opt_string(&columns, &values, "node_type");
954 let properties = extract_remaining_properties(
955 &columns,
956 &values,
957 &["label", "node_type"],
958 );
959 crate::reserved_fields::ensure_no_reserved_public_item_fields(
960 properties.iter().map(|(key, _)| key.as_str()),
961 &format!("node '{}'", query.table),
962 )?;
963 prepared.push(PreparedGraphInsert::Node {
964 fields: node_values,
965 input: CreateNodeInput {
966 collection: query.table.clone(),
967 label,
968 node_type,
969 properties,
970 metadata,
971 embeddings: Vec::new(),
972 table_links: Vec::new(),
973 node_links: Vec::new(),
974 },
975 });
976 }
977 InsertEntityType::Edge => {
978 let (edge_values, mut metadata) =
979 split_insert_metadata(self, &query.columns, row_values)?;
980 merge_with_clauses(
981 &mut metadata,
982 query.ttl_ms,
983 query.expires_at_ms,
984 &query.with_metadata,
985 );
986 ensure_non_tree_reserved_metadata_entries(&metadata)?;
987 apply_collection_default_ttl_metadata(
988 self,
989 &query.table,
990 &mut metadata,
991 );
992 let (columns, values) = pairwise_columns_values(&edge_values);
993 let label = find_column_value_string(&columns, &values, "label")?;
994 ensure_non_tree_structural_edge_label(&label)?;
995 let from_id = resolve_edge_endpoint_any(
996 self.inner.db.store().as_ref(),
997 &query.table,
998 &columns,
999 &values,
1000 &["from_rid", "from"],
1001 )?;
1002 let to_id = resolve_edge_endpoint_any(
1003 self.inner.db.store().as_ref(),
1004 &query.table,
1005 &columns,
1006 &values,
1007 &["to_rid", "to"],
1008 )?;
1009 let weight = find_column_value_f32_opt(&columns, &values, "weight");
1010 let properties = extract_remaining_properties(
1011 &columns,
1012 &values,
1013 &["label", "from_rid", "to_rid", "from", "to", "weight"],
1014 );
1015 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1016 properties.iter().map(|(key, _)| key.as_str()),
1017 &format!("edge '{}'", query.table),
1018 )?;
1019 prepared.push(PreparedGraphInsert::Edge {
1020 fields: edge_values,
1021 input: CreateEdgeInput {
1022 collection: query.table.clone(),
1023 label,
1024 from: EntityId::new(from_id),
1025 to: EntityId::new(to_id),
1026 weight,
1027 properties,
1028 metadata,
1029 },
1030 });
1031 }
1032 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1033 }
1034 }
1035
1036 ensure_graph_insert_contract(self, &query.table)?;
1037 let mut batch = self.inner.db.batch();
1038 for item in prepared {
1039 match item {
1040 PreparedGraphInsert::Node { fields, input } => {
1041 if query.returning.is_some() {
1042 returning_field_snaps.push(fields);
1043 }
1044 let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1045 batch = batch.add_node_with_type(
1046 input.collection,
1047 input.label,
1048 node_type,
1049 input.properties.into_iter().collect(),
1050 input.metadata.into_iter().collect(),
1051 );
1052 }
1053 PreparedGraphInsert::Edge { fields, input } => {
1054 if query.returning.is_some() {
1055 returning_field_snaps.push(fields);
1056 }
1057 batch = batch.add_edge(
1058 input.collection,
1059 input.label,
1060 input.from,
1061 input.to,
1062 input.weight.unwrap_or(1.0),
1063 input.properties.into_iter().collect(),
1064 input.metadata.into_iter().collect(),
1065 );
1066 }
1067 }
1068 }
1069 let batch_result = batch
1070 .execute()
1071 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1072 let (ids, entity_kind) = match query.entity_type {
1073 InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1074 InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1075 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1076 };
1077 for id in &ids {
1078 self.stamp_xmin_if_in_txn(&query.table, *id);
1079 }
1080 if query.returning.is_some() {
1081 returning_field_snaps = graph_insert_returning_snapshots(
1082 self.inner.db.store().as_ref(),
1083 &query.table,
1084 &ids,
1085 );
1086 }
1087 self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1088 let store = self.inner.db.store();
1089 entity_outputs.extend(ids.iter().map(|id| {
1090 crate::application::entity::CreateEntityOutput {
1091 id: *id,
1092 entity: store.get(&query.table, *id),
1093 }
1094 }));
1095 inserted_count = ids.len() as u64;
1096 } else {
1097 for row_values in &effective_rows {
1098 if row_values.len() != query.columns.len() {
1099 return Err(RedDBError::Query(format!(
1100 "INSERT column count ({}) does not match value count ({})",
1101 query.columns.len(),
1102 row_values.len()
1103 )));
1104 }
1105
1106 match query.entity_type {
1107 InsertEntityType::Row => {
1108 if query.returning.is_some() {
1109 return Err(RedDBError::Query(
1110 "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1111 .to_string(),
1112 ));
1113 }
1114 let (fields, mut metadata) =
1115 split_insert_metadata(self, &query.columns, row_values)?;
1116 merge_with_clauses(
1117 &mut metadata,
1118 query.ttl_ms,
1119 query.expires_at_ms,
1120 &query.with_metadata,
1121 );
1122 self.insert_timeseries_point(&query.table, fields, metadata)?;
1123 }
1124 InsertEntityType::Node | InsertEntityType::Edge => {
1125 unreachable!("NODE and EDGE are handled by the prepared graph path")
1126 }
1127 InsertEntityType::Vector => {
1128 let (vector_values, mut metadata) =
1129 split_insert_metadata(self, &query.columns, row_values)?;
1130 merge_with_clauses(
1131 &mut metadata,
1132 query.ttl_ms,
1133 query.expires_at_ms,
1134 &query.with_metadata,
1135 );
1136 let (columns, values) = pairwise_columns_values(&vector_values);
1137 let dense = find_column_value_vec_f32_any(
1138 &columns,
1139 &values,
1140 &["dense", "embedding"],
1141 )?;
1142 merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1143 let content =
1144 find_column_value_opt_string(&columns, &values, "content");
1145 if query.returning.is_some() {
1146 returning_field_snaps.push(vector_values.clone());
1147 }
1148 let input = CreateVectorInput {
1149 collection: query.table.clone(),
1150 dense,
1151 content,
1152 metadata,
1153 link_row: None,
1154 link_node: None,
1155 };
1156 entity_outputs.push(self.create_vector(input)?);
1157 }
1158 InsertEntityType::Document => {
1159 let (document_values, mut metadata) =
1160 split_insert_metadata(self, &query.columns, row_values)?;
1161 merge_with_clauses(
1162 &mut metadata,
1163 query.ttl_ms,
1164 query.expires_at_ms,
1165 &query.with_metadata,
1166 );
1167 let (columns, values) = pairwise_columns_values(&document_values);
1168 let body_str = find_column_value_string(&columns, &values, "body")?;
1169 let body: crate::json::Value = crate::json::from_str(&body_str)
1170 .map_err(|e| {
1171 RedDBError::Query(format!("invalid JSON body: {e}"))
1172 })?;
1173 let input = CreateDocumentInput {
1174 collection: query.table.clone(),
1175 body,
1176 metadata,
1177 node_links: Vec::new(),
1178 vector_links: Vec::new(),
1179 };
1180 let output = self.create_document(input)?;
1181 if query.returning.is_some() {
1182 let fields = output
1183 .entity
1184 .as_ref()
1185 .map(entity_row_fields_snapshot)
1186 .filter(|fields| !fields.is_empty())
1187 .unwrap_or(document_values);
1188 returning_field_snaps.push(fields);
1189 }
1190 entity_outputs.push(output);
1191 }
1192 InsertEntityType::Kv => {
1193 let (kv_values, mut metadata) =
1194 split_insert_metadata(self, &query.columns, row_values)?;
1195 merge_with_clauses(
1196 &mut metadata,
1197 query.ttl_ms,
1198 query.expires_at_ms,
1199 &query.with_metadata,
1200 );
1201 let (columns, values) = pairwise_columns_values(&kv_values);
1202 let key = find_column_value_string(&columns, &values, "key")?;
1203 let value = find_column_value(&columns, &values, "value")?;
1204 if query.returning.is_some() {
1205 returning_field_snaps.push(kv_values.clone());
1206 }
1207 let input = CreateKvInput {
1208 collection: query.table.clone(),
1209 key,
1210 value,
1211 metadata,
1212 };
1213 entity_outputs.push(self.create_kv(input)?);
1214 }
1215 }
1216
1217 inserted_count += 1;
1218 }
1219 }
1220
1221 if let Some(items) = query.returning.as_ref() {
1222 if !entity_outputs.is_empty() {
1223 returning_result = Some(build_returning_result(
1224 items,
1225 &returning_field_snaps,
1226 Some(&entity_outputs),
1227 ));
1228 }
1229 }
1230 }
1231
1232 if let Some(ref embed_config) = query.auto_embed {
1234 let store = self.inner.db.store();
1235 let provider = crate::ai::parse_provider(&embed_config.provider)?;
1236 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
1237 let model = embed_config.model.clone().unwrap_or_else(|| {
1238 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1239 .ok()
1240 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1241 });
1242
1243 let manager = store
1245 .get_collection(&query.table)
1246 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1247 let entities = manager.query_all(|_| true);
1248 let recent: Vec<_> = entities
1249 .into_iter()
1250 .rev()
1251 .take(effective_rows.len())
1252 .collect();
1253
1254 let entity_combos: Vec<(usize, String)> = recent
1256 .iter()
1257 .enumerate()
1258 .filter_map(|(i, entity)| {
1259 if let EntityData::Row(ref row) = entity.data {
1260 if let Some(ref named) = row.named {
1261 let texts: Vec<String> = embed_config
1262 .fields
1263 .iter()
1264 .filter_map(|field| match named.get(field) {
1265 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1266 _ => None,
1267 })
1268 .collect();
1269 if !texts.is_empty() {
1270 return Some((i, texts.join(" ")));
1271 }
1272 }
1273 }
1274 None
1275 })
1276 .collect();
1277
1278 if !entity_combos.is_empty() {
1279 let batch_texts: Vec<String> =
1281 entity_combos.iter().map(|(_, t)| t.clone()).collect();
1282
1283 let batch_client =
1284 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1285
1286 let embeddings = match tokio::runtime::Handle::try_current() {
1287 Ok(handle) => tokio::task::block_in_place(|| {
1288 handle.block_on(batch_client.embed_batch(
1289 &provider,
1290 &model,
1291 &api_key,
1292 batch_texts,
1293 ))
1294 }),
1295 Err(_) => {
1296 return Err(RedDBError::Query(
1297 "AUTO EMBED requires a Tokio runtime context".to_string(),
1298 ));
1299 }
1300 }
1301 .map_err(|e| RedDBError::Query(e.to_string()))?;
1302
1303 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1305 if dense.is_empty() {
1306 continue;
1307 }
1308 self.create_vector(CreateVectorInput {
1309 collection: query.table.clone(),
1310 dense,
1311 content: Some(combined.clone()),
1312 metadata: Vec::new(),
1313 link_row: None,
1314 link_node: None,
1315 })?;
1316 }
1317 }
1318 }
1319
1320 if inserted_count > 0 {
1321 self.note_table_write(&query.table);
1322 }
1323
1324 let mut result = RuntimeQueryResult::dml_result(
1325 raw_query.to_string(),
1326 inserted_count,
1327 "insert",
1328 "runtime-dml",
1329 );
1330 if let Some(returning) = returning_result {
1331 result.result = returning;
1332 }
1333 Ok(result)
1334 }
1335
1336 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1337 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1338 return Ok(());
1339 };
1340 if !auth_store.iam_authorization_enabled() {
1341 return Ok(());
1342 }
1343 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1344 return Ok(());
1345 };
1346
1347 let tenant = crate::runtime::impl_core::current_tenant();
1348 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1349 let request = crate::auth::ColumnAccessRequest {
1350 action: "insert".to_string(),
1351 schema: None,
1352 table: query.table.clone(),
1353 columns: query.columns.clone(),
1354 };
1355 let ctx = crate::auth::policies::EvalContext {
1356 principal_tenant: tenant.clone(),
1357 current_tenant: tenant,
1358 peer_ip: None,
1359 mfa_present: false,
1360 now_ms: crate::auth::now_ms(),
1361 principal_is_admin_role: role == crate::auth::Role::Admin,
1362 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1363 principal_is_platform_scoped: principal.tenant.is_none(),
1364 };
1365
1366 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1367 let table_allowed = matches!(
1368 outcome.table_decision,
1369 crate::auth::policies::Decision::Allow { .. }
1370 | crate::auth::policies::Decision::AdminBypass
1371 );
1372 if !table_allowed {
1373 return Err(RedDBError::Query(format!(
1374 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1375 outcome.table_resource.kind, outcome.table_resource.name
1376 )));
1377 }
1378 if let Some(denied) = outcome.first_denied_column() {
1379 return Err(RedDBError::Query(format!(
1380 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1381 denied.resource.kind, denied.resource.name
1382 )));
1383 }
1384
1385 Ok(())
1386 }
1387
1388 pub(crate) fn insert_timeseries_point(
1389 &self,
1390 collection: &str,
1391 fields: Vec<(String, Value)>,
1392 mut metadata: Vec<(String, MetadataValue)>,
1393 ) -> RedDBResult<EntityId> {
1394 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1395
1396 let (columns, values) = pairwise_columns_values(&fields);
1397 validate_timeseries_insert_columns(&columns)?;
1398
1399 let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1408 let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1409 if let Some(event_name) = event_name_opt.as_deref() {
1410 let store_for_schema = self.inner.db.store();
1411 if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1412 .is_some()
1413 {
1414 let payload_json = payload_opt.as_deref().unwrap_or("{}");
1415 super::analytics_schema_registry::validate(
1416 store_for_schema.as_ref(),
1417 event_name,
1418 payload_json,
1419 )
1420 .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1421 }
1422 }
1423
1424 let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1429 Some(m) => m,
1430 None => event_name_opt.clone().ok_or_else(|| {
1431 RedDBError::Query(
1432 "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1433 )
1434 })?,
1435 };
1436 let value = match find_column_value_opt_string(&columns, &values, "value") {
1440 Some(s) => s.parse::<f64>().unwrap_or(1.0),
1441 None => columns
1442 .iter()
1443 .position(|c| c.eq_ignore_ascii_case("value"))
1444 .and_then(|i| match &values[i] {
1445 Value::Float(f) => Some(*f),
1446 Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1447 Value::UnsignedInteger(n) => Some(*n as f64),
1448 _ => None,
1449 })
1450 .unwrap_or(1.0),
1451 };
1452 let timestamp_ns =
1453 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1454 let mut tags = find_timeseries_tags(&columns, &values)?;
1455 if let Some(ref name) = event_name_opt {
1456 tags.entry("event_name".to_string())
1457 .or_insert_with(|| name.clone());
1458 }
1459 if let Some(ref payload) = payload_opt {
1460 tags.entry("payload".to_string())
1461 .or_insert_with(|| payload.clone());
1462 }
1463
1464 let mut entity = UnifiedEntity::new(
1465 EntityId::new(0),
1466 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1467 series: collection.to_string(),
1468 metric: metric.clone(),
1469 })),
1470 EntityData::TimeSeries(crate::storage::TimeSeriesData {
1471 metric,
1472 timestamp_ns,
1473 value,
1474 tags,
1475 }),
1476 );
1477 let writer_xid = match self.current_xid() {
1481 Some(xid) => xid,
1482 None => {
1483 let mgr = self.snapshot_manager();
1484 let xid = mgr.begin();
1485 mgr.commit(xid);
1486 xid
1487 }
1488 };
1489 entity.set_xmin(writer_xid);
1490
1491 let store = self.inner.db.store();
1492 let id = store
1493 .insert_auto(collection, entity)
1494 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1495
1496 if !metadata.is_empty() {
1497 let _ = store.set_metadata(
1498 collection,
1499 id,
1500 Metadata::with_fields(metadata.into_iter().collect()),
1501 );
1502 }
1503
1504 self.cdc_emit(
1505 crate::replication::cdc::ChangeOperation::Insert,
1506 collection,
1507 id.raw(),
1508 "timeseries",
1509 );
1510
1511 Ok(id)
1512 }
1513
1514 pub fn execute_update(
1519 &self,
1520 raw_query: &str,
1521 query: &UpdateQuery,
1522 ) -> RedDBResult<RuntimeQueryResult> {
1523 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1524 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1528 return Err(RedDBError::InvalidOperation(format!(
1529 "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1530 query.table
1531 )));
1532 }
1533 crate::runtime::collection_contract::CollectionContractGate::check(
1539 self,
1540 &query.table,
1541 crate::runtime::collection_contract::MutationKind::Update,
1542 )?;
1543 ensure_update_target_contract(self, &query.table, query.target)?;
1544 ensure_graph_identity_update_target_allowed(query)?;
1545
1546 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1552 let augmented_query: UpdateQuery;
1553 let effective_query: &UpdateQuery = if rls_gated {
1554 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1555 self,
1556 &query.table,
1557 crate::storage::query::ast::PolicyAction::Update,
1558 );
1559 let Some(policy) = rls_filter else {
1560 let mut response = RuntimeQueryResult::dml_result(
1563 raw_query.to_string(),
1564 0,
1565 "update",
1566 "runtime-dml-rls",
1567 );
1568 if let Some(items) = query.returning.clone() {
1569 response.result = build_returning_result(&items, &[], None);
1570 }
1571 return Ok(response);
1572 };
1573 let mut augmented = query.clone();
1574 augmented.filter = Some(match augmented.filter.take() {
1575 Some(existing) => {
1576 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1577 }
1578 None => policy,
1579 });
1580 augmented_query = augmented;
1581 &augmented_query
1582 } else {
1583 query
1584 };
1585
1586 if let Some(items) = effective_query.returning.clone() {
1591 let mut inner_query = effective_query.clone();
1592 inner_query.returning = None;
1593 let (mut response, touched_ids) =
1594 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1595
1596 let snapshots = if matches!(
1597 effective_query.target,
1598 UpdateTarget::Nodes | UpdateTarget::Edges
1599 ) {
1600 graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1601 } else {
1602 super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1603 .row_snapshots(&touched_ids)
1604 };
1605
1606 response.result = build_returning_result(&items, &snapshots, None);
1607 response.engine = "runtime-dml-returning";
1608 return Ok(response);
1609 }
1610
1611 self.execute_update_inner(raw_query, effective_query)
1612 }
1613
1614 fn execute_update_inner(
1616 &self,
1617 raw_query: &str,
1618 query: &UpdateQuery,
1619 ) -> RedDBResult<RuntimeQueryResult> {
1620 self.execute_update_inner_tracked(raw_query, query)
1621 .map(|(res, _)| res)
1622 }
1623
1624 fn execute_update_inner_tracked(
1625 &self,
1626 raw_query: &str,
1627 query: &UpdateQuery,
1628 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1629 let store = self.inner.db.store();
1630 let effective_filter = effective_update_filter(query);
1631 let compiled_plan = self.compile_update_plan(query)?;
1632 let mut touched_ids: Vec<EntityId> = Vec::new();
1633 let limit_cap = query.limit.map(|l| l as usize);
1634 let manager = store
1635 .get_collection(&query.table)
1636 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1637 let scan_limit = if query.order_by.is_empty() {
1638 limit_cap
1639 } else {
1640 None
1641 };
1642 let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1643 self,
1644 &query.table,
1645 effective_filter.as_ref(),
1646 scan_limit,
1647 query.target,
1648 )
1649 .find_target_ids()?;
1650 let ids_to_update = if query.order_by.is_empty() {
1651 ids_to_update
1652 } else {
1653 ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1654 };
1655
1656 if update_needs_rmw_lock(query) {
1657 return self.execute_update_inner_tracked_locked(
1658 raw_query,
1659 query,
1660 &compiled_plan,
1661 &ids_to_update,
1662 effective_filter.as_ref(),
1663 );
1664 }
1665
1666 let mut affected: u64 = 0;
1667 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1668 let mut applied_chunk = Vec::with_capacity(chunk.len());
1669 for entity in manager.get_many(chunk).into_iter().flatten() {
1670 let assignments =
1671 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1672 let applied = self.apply_materialized_update_for_entity(
1673 query.table.clone(),
1674 entity,
1675 &compiled_plan,
1676 assignments,
1677 )?;
1678 touched_ids.push(applied.id);
1679 applied_chunk.push(applied);
1680 }
1681 self.persist_update_chunk(&applied_chunk)?;
1682 affected += applied_chunk.len() as u64;
1683 let lsns = self.flush_update_chunk(&applied_chunk)?;
1684 if !query.suppress_events {
1685 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1686 }
1687 }
1688
1689 if affected > 0 {
1690 self.note_table_write(&query.table);
1691 }
1692
1693 Ok((
1694 RuntimeQueryResult::dml_result(
1695 raw_query.to_string(),
1696 affected,
1697 "update",
1698 "runtime-dml",
1699 ),
1700 touched_ids,
1701 ))
1702 }
1703
1704 fn execute_update_inner_tracked_locked(
1705 &self,
1706 raw_query: &str,
1707 query: &UpdateQuery,
1708 compiled_plan: &CompiledUpdatePlan,
1709 ids_to_update: &[EntityId],
1710 effective_filter: Option<&Filter>,
1711 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1712 let store = self.inner.db.store();
1713 let mut touched_ids = Vec::new();
1714 let mut lock_entries = Vec::new();
1715
1716 for id in ids_to_update {
1717 let Some(candidate) = store.get(&query.table, *id) else {
1718 continue;
1719 };
1720 let logical_id = candidate.logical_id();
1721 let lock_key = format!("row:{}", logical_id.raw());
1722 let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1723 lock_entries.push((lock_key, logical_id, rmw_lock));
1724 }
1725
1726 lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1727 lock_entries.dedup_by(|left, right| left.0 == right.0);
1728 let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1729
1730 let mut applied_chunk = Vec::new();
1731 for (_, logical_id, _) in &lock_entries {
1732 let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1733 else {
1734 continue;
1735 };
1736 if let Some(filter) = effective_filter {
1737 if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1738 Some(self.inner.db.as_ref()),
1739 &entity,
1740 filter,
1741 &query.table,
1742 &query.table,
1743 ) {
1744 continue;
1745 }
1746 }
1747
1748 let assignments =
1749 self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1750 let applied = self.apply_materialized_update_for_entity(
1751 query.table.clone(),
1752 entity,
1753 compiled_plan,
1754 assignments,
1755 )?;
1756 touched_ids.push(applied.id);
1757 applied_chunk.push(applied);
1758 }
1759
1760 let affected = applied_chunk.len() as u64;
1761 if !applied_chunk.is_empty() {
1762 self.persist_update_chunk(&applied_chunk)?;
1763 let lsns = self.flush_update_chunk(&applied_chunk)?;
1764 if !query.suppress_events {
1765 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1766 }
1767 }
1768
1769 if affected > 0 {
1770 self.note_table_write(&query.table);
1771 }
1772
1773 Ok((
1774 RuntimeQueryResult::dml_result(
1775 raw_query.to_string(),
1776 affected,
1777 "update",
1778 "runtime-dml",
1779 ),
1780 touched_ids,
1781 ))
1782 }
1783
1784 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1785 let mut static_field_assignments = Vec::new();
1786 let mut static_metadata_assignments = Vec::new();
1787 let mut dynamic_assignments = Vec::new();
1788 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1789 let mut row_modified_columns = Vec::new();
1790
1791 for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1792 let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1793 let metadata_key = resolve_sql_ttl_metadata_key(column);
1794 if compound_op.is_some() && metadata_key.is_some() {
1795 return Err(RedDBError::Query(format!(
1796 "compound assignment is only supported for row fields: {column}"
1797 )));
1798 }
1799 if compound_op.is_none() {
1800 if let Ok(value) = fold_expr_to_value(expr.clone()) {
1801 if let Some(metadata_key) = metadata_key {
1802 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1803 let (canonical_key, canonical_value) =
1804 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1805 static_metadata_assignments
1806 .push((canonical_key.to_string(), canonical_value));
1807 } else {
1808 let value = self.resolve_crypto_sentinel(value)?;
1809 static_field_assignments.push((
1810 column.clone(),
1811 normalize_row_update_assignment_with_plan(
1812 &query.table,
1813 column,
1814 value,
1815 row_contract_plan.as_ref(),
1816 )?,
1817 ));
1818 row_modified_columns.push(column.clone());
1819 }
1820 continue;
1821 }
1822 }
1823
1824 dynamic_assignments.push(CompiledUpdateAssignment {
1825 column: column.clone(),
1826 expr: expr.clone(),
1827 compound_op,
1828 metadata_key,
1829 row_rule: if metadata_key.is_none() {
1830 if let Some(plan) = row_contract_plan.as_ref() {
1831 if plan.timestamps_enabled
1832 && (column == "created_at" || column == "updated_at")
1833 {
1834 return Err(RedDBError::Query(format!(
1835 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1836 query.table, column
1837 )));
1838 }
1839 if let Some(rule) = plan.declared_rules.get(column) {
1840 Some(rule.clone())
1841 } else if plan.strict_schema {
1842 return Err(RedDBError::Query(format!(
1843 "collection '{}' is strict and does not allow undeclared fields: {}",
1844 query.table, column
1845 )));
1846 } else {
1847 None
1848 }
1849 } else {
1850 None
1851 }
1852 } else {
1853 None
1854 },
1855 });
1856 if metadata_key.is_none() {
1857 row_modified_columns.push(column.clone());
1858 }
1859 }
1860
1861 let row_modified_columns = dedupe_update_columns(row_modified_columns);
1862 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1863 row_modified_columns.iter().any(|column| {
1864 plan.unique_columns
1865 .keys()
1866 .any(|unique| unique.eq_ignore_ascii_case(column))
1867 })
1868 });
1869
1870 if let Some(ttl_ms) = query.ttl_ms {
1871 static_metadata_assignments
1872 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1873 }
1874 if let Some(expires_at_ms) = query.expires_at_ms {
1875 static_metadata_assignments.push((
1876 "_expires_at".to_string(),
1877 metadata_u64_to_value(expires_at_ms),
1878 ));
1879 }
1880 for (key, val) in &query.with_metadata {
1881 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1882 }
1883
1884 Ok(CompiledUpdatePlan {
1885 static_field_assignments,
1886 static_metadata_assignments,
1887 dynamic_assignments,
1888 row_contract_plan,
1889 row_modified_columns,
1890 row_touches_unique_columns,
1891 })
1892 }
1893
1894 fn materialize_update_assignments_for_entity(
1895 &self,
1896 query: &UpdateQuery,
1897 entity: &UnifiedEntity,
1898 compiled_plan: &CompiledUpdatePlan,
1899 ) -> RedDBResult<MaterializedUpdateAssignments> {
1900 let mut assignments = MaterializedUpdateAssignments::default();
1901 let mut record: Option<UnifiedRecord> = None;
1902
1903 for assignment in &compiled_plan.dynamic_assignments {
1904 if assignment.compound_op.is_some()
1905 && !matches!(
1906 entity.data,
1907 EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1908 )
1909 {
1910 return Err(RedDBError::Query(format!(
1911 "compound assignment is only supported for row or graph UPDATE column '{}'",
1912 assignment.column
1913 )));
1914 }
1915 if record.is_none() {
1916 record = runtime_any_record_from_entity_ref(entity);
1917 }
1918 let Some(record) = record.as_ref() else {
1919 return Err(RedDBError::Query(format!(
1920 "UPDATE could not materialize runtime record for entity {} in '{}'",
1921 entity.id.raw(),
1922 query.table
1923 )));
1924 };
1925 let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1926 Some(self.inner.db.as_ref()),
1927 &assignment.expr,
1928 record,
1929 Some(query.table.as_str()),
1930 Some(query.table.as_str()),
1931 )
1932 .ok_or_else(|| {
1933 RedDBError::Query(format!(
1934 "failed to evaluate UPDATE expression for column '{}'",
1935 assignment.column
1936 ))
1937 })?;
1938 let value = if let Some(op) = assignment.compound_op {
1939 evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1940 } else {
1941 rhs
1942 };
1943
1944 if let Some(metadata_key) = assignment.metadata_key {
1945 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1946 let (canonical_key, canonical_value) =
1947 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1948 assignments
1949 .dynamic_metadata_assignments
1950 .push((canonical_key.to_string(), canonical_value));
1951 } else {
1952 assignments.dynamic_field_assignments.push((
1953 assignment.column.clone(),
1954 normalize_row_update_value_for_rule(
1955 &query.table,
1956 self.resolve_crypto_sentinel(value)?,
1957 assignment.row_rule.as_ref(),
1958 )?,
1959 ));
1960 }
1961 }
1962
1963 Ok(assignments)
1964 }
1965
1966 fn apply_materialized_update_for_entity(
1967 &self,
1968 collection: String,
1969 entity: UnifiedEntity,
1970 compiled_plan: &CompiledUpdatePlan,
1971 assignments: MaterializedUpdateAssignments,
1972 ) -> RedDBResult<AppliedEntityMutation> {
1973 if matches!(entity.data, EntityData::Row(_)) {
1974 return self.apply_loaded_sql_update_row_core(
1975 collection,
1976 entity,
1977 &compiled_plan.static_field_assignments,
1978 assignments.dynamic_field_assignments,
1979 &compiled_plan.static_metadata_assignments,
1980 assignments.dynamic_metadata_assignments,
1981 compiled_plan.row_contract_plan.as_ref(),
1982 &compiled_plan.row_modified_columns,
1983 compiled_plan.row_touches_unique_columns,
1984 );
1985 }
1986
1987 ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
1988
1989 let operations = build_patch_operations_from_materialized_assignments(
1990 &entity,
1991 compiled_plan,
1992 assignments,
1993 );
1994 self.apply_loaded_patch_entity_core(
1995 collection,
1996 entity,
1997 crate::json::Value::Null,
1998 operations,
1999 )
2000 }
2001
2002 pub fn execute_delete(
2004 &self,
2005 raw_query: &str,
2006 query: &DeleteQuery,
2007 ) -> RedDBResult<RuntimeQueryResult> {
2008 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2009 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2012 return Err(RedDBError::InvalidOperation(format!(
2013 "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2014 query.table
2015 )));
2016 }
2017 crate::runtime::collection_contract::CollectionContractGate::check(
2021 self,
2022 &query.table,
2023 crate::runtime::collection_contract::MutationKind::Delete,
2024 )?;
2025
2026 if let Some(items) = query.returning.clone() {
2033 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2034 RedDBError::Query(
2035 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2036 )
2037 })?;
2038 let captured = self.execute_query(&select_sql)?;
2039
2040 let mut inner_query = query.clone();
2041 inner_query.returning = None;
2042 let _ = self.execute_delete(raw_query, &inner_query)?;
2043
2044 let snapshots: Vec<Vec<(String, Value)>> = captured
2045 .result
2046 .records
2047 .iter()
2048 .map(|rec| {
2049 rec.iter_fields()
2050 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2051 .collect()
2052 })
2053 .collect();
2054 let affected = snapshots.len() as u64;
2055 let result = build_returning_result(&items, &snapshots, None);
2056
2057 let mut response = RuntimeQueryResult::dml_result(
2058 raw_query.to_string(),
2059 affected,
2060 "delete",
2061 "runtime-dml-returning",
2062 );
2063 response.result = result;
2064 return Ok(response);
2065 }
2066 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2073 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2074 self,
2075 &query.table,
2076 crate::storage::query::ast::PolicyAction::Delete,
2077 );
2078 let Some(policy) = rls_filter else {
2079 return Ok(RuntimeQueryResult::dml_result(
2080 raw_query.to_string(),
2081 0,
2082 "delete",
2083 "runtime-dml-rls",
2084 ));
2085 };
2086 let mut augmented = query.clone();
2091 augmented.filter = Some(match augmented.filter.take() {
2092 Some(existing) => {
2093 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2094 }
2095 None => policy,
2096 });
2097 return self.execute_delete_inner(raw_query, &augmented);
2098 }
2099 self.execute_delete_inner(raw_query, query)
2100 }
2101
2102 fn execute_delete_inner(
2103 &self,
2104 raw_query: &str,
2105 query: &DeleteQuery,
2106 ) -> RedDBResult<RuntimeQueryResult> {
2107 let effective_filter = effective_delete_filter(query);
2108
2109 let scan = super::dml_target_scan::DmlTargetScan::new(
2113 self,
2114 &query.table,
2115 effective_filter.as_ref(),
2116 None,
2117 );
2118 let ids_to_delete = scan.find_target_ids()?;
2119
2120 let needs_delete_events =
2123 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2124 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2125 scan.row_json_pre_images(&ids_to_delete)
2126 } else {
2127 HashMap::new()
2128 };
2129
2130 let mut affected: u64 = 0;
2131 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2132 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2133 affected += count;
2134 if needs_delete_events && !lsns.is_empty() {
2135 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2139 self.emit_delete_events_for_collection(
2140 &query.table,
2141 deleted_chunk,
2142 &lsns,
2143 &pre_images,
2144 )?;
2145 }
2146 }
2147 pre_images.clear();
2148
2149 if affected > 0 {
2150 self.note_table_write(&query.table);
2151 }
2152
2153 Ok(RuntimeQueryResult::dml_result(
2154 raw_query.to_string(),
2155 affected,
2156 "delete",
2157 "runtime-dml",
2158 ))
2159 }
2160}
2161
2162fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2168 if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2169 return Ok(());
2170 }
2171 for (column, _) in &query.assignment_exprs {
2172 if is_immutable_graph_identity_field(column) {
2173 return Err(RedDBError::Query(format!(
2174 "immutable graph field '{column}' cannot be updated"
2175 )));
2176 }
2177 }
2178 Ok(())
2179}
2180
2181fn ensure_graph_identity_update_allowed(
2182 entity: &UnifiedEntity,
2183 compiled_plan: &CompiledUpdatePlan,
2184 assignments: &MaterializedUpdateAssignments,
2185) -> RedDBResult<()> {
2186 if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2187 return Ok(());
2188 }
2189
2190 for (column, _) in compiled_plan
2191 .static_field_assignments
2192 .iter()
2193 .chain(assignments.dynamic_field_assignments.iter())
2194 {
2195 if is_immutable_graph_identity_field(column) {
2196 return Err(RedDBError::Query(format!(
2197 "immutable graph field '{column}' cannot be updated"
2198 )));
2199 }
2200 }
2201
2202 Ok(())
2203}
2204
/// Returns `true` when `column` names a graph identity field that `UPDATE`
/// must not change. The comparison ignores ASCII case.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_FIELDS: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];

    for reserved in IMMUTABLE_FIELDS.iter() {
        if reserved.eq_ignore_ascii_case(column) {
            return true;
        }
    }
    false
}
2210
2211fn build_patch_operations_from_materialized_assignments(
2212 entity: &UnifiedEntity,
2213 compiled_plan: &CompiledUpdatePlan,
2214 assignments: MaterializedUpdateAssignments,
2215) -> Vec<PatchEntityOperation> {
2216 let mut operations = Vec::with_capacity(
2217 compiled_plan.static_field_assignments.len()
2218 + compiled_plan.static_metadata_assignments.len()
2219 + assignments.dynamic_field_assignments.len()
2220 + assignments.dynamic_metadata_assignments.len(),
2221 );
2222
2223 for (column, value) in &compiled_plan.static_field_assignments {
2224 operations.push(PatchEntityOperation {
2225 op: PatchEntityOperationType::Set,
2226 path: update_patch_path_for_entity(entity, column),
2227 value: Some(storage_value_to_json(value)),
2228 });
2229 }
2230
2231 for (column, value) in assignments.dynamic_field_assignments {
2232 operations.push(PatchEntityOperation {
2233 op: PatchEntityOperationType::Set,
2234 path: update_patch_path_for_entity(entity, &column),
2235 value: Some(storage_value_to_json(&value)),
2236 });
2237 }
2238
2239 for (key, value) in &compiled_plan.static_metadata_assignments {
2240 operations.push(PatchEntityOperation {
2241 op: PatchEntityOperationType::Set,
2242 path: vec!["metadata".to_string(), key.clone()],
2243 value: Some(metadata_value_to_json(value)),
2244 });
2245 }
2246
2247 for (key, value) in assignments.dynamic_metadata_assignments {
2248 operations.push(PatchEntityOperation {
2249 op: PatchEntityOperationType::Set,
2250 path: vec!["metadata".to_string(), key],
2251 value: Some(metadata_value_to_json(&value)),
2252 });
2253 }
2254
2255 operations
2256}
2257
2258fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2259 if matches!(
2260 (&entity.kind, &entity.data),
2261 (
2262 crate::storage::EntityKind::GraphNode(_),
2263 EntityData::Node(_)
2264 )
2265 ) && column.eq_ignore_ascii_case("node_type")
2266 {
2267 return vec!["node_type".to_string()];
2268 }
2269 if matches!(
2270 (&entity.kind, &entity.data),
2271 (
2272 crate::storage::EntityKind::GraphEdge(_),
2273 EntityData::Edge(_)
2274 )
2275 ) && column.eq_ignore_ascii_case("weight")
2276 {
2277 return vec!["weight".to_string()];
2278 }
2279 vec!["fields".to_string(), column.to_string()]
2280}
2281
/// Rewrites `DELETE FROM <body> [RETURNING ...]` into `SELECT * FROM <body>`.
///
/// `<body>` is everything between the `FROM` keyword and the `RETURNING`
/// keyword (or the end of the statement), with surrounding whitespace removed.
/// Keywords are matched ASCII case-insensitively and may be separated by any
/// ASCII whitespace (spaces, tabs, newlines), so multi-line statements are
/// rewritten too. Text inside single-quoted literals is never mistaken for a
/// keyword.
///
/// Returns `None` when the statement does not start with `DELETE` followed by
/// whitespace, has no `FROM` keyword, or has nothing between `FROM` and
/// `RETURNING`.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    let trimmed = sql.trim_start();
    // ASCII lowercasing keeps byte offsets identical to `trimmed`, so indices
    // found in `lowered` can be used to slice the original text.
    let lowered = trimmed.to_ascii_lowercase();

    let after_delete = lowered.strip_prefix("delete")?;
    if !after_delete.starts_with(|c: char| c.is_ascii_whitespace()) {
        return None;
    }

    // Anchor on the first stand-alone `from`; a plain substring search for
    // " from " misses tab/newline separators and can land inside a literal.
    let from_idx = find_top_level_keyword(&lowered, "from")?;
    let body_start = from_idx + "from".len();
    let after_from = &trimmed[body_start..];
    let after_from_lc = &lowered[body_start..];

    let body_end =
        find_top_level_keyword(after_from_lc, "returning").unwrap_or(after_from.len());
    let body = after_from[..body_end].trim();
    if body.is_empty() {
        return None;
    }
    Some(format!("SELECT * FROM {body}"))
}

/// Finds the byte offset of the first stand-alone occurrence of `needle` in
/// `haystack` that is not inside a single-quoted string.
///
/// "Stand-alone" means the match starts at offset 0 or right after ASCII
/// whitespace, and ends at the end of `haystack` or right before ASCII
/// whitespace. Matching is byte-exact, so callers wanting case-insensitive
/// search pass an already-lowercased haystack. Every `'` toggles the
/// in-string state, which also handles the doubled-quote escape (`''`).
/// Parentheses are not tracked. An empty `needle` never matches.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    if needle.is_empty() {
        return None;
    }
    let bytes = haystack.as_bytes();
    let pattern = needle.as_bytes();
    let mut in_string = false;

    for (i, &byte) in bytes.iter().enumerate() {
        if byte == b'\'' {
            in_string = !in_string;
            continue;
        }
        if in_string || !bytes[i..].starts_with(pattern) {
            continue;
        }
        let starts_word = i == 0 || bytes[i - 1].is_ascii_whitespace();
        let ends_word = bytes
            .get(i + pattern.len())
            .map_or(true, |next| next.is_ascii_whitespace());
        if starts_word && ends_word {
            return Some(i);
        }
    }
    None
}
2341
2342fn build_returning_result(
2349 items: &[ReturningItem],
2350 snapshots: &[Vec<(String, Value)>],
2351 outputs: Option<&[CreateEntityOutput]>,
2352) -> UnifiedResult {
2353 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2354 let public_item_outputs = outputs.is_some_and(|outs| {
2355 outs.first()
2356 .and_then(|out| out.entity.as_ref())
2357 .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2358 });
2359
2360 let mut columns: Vec<String> = if project_all {
2361 let mut cols: Vec<String> = Vec::new();
2362 if public_item_outputs {
2363 cols.extend(
2364 [
2365 "rid",
2366 "collection",
2367 "kind",
2368 "tenant",
2369 "created_at",
2370 "updated_at",
2371 ]
2372 .into_iter()
2373 .map(str::to_string),
2374 );
2375 } else if outputs.is_some() {
2376 cols.push("red_entity_id".to_string());
2377 }
2378 if let Some(first) = snapshots.first() {
2379 for (name, _) in first {
2380 cols.push(name.clone());
2381 }
2382 }
2383 cols
2384 } else {
2385 items
2386 .iter()
2387 .filter_map(|it| match it {
2388 ReturningItem::Column(c) => Some(c.clone()),
2389 ReturningItem::All => None,
2390 })
2391 .collect()
2392 };
2393 {
2395 let mut seen = std::collections::HashSet::new();
2396 columns.retain(|c| seen.insert(c.clone()));
2397 }
2398
2399 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2400 for (idx, snap) in snapshots.iter().enumerate() {
2401 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2402 if let Some(outs) = outputs {
2403 if let Some(out) = outs.get(idx) {
2404 if let Some(entity) = out.entity.as_ref() {
2405 if let Some(kind) = public_returning_item_kind(entity) {
2406 values.insert(
2407 Arc::clone(&sys_key_rid()),
2408 Value::UnsignedInteger(out.id.raw()),
2409 );
2410 values.insert(
2411 Arc::clone(&sys_key_collection()),
2412 Value::text(entity.kind.collection().to_string()),
2413 );
2414 values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2415 values.insert(
2416 Arc::clone(&sys_key_created_at()),
2417 Value::UnsignedInteger(entity.created_at),
2418 );
2419 values.insert(
2420 Arc::clone(&sys_key_updated_at()),
2421 Value::UnsignedInteger(entity.updated_at),
2422 );
2423 values.insert(
2428 Arc::clone(&sys_key_red_entity_id()),
2429 Value::UnsignedInteger(out.id.raw()),
2430 );
2431 } else {
2432 values.insert(
2433 Arc::clone(&sys_key_red_entity_id()),
2434 Value::Integer(out.id.raw() as i64),
2435 );
2436 }
2437 } else {
2438 values.insert(
2439 Arc::clone(&sys_key_red_entity_id()),
2440 Value::Integer(out.id.raw() as i64),
2441 );
2442 }
2443 }
2444 }
2445 for (name, val) in snap {
2446 values.insert(Arc::from(name.as_str()), val.clone());
2447 }
2448 if !values.contains_key("tenant") {
2449 let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2450 values.insert(Arc::clone(&sys_key_tenant()), tenant);
2451 }
2452 let mut rec = UnifiedRecord::default();
2453 for col in &columns {
2455 if let Some(v) = values.get(col.as_str()) {
2456 rec.set_arc(Arc::from(col.as_str()), v.clone());
2457 }
2458 }
2459 records.push(rec);
2460 }
2461
2462 UnifiedResult {
2463 columns,
2464 records,
2465 stats: Default::default(),
2466 pre_serialized_json: None,
2467 }
2468}
2469
2470fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2471 match (&entity.kind, &entity.data) {
2472 (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2473 Some("node")
2474 }
2475 (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2476 Some("edge")
2477 }
2478 (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2479 _ => None,
2480 }
2481}
2482
2483fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2484 let Some(row) = entity.data.as_row() else {
2485 return "row";
2486 };
2487
2488 let is_kv = row.named.as_ref().is_some_and(|named| {
2489 (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2490 || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2491 });
2492 if is_kv {
2493 return "kv";
2494 }
2495
2496 let is_document = row
2497 .named
2498 .as_ref()
2499 .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2500 || row.columns.iter().any(runtime_returning_documentish_value);
2501 if is_document {
2502 "document"
2503 } else {
2504 "row"
2505 }
2506}
2507
2508fn runtime_returning_documentish_value(value: &Value) -> bool {
2509 matches!(value, Value::Json(_) | Value::Blob(_))
2510}
2511
2512fn row_insert_returning_snapshots(
2513 outputs: &[CreateEntityOutput],
2514 fallback: Vec<Vec<(String, Value)>>,
2515) -> Vec<Vec<(String, Value)>> {
2516 outputs
2517 .iter()
2518 .enumerate()
2519 .map(|(idx, out)| {
2520 out.entity
2521 .as_ref()
2522 .map(entity_row_fields_snapshot)
2523 .filter(|snap| !snap.is_empty())
2524 .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2525 })
2526 .collect()
2527}
2528
2529fn graph_insert_returning_snapshots(
2530 store: &crate::storage::unified::UnifiedStore,
2531 collection: &str,
2532 ids: &[EntityId],
2533) -> Vec<Vec<(String, Value)>> {
2534 let Some(manager) = store.get_collection(collection) else {
2535 return Vec::new();
2536 };
2537
2538 ids.iter()
2539 .filter_map(|id| manager.get(*id))
2540 .filter_map(|entity| {
2541 let mut record = runtime_any_record_from_entity_ref(&entity)?;
2542 record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2543 Some(record)
2544 })
2545 .map(|record| {
2546 record
2547 .iter_fields()
2548 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2549 .collect()
2550 })
2551 .collect()
2552}
2553
2554fn graph_update_returning_snapshots(
2555 runtime: &RedDBRuntime,
2556 collection: &str,
2557 ids: &[EntityId],
2558) -> Vec<Vec<(String, Value)>> {
2559 let store = runtime.db().store();
2560 let Some(manager) = store.get_collection(collection) else {
2561 return Vec::new();
2562 };
2563
2564 manager
2565 .get_many(ids)
2566 .into_iter()
2567 .flatten()
2568 .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2569 .map(|record| {
2570 record
2571 .iter_fields()
2572 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2573 .collect()
2574 })
2575 .collect()
2576}
2577
2578fn ensure_update_target_contract(
2579 runtime: &RedDBRuntime,
2580 collection: &str,
2581 target: UpdateTarget,
2582) -> RedDBResult<()> {
2583 let Some(contract) = runtime.db().collection_contract(collection) else {
2584 return Ok(());
2585 };
2586 if update_target_contract_is_advisory(&contract)
2587 || update_target_allows_model(contract.declared_model, update_target_model(target))
2588 {
2589 return Ok(());
2590 }
2591 Err(RedDBError::InvalidOperation(format!(
2592 "collection '{}' is declared as '{}' and does not allow '{}' updates",
2593 collection,
2594 update_model_name(contract.declared_model),
2595 update_model_name(update_target_model(target))
2596 )))
2597}
2598
2599fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2600 matches!(
2601 (&contract.origin, &contract.schema_mode),
2602 (
2603 crate::physical::ContractOrigin::Implicit,
2604 crate::catalog::SchemaMode::Dynamic,
2605 )
2606 )
2607}
2608
2609fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2610 match target {
2611 UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2612 UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2613 UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2614 UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2615 }
2616}
2617
2618fn update_target_allows_model(
2619 declared_model: crate::catalog::CollectionModel,
2620 requested_model: crate::catalog::CollectionModel,
2621) -> bool {
2622 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2623}
2624
2625fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2626 match model {
2627 crate::catalog::CollectionModel::Table => "table",
2628 crate::catalog::CollectionModel::Document => "document",
2629 crate::catalog::CollectionModel::Graph => "graph",
2630 crate::catalog::CollectionModel::Vector => "vector",
2631 crate::catalog::CollectionModel::Hll => "hll",
2632 crate::catalog::CollectionModel::Sketch => "sketch",
2633 crate::catalog::CollectionModel::Filter => "filter",
2634 crate::catalog::CollectionModel::Kv => "kv",
2635 crate::catalog::CollectionModel::Config => "config",
2636 crate::catalog::CollectionModel::Vault => "vault",
2637 crate::catalog::CollectionModel::Mixed => "mixed",
2638 crate::catalog::CollectionModel::TimeSeries => "timeseries",
2639 crate::catalog::CollectionModel::Queue => "queue",
2640 crate::catalog::CollectionModel::Metrics => "metrics",
2641 }
2642}
2643
2644fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2645 let db = runtime.db();
2646 if let Some(contract) = db.collection_contract(collection) {
2647 let advisory_implicit_dynamic = matches!(
2648 (&contract.origin, &contract.schema_mode),
2649 (
2650 crate::physical::ContractOrigin::Implicit,
2651 crate::catalog::SchemaMode::Dynamic,
2652 )
2653 );
2654 if advisory_implicit_dynamic
2655 || matches!(
2656 contract.declared_model,
2657 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2658 )
2659 {
2660 return Ok(());
2661 }
2662 return Err(RedDBError::InvalidOperation(format!(
2663 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2664 collection, contract.declared_model
2665 )));
2666 }
2667
2668 let now = std::time::SystemTime::now()
2669 .duration_since(std::time::UNIX_EPOCH)
2670 .unwrap_or_default()
2671 .as_millis();
2672 db.save_collection_contract(crate::physical::CollectionContract {
2673 name: collection.to_string(),
2674 declared_model: crate::catalog::CollectionModel::Graph,
2675 schema_mode: crate::catalog::SchemaMode::Dynamic,
2676 origin: crate::physical::ContractOrigin::Implicit,
2677 version: 1,
2678 created_at_unix_ms: now,
2679 updated_at_unix_ms: now,
2680 default_ttl_ms: db.collection_default_ttl_ms(collection),
2681 vector_dimension: None,
2682 vector_metric: None,
2683 context_index_fields: Vec::new(),
2684 declared_columns: Vec::new(),
2685 table_def: None,
2686 timestamps_enabled: false,
2687 context_index_enabled: false,
2688 metrics_raw_retention_ms: None,
2689 metrics_rollup_policies: Vec::new(),
2690 metrics_tenant_identity: None,
2691 metrics_namespace: None,
2692 append_only: false,
2693 subscriptions: Vec::new(),
2694 session_key: None,
2695 session_gap_ms: None,
2696 retention_duration_ms: None,
2697 })
2698 .map(|_| ())
2699 .map_err(|err| RedDBError::Internal(err.to_string()))
2700}
2701
2702fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2703 query
2704 .assignment_exprs
2705 .iter()
2706 .enumerate()
2707 .any(|(idx, (column, expr))| {
2708 query
2709 .compound_assignment_ops
2710 .get(idx)
2711 .is_some_and(|op| op.is_some())
2712 || expr_references_update_column(expr, &query.table, column)
2713 })
2714}
2715
2716fn evaluate_compound_update_assignment(
2717 column: &str,
2718 record: &UnifiedRecord,
2719 op: BinOp,
2720 rhs: Value,
2721) -> RedDBResult<Value> {
2722 let lhs = record.get(column).ok_or_else(|| {
2723 RedDBError::Query(format!(
2724 "compound assignment requires existing numeric field '{column}'"
2725 ))
2726 })?;
2727 if matches!(lhs, Value::Null) {
2728 return Err(RedDBError::Query(format!(
2729 "compound assignment requires non-null numeric field '{column}'"
2730 )));
2731 }
2732 apply_compound_numeric_op(column, op, lhs, &rhs)
2733}
2734
2735fn apply_compound_numeric_op(
2736 column: &str,
2737 op: BinOp,
2738 lhs: &Value,
2739 rhs: &Value,
2740) -> RedDBResult<Value> {
2741 let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2742 return Err(RedDBError::Query(format!(
2743 "compound assignment requires numeric field '{column}'"
2744 )));
2745 };
2746 let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2747 return Err(RedDBError::Query(format!(
2748 "compound assignment requires numeric right-hand value for field '{column}'"
2749 )));
2750 };
2751
2752 if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2753 let a = lhs_number.as_f64();
2754 let b = rhs_number.as_f64();
2755 let out = match op {
2756 BinOp::Add => a + b,
2757 BinOp::Sub => a - b,
2758 BinOp::Mul => a * b,
2759 BinOp::Div => {
2760 if b == 0.0 {
2761 return Err(RedDBError::Query(format!(
2762 "division by zero in compound assignment for field '{column}'"
2763 )));
2764 }
2765 a / b
2766 }
2767 BinOp::Mod => {
2768 if b == 0.0 {
2769 return Err(RedDBError::Query(format!(
2770 "modulo by zero in compound assignment for field '{column}'"
2771 )));
2772 }
2773 a % b
2774 }
2775 _ => {
2776 return Err(RedDBError::Query(format!(
2777 "unsupported compound assignment operator for field '{column}'"
2778 )));
2779 }
2780 };
2781 if !out.is_finite() {
2782 return Err(RedDBError::Query(format!(
2783 "numeric overflow in compound assignment for field '{column}'"
2784 )));
2785 }
2786 return Ok(Value::Float(out));
2787 }
2788
2789 let a = lhs_number.as_i128();
2790 let b = rhs_number.as_i128();
2791 let out = match op {
2792 BinOp::Add => a.checked_add(b),
2793 BinOp::Sub => a.checked_sub(b),
2794 BinOp::Mul => a.checked_mul(b),
2795 BinOp::Mod => {
2796 if b == 0 {
2797 return Err(RedDBError::Query(format!(
2798 "modulo by zero in compound assignment for field '{column}'"
2799 )));
2800 }
2801 a.checked_rem(b)
2802 }
2803 BinOp::Div => unreachable!("integer division is handled by the float branch"),
2804 _ => None,
2805 }
2806 .ok_or_else(|| {
2807 RedDBError::Query(format!(
2808 "numeric overflow in compound assignment for field '{column}'"
2809 ))
2810 })?;
2811
2812 if matches!(lhs, Value::UnsignedInteger(_)) {
2813 let value = u64::try_from(out).map_err(|_| {
2814 RedDBError::Query(format!(
2815 "numeric overflow in compound assignment for field '{column}'"
2816 ))
2817 })?;
2818 Ok(Value::UnsignedInteger(value))
2819 } else {
2820 let value = i64::try_from(out).map_err(|_| {
2821 RedDBError::Query(format!(
2822 "numeric overflow in compound assignment for field '{column}'"
2823 ))
2824 })?;
2825 Ok(Value::Integer(value))
2826 }
2827}
2828
2829#[derive(Clone, Copy)]
2830enum CompoundNumber {
2831 Integer(i128),
2832 Float(f64),
2833}
2834
2835impl CompoundNumber {
2836 fn from_value(value: &Value) -> Option<Self> {
2837 match value {
2838 Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2839 Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2840 Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2841 Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2842 _ => None,
2843 }
2844 }
2845
2846 fn is_float(self) -> bool {
2847 matches!(self, Self::Float(_))
2848 }
2849
2850 fn as_f64(self) -> f64 {
2851 match self {
2852 Self::Integer(value) => value as f64,
2853 Self::Float(value) => value,
2854 }
2855 }
2856
2857 fn as_i128(self) -> i128 {
2858 match self {
2859 Self::Integer(value) => value,
2860 Self::Float(_) => unreachable!("float compound number used as integer"),
2861 }
2862 }
2863}
2864
2865fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2866 match expr {
2867 Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2868 Expr::Column { field, .. } => {
2869 field_ref_matches_update_column(field, table_name, target_column)
2870 }
2871 Expr::BinaryOp { lhs, rhs, .. } => {
2872 expr_references_update_column(lhs, table_name, target_column)
2873 || expr_references_update_column(rhs, table_name, target_column)
2874 }
2875 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2876 expr_references_update_column(operand, table_name, target_column)
2877 }
2878 Expr::FunctionCall { args, .. } => args
2879 .iter()
2880 .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2881 Expr::Case {
2882 branches, else_, ..
2883 } => {
2884 branches.iter().any(|(cond, value)| {
2885 expr_references_update_column(cond, table_name, target_column)
2886 || expr_references_update_column(value, table_name, target_column)
2887 }) || else_
2888 .as_deref()
2889 .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2890 }
2891 Expr::IsNull { operand, .. } => {
2892 expr_references_update_column(operand, table_name, target_column)
2893 }
2894 Expr::InList { target, values, .. } => {
2895 expr_references_update_column(target, table_name, target_column)
2896 || values
2897 .iter()
2898 .any(|value| expr_references_update_column(value, table_name, target_column))
2899 }
2900 Expr::Between {
2901 target, low, high, ..
2902 } => {
2903 expr_references_update_column(target, table_name, target_column)
2904 || expr_references_update_column(low, table_name, target_column)
2905 || expr_references_update_column(high, table_name, target_column)
2906 }
2907 Expr::WindowFunctionCall { args, window, .. } => {
2908 args.iter()
2909 .any(|arg| expr_references_update_column(arg, table_name, target_column))
2910 || window
2911 .partition_by
2912 .iter()
2913 .any(|e| expr_references_update_column(e, table_name, target_column))
2914 || window
2915 .order_by
2916 .iter()
2917 .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
2918 }
2919 }
2920}
2921
2922fn field_ref_matches_update_column(
2923 field: &FieldRef,
2924 table_name: &str,
2925 target_column: &str,
2926) -> bool {
2927 match field {
2928 FieldRef::TableColumn { table, column } => {
2929 column.eq_ignore_ascii_case(target_column)
2930 && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2931 }
2932 FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2933 false
2934 }
2935 }
2936}
2937
2938fn resolve_update_entity_by_logical_id(
2939 runtime: &RedDBRuntime,
2940 table: &str,
2941 logical_id: EntityId,
2942) -> Option<UnifiedEntity> {
2943 let store = runtime.inner.db.store();
2944 if let Some(entity) =
2945 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2946 .resolve_logical_id(&store, table, logical_id)
2947 {
2948 return Some(entity);
2949 }
2950 store.get(table, logical_id)
2953}
2954
2955fn update_cdc_item_kind(
2956 runtime: &RedDBRuntime,
2957 collection: &str,
2958 entity: &UnifiedEntity,
2959) -> &'static str {
2960 match &entity.data {
2961 EntityData::Node(_) => return "node",
2962 EntityData::Edge(_) => return "edge",
2963 _ => {}
2964 }
2965
2966 match runtime
2967 .db()
2968 .collection_contract(collection)
2969 .map(|contract| contract.declared_model)
2970 {
2971 Some(crate::catalog::CollectionModel::Document) => "document",
2972 Some(crate::catalog::CollectionModel::Kv)
2973 | Some(crate::catalog::CollectionModel::Vault) => "kv",
2974 _ => "row",
2975 }
2976}
2977
2978fn ordered_update_target_ids(
2979 manager: &Arc<crate::storage::SegmentManager>,
2980 entity_ids: &[EntityId],
2981 order_by: &[OrderByClause],
2982 limit: Option<usize>,
2983) -> Vec<EntityId> {
2984 let mut entities: Vec<UnifiedEntity> =
2985 manager.get_many(entity_ids).into_iter().flatten().collect();
2986 entities.sort_by(|left, right| compare_update_order(left, right, order_by));
2987 if let Some(limit) = limit {
2988 entities.truncate(limit);
2989 }
2990 entities.into_iter().map(|entity| entity.id).collect()
2991}
2992
2993fn compare_update_order(
2994 left: &UnifiedEntity,
2995 right: &UnifiedEntity,
2996 order_by: &[OrderByClause],
2997) -> Ordering {
2998 for clause in order_by {
2999 let left_value = update_order_value(left, &clause.field);
3000 let right_value = update_order_value(right, &clause.field);
3001 let ordering = compare_update_order_values(
3002 left_value.as_ref(),
3003 right_value.as_ref(),
3004 clause.nulls_first,
3005 );
3006 if ordering != Ordering::Equal {
3007 return if clause.ascending {
3008 ordering
3009 } else {
3010 ordering.reverse()
3011 };
3012 }
3013 }
3014 left.logical_id().raw().cmp(&right.logical_id().raw())
3015}
3016
3017fn compare_update_order_values(
3018 left: Option<&Value>,
3019 right: Option<&Value>,
3020 nulls_first: bool,
3021) -> Ordering {
3022 match (left, right) {
3023 (None, None) => Ordering::Equal,
3024 (None, Some(_)) => {
3025 if nulls_first {
3026 Ordering::Less
3027 } else {
3028 Ordering::Greater
3029 }
3030 }
3031 (Some(_), None) => {
3032 if nulls_first {
3033 Ordering::Greater
3034 } else {
3035 Ordering::Less
3036 }
3037 }
3038 (Some(left), Some(right)) => {
3039 crate::storage::query::value_compare::total_compare_values(left, right)
3040 }
3041 }
3042}
3043
3044fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3045 let FieldRef::TableColumn { table, column } = field else {
3046 return None;
3047 };
3048 if !table.is_empty() {
3049 return None;
3050 }
3051 if column.eq_ignore_ascii_case("rid") {
3052 return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3053 }
3054 match &entity.data {
3055 EntityData::Row(row) => row.get_field(column).cloned(),
3056 EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3057 .and_then(|record| record.get(column).cloned()),
3058 _ => None,
3059 }
3060}
3061
/// Removes repeated column names, comparing ASCII case-insensitively.
///
/// The first spelling of each name is kept and the original order is
/// preserved.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let capacity = columns.len();
    columns
        .into_iter()
        .fold(Vec::with_capacity(capacity), |mut kept: Vec<String>, candidate| {
            let already_kept = kept
                .iter()
                .any(|existing| existing.eq_ignore_ascii_case(&candidate));
            if !already_kept {
                kept.push(candidate);
            }
            kept
        })
}
3078
/// SQL column names that are stored as TTL metadata instead of row fields.
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Returns the canonical (lowercase) TTL metadata key that `column` names,
/// comparing ASCII case-insensitively, or `None` for ordinary columns.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|known| column.eq_ignore_ascii_case(known))
}
3096
3097fn canonicalize_sql_ttl_metadata(
3102 key: &'static str,
3103 value: MetadataValue,
3104) -> (&'static str, MetadataValue) {
3105 if key != "_ttl" {
3106 return (key, value);
3107 }
3108 let scaled = match value {
3109 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3110 MetadataValue::Timestamp(ms_or_s) => {
3111 MetadataValue::Timestamp(ms_or_s)
3114 }
3115 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3116 other => other,
3117 };
3118 ("_ttl_ms", scaled)
3119}
3120
3121pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3125
3126impl RedDBRuntime {
3127 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3133 match value {
3134 Value::Password(marked) => {
3135 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3136 Ok(Value::Password(crate::auth::store::hash_password(plain)))
3137 } else {
3138 Ok(Value::Password(marked))
3139 }
3140 }
3141 Value::Secret(bytes) => {
3142 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3143 if !self.secret_auto_encrypt() {
3144 return Err(RedDBError::Query(
3145 "SECRET() literal rejected: red.config.secret.auto_encrypt \
3146 is false. Insert pre-encrypted bytes directly instead."
3147 .to_string(),
3148 ));
3149 }
3150 let key = self.secret_aes_key().ok_or_else(|| {
3151 RedDBError::Query(
3152 "SECRET() column encryption requires a bootstrapped \
3153 vault (red.secret.aes_key is missing). Start the server \
3154 with --vault to enable."
3155 .to_string(),
3156 )
3157 })?;
3158 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3159 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3160 } else {
3161 Ok(Value::Secret(bytes))
3162 }
3163 }
3164 other => Ok(other),
3165 }
3166 }
3167}
3168
3169fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3172 let nonce_bytes = crate::auth::store::random_bytes(12);
3173 let mut nonce = [0u8; 12];
3174 nonce.copy_from_slice(&nonce_bytes[..12]);
3175 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3176 let mut out = Vec::with_capacity(12 + ct.len());
3177 out.extend_from_slice(&nonce);
3178 out.extend_from_slice(&ct);
3179 out
3180}
3181
3182pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3186 if payload.len() < 12 {
3187 return None;
3188 }
3189 let mut nonce = [0u8; 12];
3190 nonce.copy_from_slice(&payload[..12]);
3191 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3192}
3193
3194fn split_insert_metadata(
3195 runtime: &RedDBRuntime,
3196 columns: &[String],
3197 values: &[Value],
3198) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3199 let mut fields = Vec::new();
3200 let mut metadata = Vec::new();
3201
3202 for (column, value) in columns.iter().zip(values.iter()) {
3203 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3205 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3206 let (canonical_key, canonical_value) =
3207 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3208 metadata.push((canonical_key.to_string(), canonical_value));
3209 continue;
3210 }
3211 fields.push((
3212 column.clone(),
3213 runtime.resolve_crypto_sentinel(value.clone())?,
3214 ));
3215 }
3216
3217 Ok((fields, metadata))
3218}
3219
3220fn merge_with_clauses(
3222 metadata: &mut Vec<(String, MetadataValue)>,
3223 ttl_ms: Option<u64>,
3224 expires_at_ms: Option<u64>,
3225 with_metadata: &[(String, Value)],
3226) {
3227 if let Some(ms) = ttl_ms {
3228 metadata.push((
3229 "_ttl_ms".to_string(),
3230 if ms <= i64::MAX as u64 {
3231 MetadataValue::Int(ms as i64)
3232 } else {
3233 MetadataValue::Timestamp(ms)
3234 },
3235 ));
3236 }
3237 if let Some(ms) = expires_at_ms {
3238 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3239 }
3240 for (key, value) in with_metadata {
3241 let meta_value = match value {
3242 Value::Text(s) => MetadataValue::String(s.to_string()),
3243 Value::Integer(n) => MetadataValue::Int(*n),
3244 Value::Float(n) => MetadataValue::Float(*n),
3245 Value::Boolean(b) => MetadataValue::Bool(*b),
3246 _ => MetadataValue::String(value.to_string()),
3247 };
3248 metadata.push((key.clone(), meta_value));
3249 }
3250}
3251
3252fn merge_vector_metadata_column(
3253 metadata: &mut Vec<(String, MetadataValue)>,
3254 columns: &[String],
3255 values: &[Value],
3256) -> RedDBResult<()> {
3257 let Some(value) = columns
3258 .iter()
3259 .position(|column| column.eq_ignore_ascii_case("metadata"))
3260 .map(|index| &values[index])
3261 else {
3262 return Ok(());
3263 };
3264 let json = match value {
3265 Value::Null => return Ok(()),
3266 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3267 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3268 })?,
3269 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3270 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3271 })?,
3272 other => {
3273 return Err(RedDBError::Query(format!(
3274 "column 'metadata' expected JSON object, got {other:?}"
3275 )))
3276 }
3277 };
3278 let parsed = metadata_from_json(&json)?;
3279 for (key, value) in parsed.iter() {
3280 metadata.push((key.clone(), value.clone()));
3281 }
3282 Ok(())
3283}
3284
3285fn apply_collection_default_ttl_metadata(
3286 runtime: &RedDBRuntime,
3287 collection: &str,
3288 metadata: &mut Vec<(String, MetadataValue)>,
3289) {
3290 if has_internal_ttl_metadata(metadata) {
3291 return;
3292 }
3293
3294 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3295 return;
3296 };
3297
3298 metadata.push((
3299 "_ttl_ms".to_string(),
3300 if default_ttl_ms <= i64::MAX as u64 {
3301 MetadataValue::Int(default_ttl_ms as i64)
3302 } else {
3303 MetadataValue::Timestamp(default_ttl_ms)
3304 },
3305 ));
3306}
3307
3308fn ensure_non_tree_reserved_metadata_entries(
3309 metadata: &[(String, MetadataValue)],
3310) -> RedDBResult<()> {
3311 for (key, _) in metadata {
3312 ensure_non_tree_reserved_metadata_key(key)?;
3313 }
3314 Ok(())
3315}
3316
3317fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3318 if key.starts_with(TREE_METADATA_PREFIX) {
3319 return Err(RedDBError::Query(format!(
3320 "metadata key '{}' is reserved for managed trees",
3321 key
3322 )));
3323 }
3324 Ok(())
3325}
3326
3327fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3328 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3329 return Err(RedDBError::Query(format!(
3330 "edge label '{}' is reserved for managed trees",
3331 TREE_CHILD_EDGE_LABEL
3332 )));
3333 }
3334 Ok(())
3335}
3336
3337fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3338 let mut columns = Vec::with_capacity(pairs.len());
3339 let mut values = Vec::with_capacity(pairs.len());
3340
3341 for (column, value) in pairs {
3342 columns.push(column.clone());
3343 values.push(value.clone());
3344 }
3345
3346 (columns, values)
3347}
3348
3349fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3351 for (i, col) in columns.iter().enumerate() {
3352 if col.eq_ignore_ascii_case(name) {
3353 return Ok(values[i].clone());
3354 }
3355 }
3356 Err(RedDBError::Query(format!(
3357 "required column '{name}' not found in INSERT"
3358 )))
3359}
3360
3361fn find_column_value_string(
3363 columns: &[String],
3364 values: &[Value],
3365 name: &str,
3366) -> RedDBResult<String> {
3367 let val = find_column_value(columns, values, name)?;
3368 match val {
3369 Value::Text(s) => Ok(s.to_string()),
3370 Value::Integer(n) => Ok(n.to_string()),
3371 Value::Float(n) => Ok(n.to_string()),
3372 other => Err(RedDBError::Query(format!(
3373 "column '{name}' expected text, got {other:?}"
3374 ))),
3375 }
3376}
3377
3378fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3379 let val = find_column_value(columns, values, name)?;
3380 match val {
3381 Value::Float(n) => Ok(n),
3382 Value::Integer(n) => Ok(n as f64),
3383 Value::UnsignedInteger(n) => Ok(n as f64),
3384 Value::Text(s) => s
3385 .parse::<f64>()
3386 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3387 other => Err(RedDBError::Query(format!(
3388 "column '{name}' expected number, got {other:?}"
3389 ))),
3390 }
3391}
3392
3393fn find_column_value_opt_string(
3395 columns: &[String],
3396 values: &[Value],
3397 name: &str,
3398) -> Option<String> {
3399 for (i, col) in columns.iter().enumerate() {
3400 if col.eq_ignore_ascii_case(name) {
3401 return match &values[i] {
3402 Value::Null => None,
3403 Value::Text(s) => Some(s.to_string()),
3404 Value::Integer(n) => Some(n.to_string()),
3405 Value::Float(n) => Some(n.to_string()),
3406 _ => None,
3407 };
3408 }
3409 }
3410 None
3411}
3412
3413fn resolve_edge_endpoint(
3420 store: &crate::storage::unified::UnifiedStore,
3421 collection: &str,
3422 columns: &[String],
3423 values: &[Value],
3424 name: &str,
3425) -> RedDBResult<u64> {
3426 let val = find_column_value(columns, values, name)?;
3427 match val {
3428 Value::Integer(n) => Ok(n as u64),
3429 Value::UnsignedInteger(n) => Ok(n),
3430 Value::Text(s) => {
3431 if let Ok(n) = s.parse::<u64>() {
3432 return Ok(n);
3433 }
3434 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3435 match matches.len() {
3436 0 => Err(RedDBError::Query(format!(
3437 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3438 ))),
3439 1 => Ok(matches[0].raw()),
3440 n => Err(RedDBError::Query(format!(
3441 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3442 ))),
3443 }
3444 }
3445 other => Err(RedDBError::Query(format!(
3446 "column '{name}' expected integer or node label, got {other:?}"
3447 ))),
3448 }
3449}
3450
3451fn resolve_edge_endpoint_any(
3452 store: &crate::storage::unified::UnifiedStore,
3453 collection: &str,
3454 columns: &[String],
3455 values: &[Value],
3456 names: &[&str],
3457) -> RedDBResult<u64> {
3458 for name in names {
3459 if columns
3460 .iter()
3461 .any(|column| column.eq_ignore_ascii_case(name))
3462 {
3463 return resolve_edge_endpoint(store, collection, columns, values, name);
3464 }
3465 }
3466
3467 Err(RedDBError::Query(format!(
3468 "required column '{}' not found in INSERT",
3469 names.first().copied().unwrap_or("from_rid")
3470 )))
3471}
3472
3473fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3475 let val = find_column_value(columns, values, name)?;
3476 match val {
3477 Value::Integer(n) => Ok(n as u64),
3478 Value::UnsignedInteger(n) => Ok(n),
3479 Value::Text(s) => s
3480 .parse::<u64>()
3481 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3482 other => Err(RedDBError::Query(format!(
3483 "column '{name}' expected integer, got {other:?}"
3484 ))),
3485 }
3486}
3487
3488fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3490 for (i, col) in columns.iter().enumerate() {
3491 if col.eq_ignore_ascii_case(name) {
3492 return match &values[i] {
3493 Value::Float(n) => Some(*n as f32),
3494 Value::Integer(n) => Some(*n as f32),
3495 Value::Null => None,
3496 _ => None,
3497 };
3498 }
3499 }
3500 None
3501}
3502
3503fn find_column_value_vec_f32(
3505 columns: &[String],
3506 values: &[Value],
3507 name: &str,
3508) -> RedDBResult<Vec<f32>> {
3509 let val = find_column_value(columns, values, name)?;
3510 match val {
3511 Value::Vector(v) => Ok(v),
3512 Value::Json(bytes) => {
3513 let s = std::str::from_utf8(&bytes).map_err(|_| {
3515 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3516 })?;
3517 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3518 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3519 })?;
3520 Ok(arr)
3521 }
3522 other => Err(RedDBError::Query(format!(
3523 "column '{name}' expected vector, got {other:?}"
3524 ))),
3525 }
3526}
3527
3528fn find_column_value_vec_f32_any(
3529 columns: &[String],
3530 values: &[Value],
3531 names: &[&str],
3532) -> RedDBResult<Vec<f32>> {
3533 for name in names {
3534 if columns
3535 .iter()
3536 .any(|column| column.eq_ignore_ascii_case(name))
3537 {
3538 return find_column_value_vec_f32(columns, values, name);
3539 }
3540 }
3541 Err(RedDBError::Query(format!(
3542 "required vector column '{}' not found in INSERT",
3543 names.join("' or '")
3544 )))
3545}
3546
3547fn extract_remaining_properties(
3549 columns: &[String],
3550 values: &[Value],
3551 exclude: &[&str],
3552) -> Vec<(String, Value)> {
3553 columns
3554 .iter()
3555 .zip(values.iter())
3556 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3557 .map(|(col, val)| (col.clone(), val.clone()))
3558 .collect()
3559}
3560
3561fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3562 let mut invalid = Vec::new();
3563 for column in columns {
3564 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3565 invalid.push(column.clone());
3566 }
3567 }
3568
3569 if invalid.is_empty() {
3570 Ok(())
3571 } else {
3572 Err(RedDBError::Query(format!(
3573 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3574 invalid.join(", ")
3575 )))
3576 }
3577}
3578
/// Reports whether `column` is one of the column names a timeseries INSERT
/// recognises, compared ASCII case-insensitively.
fn is_timeseries_insert_column(column: &str) -> bool {
    const RECOGNISED: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        "event_name",
        "payload",
    ];
    RECOGNISED
        .iter()
        .any(|known| column.eq_ignore_ascii_case(known))
}
3596
3597fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3598 let mut found = None;
3599
3600 for alias in ["timestamp_ns", "timestamp", "time"] {
3601 for (index, column) in columns.iter().enumerate() {
3602 if !column.eq_ignore_ascii_case(alias) {
3603 continue;
3604 }
3605
3606 if found.is_some() {
3607 return Err(RedDBError::Query(
3608 "timeseries INSERT accepts only one timestamp column".to_string(),
3609 ));
3610 }
3611
3612 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3613 }
3614 }
3615
3616 Ok(found)
3617}
3618
3619fn find_timeseries_tags(
3620 columns: &[String],
3621 values: &[Value],
3622) -> RedDBResult<std::collections::HashMap<String, String>> {
3623 for (index, column) in columns.iter().enumerate() {
3624 if column.eq_ignore_ascii_case("tags") {
3625 return parse_timeseries_tags(&values[index]);
3626 }
3627 }
3628 Ok(std::collections::HashMap::new())
3629}
3630
3631fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3632 match value {
3633 Value::Null => Ok(std::collections::HashMap::new()),
3634 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3635 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3636 other => Err(RedDBError::Query(format!(
3637 "timeseries tags must be a JSON object or JSON text, got {other:?}"
3638 ))),
3639 }
3640}
3641
3642fn parse_timeseries_tags_json(
3643 bytes: &[u8],
3644) -> RedDBResult<std::collections::HashMap<String, String>> {
3645 let json: crate::json::Value = crate::json::from_slice(bytes)
3646 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3647
3648 let object = match json {
3649 crate::json::Value::Object(object) => object,
3650 other => {
3651 return Err(RedDBError::Query(format!(
3652 "timeseries tags must be a JSON object, got {other:?}"
3653 )))
3654 }
3655 };
3656
3657 let mut tags = std::collections::HashMap::with_capacity(object.len());
3658 for (key, value) in object {
3659 tags.insert(key, json_tag_value_to_string(&value));
3660 }
3661 Ok(tags)
3662}
3663
3664fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3680 let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3681 buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3682 buf.push_str(&value.to_string_compact());
3683 buf
3684}
3685
3686fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3687 match value {
3688 Value::UnsignedInteger(value) => Ok(*value),
3689 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3690 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3691 Value::Text(value) => value.parse::<u64>().map_err(|_| {
3692 RedDBError::Query(format!(
3693 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3694 ))
3695 }),
3696 other => Err(RedDBError::Query(format!(
3697 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3698 ))),
3699 }
3700}
3701
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a nanosecond count that does not
/// fit in `u64` is clamped to `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
3709
3710fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3711 use crate::json::{Map, Value as JV};
3712 match value {
3713 MetadataValue::Null => JV::Null,
3714 MetadataValue::Bool(value) => JV::Bool(*value),
3715 MetadataValue::Int(value) => JV::Number(*value as f64),
3716 MetadataValue::Float(value) => JV::Number(*value),
3717 MetadataValue::String(value) => JV::String(value.clone()),
3718 MetadataValue::Bytes(value) => JV::Array(
3719 value
3720 .iter()
3721 .map(|value| JV::Number(*value as f64))
3722 .collect(),
3723 ),
3724 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3725 MetadataValue::Array(values) => {
3726 JV::Array(values.iter().map(metadata_value_to_json).collect())
3727 }
3728 MetadataValue::Object(object) => {
3729 let entries = object
3730 .iter()
3731 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3732 .collect();
3733 JV::Object(entries)
3734 }
3735 MetadataValue::Geo { lat, lon } => {
3736 let mut object = Map::new();
3737 object.insert("lat".to_string(), JV::Number(*lat));
3738 object.insert("lon".to_string(), JV::Number(*lon));
3739 JV::Object(object)
3740 }
3741 MetadataValue::Reference(target) => {
3742 let mut object = Map::new();
3743 object.insert(
3744 "collection".to_string(),
3745 JV::String(target.collection().to_string()),
3746 );
3747 object.insert(
3748 "entity_id".to_string(),
3749 JV::Number(target.entity_id().raw() as f64),
3750 );
3751 JV::Object(object)
3752 }
3753 MetadataValue::References(values) => {
3754 let refs = values
3755 .iter()
3756 .map(|target| {
3757 let mut object = Map::new();
3758 object.insert(
3759 "collection".to_string(),
3760 JV::String(target.collection().to_string()),
3761 );
3762 object.insert(
3763 "entity_id".to_string(),
3764 JV::Number(target.entity_id().raw() as f64),
3765 );
3766 JV::Object(object)
3767 })
3768 .collect();
3769 JV::Array(refs)
3770 }
3771 }
3772}
3773
3774fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3775 match value {
3776 Value::Null => MetadataValue::Null,
3777 Value::Boolean(value) => MetadataValue::Bool(*value),
3778 Value::Integer(value) => MetadataValue::Int(*value),
3779 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3780 Value::Float(value) => MetadataValue::Float(*value),
3781 Value::Text(value) => MetadataValue::String(value.to_string()),
3782 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3783 Value::Timestamp(value) => {
3784 if *value >= 0 {
3785 metadata_u64_to_value(*value as u64)
3786 } else {
3787 MetadataValue::Int(*value)
3788 }
3789 }
3790 Value::TimestampMs(value) => {
3791 if *value >= 0 {
3792 metadata_u64_to_value(*value as u64)
3793 } else {
3794 MetadataValue::Int(*value)
3795 }
3796 }
3797 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3798 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3799 Value::Date(value) => MetadataValue::String(value.to_string()),
3800 Value::Time(value) => MetadataValue::String(value.to_string()),
3801 Value::Decimal(value) => MetadataValue::String(value.to_string()),
3802 Value::Ipv4(value) => MetadataValue::String(format!(
3803 "{}.{}.{}.{}",
3804 (value >> 24) & 0xFF,
3805 (value >> 16) & 0xFF,
3806 (value >> 8) & 0xFF,
3807 value & 0xFF
3808 )),
3809 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3810 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3811 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3812 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3813 lat: *lat as f64 / 1_000_000.0,
3814 lon: *lon as f64 / 1_000_000.0,
3815 },
3816 Value::BigInt(value) => MetadataValue::String(value.to_string()),
3817 Value::TableRef(value) => MetadataValue::String(value.clone()),
3818 Value::PageRef(value) => MetadataValue::Int(*value as i64),
3819 Value::Password(value) => MetadataValue::String(value.clone()),
3820 Value::Array(values) => {
3821 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3822 }
3823 _ => MetadataValue::String(value.to_string()),
3824 }
3825}
3826
3827fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3828 match value {
3829 Value::Null => Ok(MetadataValue::Null),
3830 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3831 Value::Integer(_) => Err(RedDBError::Query(format!(
3832 "column '{field}' must be non-negative for TTL metadata"
3833 ))),
3834 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3835 Value::Float(value) if value.is_finite() => {
3836 if value.fract().abs() >= f64::EPSILON {
3837 return Err(RedDBError::Query(format!(
3838 "column '{field}' must be an integer (TTL metadata must be an integer)"
3839 )));
3840 }
3841 if *value < 0.0 {
3842 return Err(RedDBError::Query(format!(
3843 "column '{field}' must be non-negative for TTL metadata"
3844 )));
3845 }
3846 if *value > u64::MAX as f64 {
3847 return Err(RedDBError::Query(format!(
3848 "column '{field}' value is too large"
3849 )));
3850 }
3851 Ok(metadata_u64_to_value(*value as u64))
3852 }
3853 Value::Float(_) => Err(RedDBError::Query(format!(
3854 "column '{field}' must be a finite number"
3855 ))),
3856 Value::Text(value) => {
3857 let value = value.trim();
3858 if let Ok(value) = value.parse::<u64>() {
3859 Ok(metadata_u64_to_value(value))
3860 } else if let Ok(value) = value.parse::<i64>() {
3861 if value < 0 {
3862 return Err(RedDBError::Query(format!(
3863 "column '{field}' must be non-negative for TTL metadata"
3864 )));
3865 }
3866 Ok(metadata_u64_to_value(value as u64))
3867 } else if let Ok(value) = value.parse::<f64>() {
3868 if !value.is_finite() {
3869 return Err(RedDBError::Query(format!(
3870 "column '{field}' must be a finite number"
3871 )));
3872 }
3873 if value.fract().abs() >= f64::EPSILON {
3874 return Err(RedDBError::Query(format!(
3875 "column '{field}' must be an integer (TTL metadata must be an integer)"
3876 )));
3877 }
3878 if value < 0.0 {
3879 return Err(RedDBError::Query(format!(
3880 "column '{field}' must be non-negative for TTL metadata"
3881 )));
3882 }
3883 if value > u64::MAX as f64 {
3884 return Err(RedDBError::Query(format!(
3885 "column '{field}' value is too large"
3886 )));
3887 }
3888 Ok(metadata_u64_to_value(value as u64))
3889 } else {
3890 Err(RedDBError::Query(format!(
3891 "column '{field}' expects a numeric value for TTL metadata"
3892 )))
3893 }
3894 }
3895 _ => Err(RedDBError::Query(format!(
3896 "column '{field}' expects a numeric value for TTL metadata"
3897 ))),
3898 }
3899}
3900
3901fn metadata_u64_to_value(value: u64) -> MetadataValue {
3902 if value <= i64::MAX as u64 {
3903 MetadataValue::Int(value as i64)
3904 } else {
3905 MetadataValue::Timestamp(value)
3906 }
3907}
3908
3909fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3915 let json = match value {
3916 Value::Null => return false,
3917 Value::Json(bytes) | Value::Blob(bytes) => {
3918 match crate::json::from_slice::<crate::json::Value>(bytes) {
3919 Ok(v) => v,
3920 Err(_) => return false,
3921 }
3922 }
3923 Value::Text(s) => {
3924 let trimmed = s.trim_start();
3925 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3926 return false;
3927 }
3928 match crate::json::from_str::<crate::json::Value>(s) {
3929 Ok(v) => v,
3930 Err(_) => return false,
3931 }
3932 }
3933 _ => return false,
3934 };
3935 let mut cursor = &json;
3936 for seg in tail.split('.') {
3937 match cursor {
3938 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3939 Some((_, v)) => cursor = v,
3940 None => return false,
3941 },
3942 _ => return false,
3943 }
3944 }
3945 !matches!(cursor, crate::json::Value::Null)
3946}
3947
3948fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
3959 let mut root = match current {
3960 Value::Null => crate::json::Value::Object(Default::default()),
3961 Value::Json(bytes) | Value::Blob(bytes) => {
3962 crate::json::from_slice(&bytes).map_err(|err| {
3963 RedDBError::Query(format!(
3964 "tenant auto-fill: root column is not valid JSON ({err})"
3965 ))
3966 })?
3967 }
3968 Value::Text(s) => {
3969 if s.trim().is_empty() {
3970 crate::json::Value::Object(Default::default())
3971 } else {
3972 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
3973 RedDBError::Query(format!(
3974 "tenant auto-fill: text root is not valid JSON ({err})"
3975 ))
3976 })?
3977 }
3978 }
3979 other => {
3980 return Err(RedDBError::Query(format!(
3981 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
3982 )));
3983 }
3984 };
3985
3986 let segments: Vec<&str> = tail.split('.').collect();
3988 let mut cursor: &mut crate::json::Value = &mut root;
3989 for (i, seg) in segments.iter().enumerate() {
3990 let is_last = i + 1 == segments.len();
3991 let map = match cursor {
3992 crate::json::Value::Object(m) => m,
3993 _ => {
3994 return Err(RedDBError::Query(format!(
3995 "tenant auto-fill: segment '{seg}' is not inside an object"
3996 )));
3997 }
3998 };
3999 if is_last {
4000 map.insert(
4001 seg.to_string(),
4002 crate::json::Value::String(tenant_id.to_string()),
4003 );
4004 break;
4005 }
4006 cursor = map
4007 .entry(seg.to_string())
4008 .or_insert_with(|| crate::json::Value::Object(Default::default()));
4009 }
4010
4011 let bytes = crate::json::to_vec(&root).map_err(|err| {
4012 RedDBError::Query(format!(
4013 "tenant auto-fill: failed to re-serialize JSON ({err})"
4014 ))
4015 })?;
4016 Ok(Value::Json(bytes))
4017}
4018
4019#[cfg(test)]
4020mod tests {
4021 use crate::storage::schema::Value;
4022 use crate::storage::wal::{WalReader, WalRecord};
4023 use crate::{RedDBOptions, RedDBRuntime};
4024 use std::path::Path;
4025
4026 fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4027 WalReader::open(wal_path)
4028 .expect("wal opens")
4029 .iter()
4030 .map(|record| record.expect("wal record decodes").1)
4031 .filter_map(|record| match record {
4032 WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4033 _ => None,
4034 })
4035 .collect()
4036 }
4037
4038 fn action_contains_text(action: &[u8], needle: &str) -> bool {
4039 action
4040 .windows(needle.len())
4041 .any(|window| window == needle.as_bytes())
4042 }
4043
4044 fn assert_statement_writes_collections_in_one_new_wal_batch(
4045 rt: &RedDBRuntime,
4046 wal_path: &Path,
4047 statement: &str,
4048 source: &str,
4049 event_queue: &str,
4050 ) {
4051 let before_batches = store_commit_batches(wal_path).len();
4052
4053 rt.execute_query(statement).unwrap();
4054
4055 let batches = store_commit_batches(wal_path);
4056 let statement_batches = &batches[before_batches..];
4057 let source_batch = statement_batches
4058 .iter()
4059 .position(|actions| {
4060 actions.iter().any(|action| {
4061 action_contains_text(action, source)
4062 && !action_contains_text(action, event_queue)
4063 })
4064 })
4065 .expect("source collection write batch is present");
4066 let event_batch = statement_batches
4067 .iter()
4068 .position(|actions| {
4069 actions
4070 .iter()
4071 .any(|action| action_contains_text(action, event_queue))
4072 })
4073 .expect("event queue write batch is present");
4074
4075 assert_eq!(
4076 source_batch, event_batch,
4077 "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4078 );
4079 }
4080
4081 #[test]
4082 fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4083 let dir = tempfile::tempdir().unwrap();
4084 let db_path = dir.path().join("events_dual_write.rdb");
4085 let wal_path = db_path.with_extension("rdb-uwal");
4086 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4087
4088 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4089 .unwrap();
4090 assert_statement_writes_collections_in_one_new_wal_batch(
4091 &rt,
4092 &wal_path,
4093 "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4094 "users",
4095 "users_events",
4096 );
4097 }
4098
4099 #[test]
4100 fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4101 let dir = tempfile::tempdir().unwrap();
4102 let db_path = dir.path().join("events_update_atomic.rdb");
4103 let wal_path = db_path.with_extension("rdb-uwal");
4104 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4105
4106 rt.execute_query(
4107 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4108 )
4109 .unwrap();
4110 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4111 .unwrap();
4112
4113 assert_statement_writes_collections_in_one_new_wal_batch(
4114 &rt,
4115 &wal_path,
4116 "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4117 "users",
4118 "user_updates",
4119 );
4120 }
4121
4122 #[test]
4123 fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4124 let dir = tempfile::tempdir().unwrap();
4125 let db_path = dir.path().join("events_delete_atomic.rdb");
4126 let wal_path = db_path.with_extension("rdb-uwal");
4127 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4128
4129 rt.execute_query(
4130 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4131 )
4132 .unwrap();
4133 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4134 .unwrap();
4135
4136 assert_statement_writes_collections_in_one_new_wal_batch(
4137 &rt,
4138 &wal_path,
4139 "DELETE FROM users WHERE id = 1",
4140 "users",
4141 "user_deletes",
4142 );
4143 }
4144
4145 #[test]
4146 fn update_where_id_in_with_hash_index_updates_expected_rows() {
4147 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4148 rt.execute_query("CREATE TABLE users (id INT, score INT)")
4149 .unwrap();
4150 for id in 0..5 {
4151 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4152 .unwrap();
4153 }
4154 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4155 .unwrap();
4156
4157 let updated = rt
4158 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4159 .unwrap();
4160 assert_eq!(updated.affected_rows, 3);
4161
4162 let selected = rt
4163 .execute_query("SELECT id, score FROM users ORDER BY id")
4164 .unwrap();
4165 let scores: Vec<(i64, i64)> = selected
4166 .result
4167 .records
4168 .iter()
4169 .map(|record| {
4170 let id = match record.get("id").unwrap() {
4171 Value::Integer(value) => *value,
4172 other => panic!("expected integer id, got {other:?}"),
4173 };
4174 let score = match record.get("score").unwrap() {
4175 Value::Integer(value) => *value,
4176 other => panic!("expected integer score, got {other:?}"),
4177 };
4178 (id, score)
4179 })
4180 .collect();
4181 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4182 }
4183
4184 #[test]
4191 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4192 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4193 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4194 .unwrap();
4195 for id in 0..5 {
4196 rt.execute_query(&format!(
4197 "INSERT INTO items (id, score) VALUES ({id}, {})",
4198 id * 10
4199 ))
4200 .unwrap();
4201 }
4202 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4203 .unwrap();
4204
4205 let updated_one = rt
4209 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4210 .unwrap();
4211 assert_eq!(updated_one.affected_rows, 1);
4212
4213 let updated_many = rt
4217 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4218 .unwrap();
4219 assert_eq!(updated_many.affected_rows, 2);
4220
4221 let snapshot = rt
4222 .execute_query("SELECT id, score FROM items ORDER BY id")
4223 .unwrap();
4224 let pairs: Vec<(i64, i64)> = snapshot
4225 .result
4226 .records
4227 .iter()
4228 .map(|record| {
4229 let id = match record.get("id").unwrap() {
4230 Value::Integer(value) => *value,
4231 other => panic!("expected integer id, got {other:?}"),
4232 };
4233 let score = match record.get("score").unwrap() {
4234 Value::Integer(value) => *value,
4235 other => panic!("expected integer score, got {other:?}"),
4236 };
4237 (id, score)
4238 })
4239 .collect();
4240 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4241
4242 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4244 assert_eq!(updated_all.affected_rows, 5);
4245 let after = rt
4246 .execute_query("SELECT score FROM items ORDER BY id")
4247 .unwrap();
4248 let scores: Vec<i64> = after
4249 .result
4250 .records
4251 .iter()
4252 .map(|record| match record.get("score").unwrap() {
4253 Value::Integer(value) => *value,
4254 other => panic!("expected integer score, got {other:?}"),
4255 })
4256 .collect();
4257 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4258 }
4259
4260 #[test]
4266 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4267 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4268 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4269 .unwrap();
4270 for id in 0..5 {
4271 rt.execute_query(&format!(
4272 "INSERT INTO items (id, score) VALUES ({id}, {})",
4273 id * 10
4274 ))
4275 .unwrap();
4276 }
4277 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4278 .unwrap();
4279
4280 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4283 assert_eq!(deleted_one.affected_rows, 1);
4284
4285 let deleted_many = rt
4289 .execute_query("DELETE FROM items WHERE score > 25")
4290 .unwrap();
4291 assert_eq!(deleted_many.affected_rows, 2);
4292
4293 let surviving = rt
4294 .execute_query("SELECT id FROM items ORDER BY id")
4295 .unwrap();
4296 let ids: Vec<i64> = surviving
4297 .result
4298 .records
4299 .iter()
4300 .map(|record| match record.get("id").unwrap() {
4301 Value::Integer(value) => *value,
4302 other => panic!("expected integer id, got {other:?}"),
4303 })
4304 .collect();
4305 assert_eq!(ids, vec![0, 1]);
4306
4307 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4309 assert_eq!(deleted_rest.affected_rows, 2);
4310 let empty = rt.execute_query("SELECT id FROM items").unwrap();
4311 assert!(empty.result.records.is_empty());
4312 }
4313
4314 #[test]
4319 fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4320 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4321 rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4322 .unwrap();
4323
4324 let inserted = rt
4327 .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4328 .unwrap();
4329 assert_eq!(inserted.affected_rows, 1);
4330
4331 let update_err = rt
4333 .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4334 .unwrap_err();
4335 let msg = format!("{update_err}");
4336 assert!(
4337 msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4338 "expected UPDATE rejection message, got: {msg}"
4339 );
4340
4341 let delete_err = rt
4343 .execute_query("DELETE FROM events WHERE id = 1")
4344 .unwrap_err();
4345 let msg = format!("{delete_err}");
4346 assert!(
4347 msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4348 "expected DELETE rejection message, got: {msg}"
4349 );
4350
4351 let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4354 assert_eq!(surviving.result.records.len(), 1);
4355 }
4356
4357 #[test]
4361 fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4362 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4363 rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4364 .unwrap();
4365
4366 rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4367 .unwrap();
4368 let updated = rt
4369 .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4370 .unwrap();
4371 assert_eq!(updated.affected_rows, 1);
4372 let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4373 assert_eq!(deleted.affected_rows, 1);
4374 }
4375
4376 #[test]
4377 fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4378 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4379 rt.execute_query(
4380 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4381 )
4382 .unwrap();
4383
4384 let inserted = rt
4385 .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4386 .unwrap();
4387 assert_eq!(inserted.affected_rows, 1);
4388
4389 let events = queue_payloads(&rt, "audit_log");
4390 assert_eq!(events.len(), 1);
4391 let event = events[0].as_object().expect("event payload object");
4392 assert!(event
4393 .get("event_id")
4394 .and_then(crate::json::Value::as_str)
4395 .is_some_and(|value| !value.is_empty()));
4396 assert_eq!(
4397 event.get("op").and_then(crate::json::Value::as_str),
4398 Some("insert")
4399 );
4400 assert_eq!(
4401 event.get("collection").and_then(crate::json::Value::as_str),
4402 Some("users")
4403 );
4404 assert_eq!(
4405 event.get("id").and_then(crate::json::Value::as_u64),
4406 Some(7)
4407 );
4408 assert!(event
4409 .get("ts")
4410 .and_then(crate::json::Value::as_u64)
4411 .is_some());
4412 assert!(event
4413 .get("lsn")
4414 .and_then(crate::json::Value::as_u64)
4415 .is_some());
4416 assert!(matches!(
4417 event.get("tenant"),
4418 Some(crate::json::Value::Null)
4419 ));
4420 assert!(matches!(
4421 event.get("before"),
4422 Some(crate::json::Value::Null)
4423 ));
4424 let after = event
4425 .get("after")
4426 .and_then(crate::json::Value::as_object)
4427 .expect("after object");
4428 assert_eq!(
4429 after.get("id").and_then(crate::json::Value::as_u64),
4430 Some(7)
4431 );
4432 assert_eq!(
4433 after.get("email").and_then(crate::json::Value::as_str),
4434 Some("a@example.com")
4435 );
4436 }
4437
4438 #[test]
4439 fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4440 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4441 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4442 .unwrap();
4443
4444 rt.execute_query(
4445 "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4446 )
4447 .unwrap();
4448
4449 let events = queue_payloads(&rt, "users_events");
4450 assert_eq!(events.len(), 2);
4451 let mut previous_lsn = 0;
4452 for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4453 let object = event.as_object().expect("event payload object");
4454 assert_eq!(
4455 object.get("op").and_then(crate::json::Value::as_str),
4456 Some("insert")
4457 );
4458 assert_eq!(
4459 object.get("id").and_then(crate::json::Value::as_u64),
4460 Some(expected_id)
4461 );
4462 let lsn = object
4463 .get("lsn")
4464 .and_then(crate::json::Value::as_u64)
4465 .expect("event lsn");
4466 assert!(
4467 lsn > previous_lsn,
4468 "event LSNs should increase in row order"
4469 );
4470 previous_lsn = lsn;
4471 let after = object
4472 .get("after")
4473 .and_then(crate::json::Value::as_object)
4474 .expect("after object");
4475 assert_eq!(
4476 after.get("id").and_then(crate::json::Value::as_u64),
4477 Some(expected_id)
4478 );
4479 }
4480 }
4481
4482 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4483 let result = rt
4484 .execute_query(&format!("QUEUE PEEK {queue} 10"))
4485 .expect("peek queue");
4486 result
4487 .result
4488 .records
4489 .iter()
4490 .map(
4491 |record| match record.get("payload").expect("payload column") {
4492 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4493 other => panic!("expected JSON queue payload, got {other:?}"),
4494 },
4495 )
4496 .collect()
4497 }
4498
#[test]
/// The first single-row INSERT into a table with an `id` column must register
/// a hash index named `idx_id`; later inserts, point lookups by `id`, and a
/// DELETE must keep working once that index exists.
fn auto_index_id_fires_on_first_insert() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
        .unwrap();

    // No row has been inserted yet, so nothing should be indexed on `id`.
    let no_index_yet = rt
        .index_store_ref()
        .find_index_for_column("bench_users", "id")
        .is_none();
    assert!(
        no_index_yet,
        "freshly created collection should not have an `id` index"
    );

    // The very first insert is what should trigger the registration.
    let first_insert = "INSERT INTO bench_users (id, score) VALUES (1, 10)";
    rt.execute_query(first_insert).unwrap();

    let implicit_index = rt
        .index_store_ref()
        .find_index_for_column("bench_users", "id")
        .expect("auto-index hook should have registered idx_id on first insert");
    assert_eq!(implicit_index.name, "idx_id");
    assert_eq!(implicit_index.collection, "bench_users");
    assert_eq!(implicit_index.columns, vec!["id".to_string()]);
    assert!(matches!(
        implicit_index.method,
        super::super::index_store::IndexMethodKind::Hash
    ));

    // Rows added after registration must still be reachable by `id`.
    for id in 2..=5 {
        let insert_sql = format!(
            "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
            id * 10
        );
        rt.execute_query(&insert_sql).unwrap();
    }
    for id in 1..=5 {
        let lookup_sql = format!("SELECT score FROM bench_users WHERE id = {id}");
        let matched = rt.execute_query(&lookup_sql).unwrap().result.records.len();
        assert_eq!(matched, 1, "id={id} should match one row");
    }

    // A delete filtered on the indexed column must hit exactly one row.
    let affected = rt
        .execute_query("DELETE FROM bench_users WHERE id = 3")
        .unwrap()
        .affected_rows;
    assert_eq!(affected, 1);
}
4570
#[test]
/// A multi-row INSERT as the first write must also register `idx_id`, and
/// every row from that batch must be reachable by `id` afterwards.
fn auto_index_id_fires_on_first_bulk_insert() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE bench_bulk (id INT, score INT)",
        "INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let implicit_index = rt
        .index_store_ref()
        .find_index_for_column("bench_bulk", "id")
        .expect("auto-index hook should fire on first bulk insert");
    assert_eq!(implicit_index.name, "idx_id");

    // Each row of the batch must resolve to exactly one record.
    for id in 1..=3 {
        let lookup_sql = format!("SELECT score FROM bench_bulk WHERE id = {id}");
        let matched = rt.execute_query(&lookup_sql).unwrap().result.records.len();
        assert_eq!(matched, 1);
    }
}
4598
#[test]
/// A table without an `id` column gets no implicit index: neither on the
/// missing `id` column nor on the similarly named `uid` column.
fn auto_index_id_skips_when_no_id_column() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE plain (uid INT, label TEXT)",
        "INSERT INTO plain (uid, label) VALUES (1, 'a')",
    ] {
        rt.execute_query(statement).unwrap();
    }

    for column in ["id", "uid"] {
        let lookup = rt.index_store_ref().find_index_for_column("plain", column);
        assert!(lookup.is_none());
    }
}
4619
#[test]
/// When the user has already created an index on `id`, the first INSERT must
/// leave it in place rather than replacing it with the implicit one.
fn auto_index_id_skips_when_index_already_exists() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    // Order matters: the explicit index exists before the first row arrives.
    for statement in [
        "CREATE TABLE pre (id INT, score INT)",
        "CREATE INDEX user_idx ON pre (id) USING BTREE",
        "INSERT INTO pre (id, score) VALUES (1, 10)",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let surviving_index = rt
        .index_store_ref()
        .find_index_for_column("pre", "id")
        .expect("user index should still be there");
    assert_eq!(
        surviving_index.name, "user_idx",
        "auto-index hook must not overwrite an existing index"
    );
}
4644
#[test]
/// The implicit `id` index must disappear from the index store when its
/// table is dropped.
fn auto_index_id_dropped_with_collection() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let id_index_present = || {
        rt.index_store_ref()
            .find_index_for_column("ephemeral", "id")
            .is_some()
    };

    for statement in [
        "CREATE TABLE ephemeral (id INT, score INT)",
        "INSERT INTO ephemeral (id, score) VALUES (1, 10)",
    ] {
        rt.execute_query(statement).unwrap();
    }
    // Precondition: the first insert registered the index.
    assert!(id_index_present());

    rt.execute_query("DROP TABLE ephemeral").unwrap();

    assert!(
        !id_index_present(),
        "implicit `idx_id` must be reaped when its collection drops"
    );
}
4669
#[test]
/// With `with_auto_index_id(false)` on the runtime options, inserting into a
/// table that has an `id` column must not create any index on it.
fn auto_index_id_disabled_by_config() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory().with_auto_index_id(false))
        .unwrap();

    for statement in [
        "CREATE TABLE off (id INT, score INT)",
        "INSERT INTO off (id, score) VALUES (1, 10)",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let index_absent = rt
        .index_store_ref()
        .find_index_for_column("off", "id")
        .is_none();
    assert!(
        index_absent,
        "with auto_index_id=false, no implicit index should be created"
    );
}
4691
#[test]
/// Updating one row of a table declared `WITH EVENTS (UPDATE) TO audit_log`
/// must enqueue exactly one event carrying `op`, `collection`, a non-empty
/// `event_id`, and `before`/`after` objects holding the old and new `name`.
fn update_single_row_emits_update_event() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
        "INSERT INTO users (id, name) VALUES (1, 'Alice')",
        "UPDATE users SET name = 'Bob' WHERE id = 1",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&rt, "audit_log");
    assert_eq!(events.len(), 1, "expected exactly 1 update event");
    let event = events[0].as_object().expect("event payload object");

    // Envelope fields.
    for (key, expected) in [("op", "update"), ("collection", "users")] {
        assert_eq!(
            event.get(key).and_then(crate::json::Value::as_str),
            Some(expected)
        );
    }
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(matches!(event_id, Some(id) if !id.is_empty()));

    // Row snapshots on either side of the update.
    let old_values = event
        .get("before")
        .and_then(crate::json::Value::as_object)
        .expect("before must be an object");
    let new_values = event
        .get("after")
        .and_then(crate::json::Value::as_object)
        .expect("after must be an object");
    for (snapshot, expected, why) in [
        (old_values, "Alice", "before.name should be the old value"),
        (new_values, "Bob", "after.name should be the new value"),
    ] {
        assert_eq!(
            snapshot.get("name").and_then(crate::json::Value::as_str),
            Some(expected),
            "{why}"
        );
    }
}
4741
#[test]
/// The `before`/`after` objects of an update event must contain the column
/// that changed (`name`) and omit a column that did not (`email`).
fn update_event_only_includes_changed_fields() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
        "INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')",
        "UPDATE users SET name = 'Bob' WHERE id = 1",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&rt, "evts");
    assert_eq!(events.len(), 1);
    let event = events[0].as_object().unwrap();
    let [before, after] = ["before", "after"].map(|side| {
        event
            .get(side)
            .and_then(crate::json::Value::as_object)
            .unwrap()
    });

    // The modified column shows up on both sides...
    assert!(
        before.contains_key("name"),
        "before must include changed field"
    );
    assert!(
        after.contains_key("name"),
        "after must include changed field"
    );
    // ...while the untouched column is left out of both.
    assert!(
        !before.contains_key("email"),
        "before must not include unchanged email"
    );
    assert!(
        !after.contains_key("email"),
        "after must not include unchanged email"
    );
}
4785
#[test]
/// An unfiltered UPDATE touching three rows must enqueue three events, each
/// with `op` set to `"update"`.
fn multi_row_update_emits_one_event_per_row() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts",
        "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
        "UPDATE items SET status = 'done'",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&rt, "evts");
    assert_eq!(events.len(), 3, "expected one update event per row");
    events
        .iter()
        .map(|event| event.as_object().unwrap())
        .for_each(|fields| {
            assert_eq!(
                fields.get("op").and_then(crate::json::Value::as_str),
                Some("update")
            );
        });
}
4809
#[test]
/// Deleting one row of a table declared `WITH EVENTS (DELETE) TO del_log`
/// must enqueue exactly one event whose `before` holds the deleted row and
/// whose `after` is JSON null.
fn delete_single_row_emits_delete_event() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log",
        "INSERT INTO users (id, name) VALUES (42, 'Alice')",
        "DELETE FROM users WHERE id = 42",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&rt, "del_log");
    assert_eq!(events.len(), 1);
    let event = events[0].as_object().expect("event payload object");

    // Envelope fields.
    for (key, expected) in [("op", "delete"), ("collection", "users")] {
        assert_eq!(
            event.get(key).and_then(crate::json::Value::as_str),
            Some(expected)
        );
    }
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(matches!(event_id, Some(id) if !id.is_empty()));

    // `before` carries the full row that was removed.
    let deleted_row = event
        .get("before")
        .and_then(crate::json::Value::as_object)
        .expect("before must be an object for delete");
    assert_eq!(
        deleted_row.get("id").and_then(crate::json::Value::as_u64),
        Some(42)
    );
    assert_eq!(
        deleted_row.get("name").and_then(crate::json::Value::as_str),
        Some("Alice")
    );

    // `after` must be present and explicitly null.
    let after = event.get("after");
    assert!(matches!(after, Some(crate::json::Value::Null)));
}
4849
#[test]
/// An unfiltered DELETE removing three rows must enqueue three events, each
/// with `op` set to `"delete"` and a JSON-null `after`.
fn multi_row_delete_emits_one_event_per_row() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log",
        "INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)",
        "DELETE FROM items",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&rt, "del_log");
    assert_eq!(events.len(), 3, "expected one delete event per deleted row");
    events
        .iter()
        .map(|event| event.as_object().unwrap())
        .for_each(|fields| {
            assert_eq!(
                fields.get("op").and_then(crate::json::Value::as_str),
                Some("delete")
            );
            assert!(matches!(
                fields.get("after"),
                Some(crate::json::Value::Null)
            ));
        });
}
4871
#[test]
/// A table subscribed with `WITH EVENTS (UPDATE)` must leave its queue empty
/// when only INSERT and DELETE statements run against it.
fn ops_filter_update_does_not_emit_on_insert_or_delete() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice')",
        "DELETE FROM users WHERE id = 1",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let emitted = queue_payloads(&rt, "evts");
    assert!(
        emitted.is_empty(),
        "UPDATE-only filter must not emit INSERT or DELETE events"
    );
}
4888
#[test]
/// An INSERT carrying the `SUPPRESS EVENTS` suffix must leave the event
/// queue of an event-enabled table empty.
fn suppress_events_on_insert_emits_no_events() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let emitted = queue_payloads(&rt, "evts");
    assert!(
        emitted.is_empty(),
        "SUPPRESS EVENTS must prevent INSERT events"
    );
}
4906
#[test]
/// An UPDATE carrying the `SUPPRESS EVENTS` suffix must not enqueue an event
/// on an event-enabled table.
///
/// The setup INSERT is deliberately not suppressed. The test first checks
/// that it produced its event (proving the subscription is live, so the
/// final emptiness check cannot pass vacuously), then purges the queue so
/// only the UPDATE could repopulate it.
fn suppress_events_on_update_emits_no_events() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
        .unwrap();
    rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
        .unwrap();

    // Baseline: peeking does not consume, so verify the INSERT event is
    // there and then clear it explicitly with PURGE.
    let baseline = queue_payloads(&rt, "evts");
    assert_eq!(
        baseline.len(),
        1,
        "non-suppressed INSERT should have emitted exactly one event"
    );
    rt.execute_query("QUEUE PURGE evts").unwrap();

    rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
        .unwrap();

    let events = queue_payloads(&rt, "evts");
    assert!(
        events.is_empty(),
        "SUPPRESS EVENTS must prevent UPDATE events"
    );
}
4928
#[test]
/// A DELETE carrying the `SUPPRESS EVENTS` suffix must not enqueue an event;
/// the setup INSERT is suppressed too, so the queue starts out empty.
fn suppress_events_on_delete_emits_no_events() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
        "DELETE FROM users WHERE id = 1 SUPPRESS EVENTS",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let emitted = queue_payloads(&rt, "evts");
    assert!(
        emitted.is_empty(),
        "SUPPRESS EVENTS must prevent DELETE events"
    );
}
4948
#[test]
/// `SUPPRESS EVENTS` applies per statement: a plain INSERT that follows a
/// suppressed one must still enqueue its event, and that event carries the
/// second row's `id`.
fn normal_insert_after_suppress_still_emits() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
        "INSERT INTO users (id, name) VALUES (2, 'Bob')",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&rt, "evts");
    assert_eq!(
        events.len(),
        1,
        "only the non-suppressed INSERT should emit"
    );
    let emitted_id = events[0].get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(emitted_id, Some(2));
}
4971}