1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11 CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12 CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13 RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16 build_row_update_contract_plan, entity_row_fields_snapshot,
17 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18 RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27 sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28 sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
43 column: String,
44 expr: Expr,
45 compound_op: Option<BinOp>,
46 metadata_key: Option<&'static str>,
47 row_rule: Option<RowUpdateColumnRule>,
48}
49
50struct CompiledUpdatePlan {
51 static_field_assignments: Vec<(String, Value)>,
52 static_metadata_assignments: Vec<(String, MetadataValue)>,
53 dynamic_assignments: Vec<CompiledUpdateAssignment>,
54 row_contract_plan: Option<RowUpdateContractPlan>,
55 row_modified_columns: Vec<String>,
56 row_touches_unique_columns: bool,
57}
58
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
61 dynamic_field_assignments: Vec<(String, Value)>,
62 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66 pub fn chain_tip_for_collection(
72 &self,
73 collection: &str,
74 ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75 let store = self.inner.db.store();
76 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77 return None;
78 }
79 let mut cache = self.inner.chain_tip_cache.lock();
80 if let Some(existing) = cache.get(collection) {
81 return Some(existing.clone());
82 }
83 let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84 cache.insert(collection.to_string(), scanned.clone());
85 Some(scanned)
86 }
87
88 pub fn verify_chain_for_collection(
95 &self,
96 collection: &str,
97 ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98 let store = self.inner.db.store();
99 let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100 if !outcome.ok {
101 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102 self.inner
103 .chain_integrity_broken
104 .lock()
105 .insert(collection.to_string(), true);
106 }
107 Some(outcome)
108 }
109
110 pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114 let store = self.inner.db.store();
115 if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116 return false;
117 }
118 crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119 self.inner
120 .chain_integrity_broken
121 .lock()
122 .insert(collection.to_string(), false);
123 true
124 }
125
126 fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130 {
131 let cache = self.inner.chain_integrity_broken.lock();
132 if let Some(v) = cache.get(collection) {
133 return *v;
134 }
135 }
136 let store = self.inner.db.store();
137 let persisted =
138 crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139 .unwrap_or(false);
140 self.inner
141 .chain_integrity_broken
142 .lock()
143 .insert(collection.to_string(), persisted);
144 persisted
145 }
146
147 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
161 let Some(tenant_col) = self.tenant_column(&query.table) else {
162 return Ok(None);
163 };
164 if query
166 .columns
167 .iter()
168 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
169 {
170 return Ok(None);
171 }
172
173 if let Some(dot_pos) = tenant_col.find('.') {
179 let (root, tail) = tenant_col.split_at(dot_pos);
180 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
182 }
183
184 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
185 return Err(RedDBError::Query(format!(
186 "INSERT into tenant-scoped table '{}' requires an active tenant — \
187 run SET TENANT '<id>' first or name column '{}' explicitly",
188 query.table, tenant_col
189 )));
190 };
191
192 let mut augmented = query.clone();
193 augmented.columns.push(tenant_col);
194 let lit = Value::text(tenant_id.clone());
195 for row in augmented.values.iter_mut() {
196 row.push(lit.clone());
197 }
198 for row in augmented.value_exprs.iter_mut() {
199 row.push(crate::storage::query::ast::Expr::Literal {
200 value: lit.clone(),
201 span: crate::storage::query::ast::Span::synthetic(),
202 });
203 }
204 Ok(Some(augmented))
205 }
206
207 fn inject_dotted_tenant(
217 &self,
218 query: &InsertQuery,
219 root: &str,
220 tail: &str,
221 ) -> RedDBResult<Option<InsertQuery>> {
222 let active_tenant = crate::runtime::impl_core::current_tenant();
223 let mut augmented = query.clone();
224 let root_idx = augmented
225 .columns
226 .iter()
227 .position(|c| c.eq_ignore_ascii_case(root));
228
229 if let Some(idx) = root_idx {
230 for row in augmented.values.iter_mut() {
236 let Some(slot) = row.get_mut(idx) else {
237 continue;
238 };
239 if dotted_tail_already_set(slot, tail) {
240 continue;
241 }
242 let Some(tenant_id) = &active_tenant else {
243 return Err(RedDBError::Query(format!(
244 "INSERT into tenant-scoped table '{}' requires an active tenant — \
245 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
246 query.table, root, tail
247 )));
248 };
249 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
250 }
251 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
255 if let Some(slot) = row.get_mut(idx) {
256 let new_value = augmented
257 .values
258 .get(row_idx)
259 .and_then(|v| v.get(idx))
260 .cloned()
261 .unwrap_or(Value::Null);
262 *slot = crate::storage::query::ast::Expr::Literal {
263 value: new_value,
264 span: crate::storage::query::ast::Span::synthetic(),
265 };
266 }
267 }
268 } else {
269 let Some(tenant_id) = &active_tenant else {
273 return Err(RedDBError::Query(format!(
274 "INSERT into tenant-scoped table '{}' requires an active tenant — \
275 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
276 query.table, root, tail
277 )));
278 };
279 augmented.columns.push(root.to_string());
281 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
282 for row in augmented.values.iter_mut() {
283 row.push(fresh.clone());
284 }
285 for row in augmented.value_exprs.iter_mut() {
286 row.push(crate::storage::query::ast::Expr::Literal {
287 value: fresh.clone(),
288 span: crate::storage::query::ast::Span::synthetic(),
289 });
290 }
291 }
292
293 Ok(Some(augmented))
294 }
295
296 fn delete_entities_batch(
299 &self,
300 collection: &str,
301 ids: &[EntityId],
302 ) -> RedDBResult<(u64, Vec<u64>)> {
303 if ids.is_empty() {
304 return Ok((0, vec![]));
305 }
306
307 let store = self.db().store();
308 let Some(manager) = store.get_collection(collection) else {
309 return Ok((0, vec![]));
310 };
311
312 let active_xid = self.current_xid();
313 let conn_id = crate::runtime::impl_core::current_connection_id();
314 let mut autocommit_xid = None;
315 let mut tombstoned_ids = Vec::new();
316 let mut tombstoned_entities = Vec::new();
317 let mut physical_delete_ids = Vec::new();
318 let table_row_resolver =
319 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
320
321 for &id in ids {
322 let Some(mut entity) = manager.get(id) else {
323 continue;
324 };
325 if matches!(entity.data, EntityData::Row(_)) {
326 let previous_xmax = entity.xmax;
327 if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
328 if table_row_resolver.resolve_candidate(&entity).is_none() {
329 continue;
330 }
331 } else if entity.xmax != 0 {
332 continue;
333 }
334
335 let xid = match active_xid {
336 Some(xid) => xid,
337 None => match autocommit_xid {
338 Some(xid) => xid,
339 None => {
340 let mgr = self.snapshot_manager();
341 let xid = mgr.begin();
342 autocommit_xid = Some(xid);
343 xid
344 }
345 },
346 };
347 entity.set_xmax(xid);
348 if manager.update(entity.clone()).is_ok() {
349 if active_xid.is_some() {
350 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
351 }
352 tombstoned_entities.push(entity);
353 tombstoned_ids.push(id);
354 }
355 } else {
356 physical_delete_ids.push(id);
357 }
358 }
359
360 if let Some(xid) = autocommit_xid {
361 self.snapshot_manager().commit(xid);
362 }
363
364 let mut affected = tombstoned_ids.len() as u64;
365 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
366 if active_xid.is_some() {
367 store
368 .persist_entities_to_pager(collection, &tombstoned_entities)
369 .map_err(|err| RedDBError::Internal(err.to_string()))?;
370 } else {
371 store
372 .persist_entities_to_pager(collection, &tombstoned_entities)
373 .map_err(|err| RedDBError::Internal(err.to_string()))?;
374 for id in &tombstoned_ids {
375 store.context_index().remove_entity(*id);
376 let lsn = self.cdc_emit(
377 crate::replication::cdc::ChangeOperation::Delete,
378 collection,
379 id.raw(),
380 "entity",
381 );
382 lsns.push(lsn);
383 }
384 }
385
386 let deleted_ids = store
387 .delete_batch(collection, &physical_delete_ids)
388 .map_err(|err| RedDBError::Internal(err.to_string()))?;
389 affected += deleted_ids.len() as u64;
390 for id in &deleted_ids {
391 store.context_index().remove_entity(*id);
392 let lsn = self.cdc_emit(
393 crate::replication::cdc::ChangeOperation::Delete,
394 collection,
395 id.raw(),
396 "entity",
397 );
398 lsns.push(lsn);
399 }
400
401 Ok((affected, lsns))
402 }
403
404 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
407 if applied.is_empty() {
408 return Ok(Vec::new());
409 }
410
411 let store = self.db().store();
412 if applied.iter().any(|item| item.context_index_dirty) {
413 store.context_index().index_entities(
414 &applied[0].collection,
415 applied
416 .iter()
417 .filter(|item| item.context_index_dirty)
418 .map(|item| &item.entity),
419 );
420 }
421
422 for item in applied {
423 self.refresh_update_secondary_indexes(item)?;
424 }
425
426 let mut lsns = Vec::with_capacity(applied.len());
427 for item in applied {
428 let lsn = self.cdc_emit_prebuilt(
429 crate::replication::cdc::ChangeOperation::Update,
430 &item.collection,
431 &item.entity,
432 update_cdc_item_kind(self, &item.collection, &item.entity),
433 item.metadata.as_ref(),
434 false,
435 );
436 lsns.push(lsn);
437 }
438 Ok(lsns)
439 }
440
441 fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
442 self.persist_applied_entity_mutations(applied)
443 }
444
445 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
446 if applied.pre_mutation_fields.is_empty() {
447 return Ok(());
448 }
449 let post = entity_row_fields_snapshot(&applied.entity);
450 if post.is_empty() {
451 return Ok(());
452 }
453
454 let indexed_cols = self
455 .index_store_ref()
456 .indexed_columns_set(&applied.collection);
457 if indexed_cols.is_empty() {
458 return Ok(());
459 }
460
461 if let Some(old_version) = applied.replaced_entity.as_ref() {
462 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
463 .pre_mutation_fields
464 .iter()
465 .filter(|(col, _)| indexed_cols.contains(col))
466 .cloned()
467 .collect();
468 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
469 .iter()
470 .filter(|(col, _)| indexed_cols.contains(col))
471 .cloned()
472 .collect();
473 if !old_index_fields.is_empty() {
474 self.index_store_ref()
475 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
476 .map_err(crate::RedDBError::Internal)?;
477 }
478 if !new_index_fields.is_empty() {
479 self.index_store_ref()
480 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
481 .map_err(crate::RedDBError::Internal)?;
482 }
483 return Ok(());
484 }
485
486 let damage =
487 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
488 if damage
489 .touched_columns()
490 .into_iter()
491 .any(|col| indexed_cols.contains(col))
492 {
493 self.index_store_ref()
494 .index_entity_update(
495 &applied.collection,
496 applied.id,
497 &applied.pre_mutation_fields,
498 &post,
499 )
500 .map_err(crate::RedDBError::Internal)?;
501 }
502 Ok(())
503 }
504
505 pub fn execute_insert(
510 &self,
511 raw_query: &str,
512 query: &InsertQuery,
513 ) -> RedDBResult<RuntimeQueryResult> {
514 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
515 crate::runtime::collection_contract::CollectionContractGate::check(
521 self,
522 &query.table,
523 crate::runtime::collection_contract::MutationKind::Insert,
524 )?;
525 let augmented_owned;
534 let query = match self.maybe_inject_tenant_column(query)? {
535 Some(new_q) => {
536 augmented_owned = new_q;
537 &augmented_owned
538 }
539 None => query,
540 };
541 self.check_insert_column_policy(query)?;
542 if let Some(ref embed_config) = query.auto_embed {
543 let provider = crate::ai::parse_provider(&embed_config.provider)?;
544 if matches!(provider, crate::ai::AiProvider::Local) {
545 let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
553 if model_name.is_empty() {
554 return Err(RedDBError::Query(
555 "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
556 the local provider does not have an implicit default model"
557 .to_string(),
558 ));
559 }
560 crate::runtime::ai::local_embedding::preflight_local_embedding(
561 &self.inner.db,
562 model_name,
563 )?;
564 }
565 }
566
567 let mut inserted_count: u64 = 0;
568 let effective_rows =
569 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
570
571 let store = self.inner.db.store();
573 let _ = store.get_or_create_collection(&query.table);
574 let declared_model = self
575 .db()
576 .collection_contract_arc(&query.table)
577 .map(|contract| contract.declared_model);
578
579 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
580 if query.returning.is_some() {
581 Some(Vec::with_capacity(effective_rows.len()))
582 } else {
583 None
584 };
585 let mut returning_result: Option<UnifiedResult> = None;
586
587 if matches!(query.entity_type, InsertEntityType::Row)
588 && !matches!(
589 declared_model,
590 Some(crate::catalog::CollectionModel::TimeSeries)
591 )
592 {
593 let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
604 let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
605 Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
606 } else {
607 None
608 };
609 let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
610
611 if chain_mode && self.is_chain_integrity_broken(&query.table) {
614 return Err(RedDBError::InvalidOperation(format!(
615 "ChainIntegrityBroken: collection '{}' is locked until \
616 POST /collections/{}/clear-integrity-flag is called by an admin",
617 query.table, query.table
618 )));
619 }
620
621 let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
625 if chain_mode {
626 let mut cache = self.inner.chain_tip_cache.lock();
627 if let Some(existing) = cache.get(&query.table) {
628 Some(existing.clone())
629 } else if let Some(scanned) =
630 crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
631 {
632 cache.insert(query.table.clone(), scanned.clone());
633 Some(scanned)
634 } else {
635 None
636 }
637 } else {
638 None
639 };
640
641 let mut rows = Vec::with_capacity(effective_rows.len());
642 for row_values in &effective_rows {
643 if row_values.len() != query.columns.len() {
644 return Err(RedDBError::Query(format!(
645 "INSERT column count ({}) does not match value count ({})",
646 query.columns.len(),
647 row_values.len()
648 )));
649 }
650 let (mut fields, mut metadata) =
651 split_insert_metadata(self, &query.columns, row_values)?;
652 if chain_mode {
653 use crate::runtime::blockchain_kind::{
654 chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
655 COL_TIMESTAMP, RESERVED_COLUMNS,
656 };
657 let supplied_height = fields
658 .iter()
659 .find(|(k, _)| k == COL_BLOCK_HEIGHT)
660 .map(|(_, v)| v.clone());
661 let supplied_prev = fields
662 .iter()
663 .find(|(k, _)| k == COL_PREV_HASH)
664 .map(|(_, v)| v.clone());
665 let supplied_ts = fields
666 .iter()
667 .find(|(k, _)| k == COL_TIMESTAMP)
668 .map(|(_, v)| v.clone());
669 let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
670 let user_supplied_any = supplied_height.is_some()
671 || supplied_prev.is_some()
672 || supplied_ts.is_some()
673 || supplied_hash;
674
675 fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
676 let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
677
678 let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
679 Some(t) => (t.hash, t.height + 1),
680 None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
681 };
682 let server_now = crate::runtime::blockchain_kind::now_ms();
683
684 let (use_prev, use_height, use_ts) = if user_supplied_any {
685 if supplied_hash {
688 return Err(chain_conflict_error(
689 tip_next_height.saturating_sub(1),
690 tip_prev_hash,
691 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
692 server_now,
693 "hash column is engine-computed and cannot be supplied",
694 ));
695 }
696 let caller_prev = match &supplied_prev {
697 Some(Value::Blob(b)) if b.len() == 32 => {
698 let mut a = [0u8; 32];
699 a.copy_from_slice(b);
700 a
701 }
702 Some(Value::Text(s)) if s.len() == 64 => {
703 let mut a = [0u8; 32];
707 let mut ok = true;
708 for (i, slot) in a.iter_mut().enumerate() {
709 let pair = &s.as_ref()[i * 2..i * 2 + 2];
710 match u8::from_str_radix(pair, 16) {
711 Ok(byte) => *slot = byte,
712 Err(_) => {
713 ok = false;
714 break;
715 }
716 }
717 }
718 if !ok {
719 return Err(chain_conflict_error(
720 tip_next_height.saturating_sub(1),
721 tip_prev_hash,
722 chain_tip_full
723 .as_ref()
724 .map(|t| t.timestamp_ms)
725 .unwrap_or(0),
726 server_now,
727 "prev_hash is not valid hex",
728 ));
729 }
730 a
731 }
732 _ => {
733 return Err(chain_conflict_error(
734 tip_next_height.saturating_sub(1),
735 tip_prev_hash,
736 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
737 server_now,
738 "prev_hash missing or not a 32-byte Blob",
739 ));
740 }
741 };
742 if caller_prev != tip_prev_hash {
743 return Err(chain_conflict_error(
744 tip_next_height.saturating_sub(1),
745 tip_prev_hash,
746 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
747 server_now,
748 "prev_hash does not match current tip",
749 ));
750 }
751 let caller_height = match &supplied_height {
752 Some(Value::UnsignedInteger(v)) => *v,
753 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
754 _ => {
755 return Err(chain_conflict_error(
756 tip_next_height.saturating_sub(1),
757 tip_prev_hash,
758 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
759 server_now,
760 "block_height missing or not an unsigned integer",
761 ));
762 }
763 };
764 if caller_height != tip_next_height {
765 return Err(chain_conflict_error(
766 tip_next_height.saturating_sub(1),
767 tip_prev_hash,
768 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
769 server_now,
770 "block_height does not match tip+1",
771 ));
772 }
773 let caller_ts = match &supplied_ts {
774 Some(Value::UnsignedInteger(v)) => *v,
775 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
776 _ => {
777 return Err(chain_conflict_error(
778 tip_next_height.saturating_sub(1),
779 tip_prev_hash,
780 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
781 server_now,
782 "timestamp missing or not an unsigned integer",
783 ));
784 }
785 };
786 let drift = (caller_ts as i128) - (server_now as i128);
787 if drift.abs() > 60_000 {
788 return Err(chain_conflict_error(
789 tip_next_height.saturating_sub(1),
790 tip_prev_hash,
791 chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
792 server_now,
793 "timestamp outside ±60s of server_time",
794 ));
795 }
796 (caller_prev, caller_height, caller_ts)
797 } else {
798 (tip_prev_hash, tip_next_height, server_now)
799 };
800
801 let (reserved, new_hash) =
802 crate::runtime::blockchain_kind::make_block_reserved_fields(
803 use_prev, use_height, use_ts, &payload,
804 );
805 fields.extend(reserved);
806 chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
807 height: use_height,
808 hash: new_hash,
809 timestamp_ms: use_ts,
810 });
811 }
812 if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
824 let (pk_col, sig_col, residual) =
825 crate::runtime::signed_writes_kind::split_signature_fields(fields);
826 let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
827 let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
828 crate::runtime::signed_writes_kind::verify_row(
829 ®,
830 pk_col.as_ref().map(|c| c.bytes.as_slice()),
831 sig_col.as_ref().map(|c| c.bytes.as_slice()),
832 &payload,
833 )
834 .map_err(crate::runtime::signed_writes_kind::map_error)?;
835 fields = residual;
836 if let Some(col) = pk_col {
841 fields.push((
842 crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
843 col.raw_value,
844 ));
845 }
846 if let Some(col) = sig_col {
847 fields.push((
848 crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
849 col.raw_value,
850 ));
851 }
852 }
853 merge_with_clauses(
854 &mut metadata,
855 query.ttl_ms,
856 query.expires_at_ms,
857 &query.with_metadata,
858 );
859 if let Some(snaps) = returning_snapshots.as_mut() {
860 snaps.push(fields.clone());
861 }
862 rows.push(CreateRowInput {
863 collection: query.table.clone(),
864 fields,
865 metadata,
866 node_links: Vec::new(),
867 vector_links: Vec::new(),
868 });
869 }
870 let outputs = self.create_rows_batch(CreateRowsBatchInput {
871 collection: query.table.clone(),
872 rows,
873 suppress_events: query.suppress_events,
874 })?;
875 inserted_count = outputs.len() as u64;
876
877 if chain_mode {
881 if let Some(new_tip) = chain_tip_full.as_ref() {
882 self.inner
883 .chain_tip_cache
884 .lock()
885 .insert(query.table.clone(), new_tip.clone());
886 }
887 }
888
889 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
897 let time_col = &spec.time_column;
898 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
900 for row in &effective_rows {
901 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
902 if *n >= 0 {
903 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
904 }
905 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
906 let _ = self.inner.db.hypertables().route(&query.table, *n);
907 }
908 }
909 }
910 }
911
912 if let (Some(items), Some(snaps)) =
913 (query.returning.as_ref(), returning_snapshots.take())
914 {
915 let snaps = row_insert_returning_snapshots(&outputs, snaps);
916 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
917 }
918 } else {
919 let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
926 Vec::with_capacity(effective_rows.len());
927 let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
928 {
929 Vec::with_capacity(effective_rows.len())
930 } else {
931 Vec::new()
932 };
933 if matches!(
934 query.entity_type,
935 InsertEntityType::Node | InsertEntityType::Edge
936 ) {
937 enum PreparedGraphInsert {
938 Node {
939 fields: Vec<(String, Value)>,
940 input: CreateNodeInput,
941 },
942 Edge {
943 fields: Vec<(String, Value)>,
944 input: CreateEdgeInput,
945 },
946 }
947
948 let mut prepared = Vec::with_capacity(effective_rows.len());
949 for row_values in &effective_rows {
950 if row_values.len() != query.columns.len() {
951 return Err(RedDBError::Query(format!(
952 "INSERT column count ({}) does not match value count ({})",
953 query.columns.len(),
954 row_values.len()
955 )));
956 }
957
958 match query.entity_type {
959 InsertEntityType::Node => {
960 let (node_values, mut metadata) =
961 split_insert_metadata(self, &query.columns, row_values)?;
962 merge_with_clauses(
963 &mut metadata,
964 query.ttl_ms,
965 query.expires_at_ms,
966 &query.with_metadata,
967 );
968 ensure_non_tree_reserved_metadata_entries(&metadata)?;
969 apply_collection_default_ttl_metadata(
970 self,
971 &query.table,
972 &mut metadata,
973 );
974 let (columns, values) = pairwise_columns_values(&node_values);
975 let label = find_column_value_string(&columns, &values, "label")?;
976 let node_type =
977 find_column_value_opt_string(&columns, &values, "node_type");
978 let properties = extract_remaining_properties(
979 &columns,
980 &values,
981 &["label", "node_type"],
982 );
983 crate::reserved_fields::ensure_no_reserved_public_item_fields(
984 properties.iter().map(|(key, _)| key.as_str()),
985 &format!("node '{}'", query.table),
986 )?;
987 prepared.push(PreparedGraphInsert::Node {
988 fields: node_values,
989 input: CreateNodeInput {
990 collection: query.table.clone(),
991 label,
992 node_type,
993 properties,
994 metadata,
995 embeddings: Vec::new(),
996 table_links: Vec::new(),
997 node_links: Vec::new(),
998 },
999 });
1000 }
1001 InsertEntityType::Edge => {
1002 let (edge_values, mut metadata) =
1003 split_insert_metadata(self, &query.columns, row_values)?;
1004 merge_with_clauses(
1005 &mut metadata,
1006 query.ttl_ms,
1007 query.expires_at_ms,
1008 &query.with_metadata,
1009 );
1010 ensure_non_tree_reserved_metadata_entries(&metadata)?;
1011 apply_collection_default_ttl_metadata(
1012 self,
1013 &query.table,
1014 &mut metadata,
1015 );
1016 let (columns, values) = pairwise_columns_values(&edge_values);
1017 let label = find_column_value_string(&columns, &values, "label")?;
1018 ensure_non_tree_structural_edge_label(&label)?;
1019 let from_id = resolve_edge_endpoint_any(
1020 self.inner.db.store().as_ref(),
1021 &query.table,
1022 &columns,
1023 &values,
1024 &["from_rid", "from"],
1025 )?;
1026 let to_id = resolve_edge_endpoint_any(
1027 self.inner.db.store().as_ref(),
1028 &query.table,
1029 &columns,
1030 &values,
1031 &["to_rid", "to"],
1032 )?;
1033 let weight = find_column_value_f32_opt(&columns, &values, "weight");
1034 let properties = extract_remaining_properties(
1035 &columns,
1036 &values,
1037 &["label", "from_rid", "to_rid", "from", "to", "weight"],
1038 );
1039 crate::reserved_fields::ensure_no_reserved_public_item_fields(
1040 properties.iter().map(|(key, _)| key.as_str()),
1041 &format!("edge '{}'", query.table),
1042 )?;
1043 prepared.push(PreparedGraphInsert::Edge {
1044 fields: edge_values,
1045 input: CreateEdgeInput {
1046 collection: query.table.clone(),
1047 label,
1048 from: EntityId::new(from_id),
1049 to: EntityId::new(to_id),
1050 weight,
1051 properties,
1052 metadata,
1053 },
1054 });
1055 }
1056 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1057 }
1058 }
1059
1060 ensure_graph_insert_contract(self, &query.table)?;
1061 let mut batch = self.inner.db.batch();
1062 for item in prepared {
1063 match item {
1064 PreparedGraphInsert::Node { fields, input } => {
1065 if query.returning.is_some() {
1066 returning_field_snaps.push(fields);
1067 }
1068 let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1069 batch = batch.add_node_with_type(
1070 input.collection,
1071 input.label,
1072 node_type,
1073 input.properties.into_iter().collect(),
1074 input.metadata.into_iter().collect(),
1075 );
1076 }
1077 PreparedGraphInsert::Edge { fields, input } => {
1078 if query.returning.is_some() {
1079 returning_field_snaps.push(fields);
1080 }
1081 batch = batch.add_edge(
1082 input.collection,
1083 input.label,
1084 input.from,
1085 input.to,
1086 input.weight.unwrap_or(1.0),
1087 input.properties.into_iter().collect(),
1088 input.metadata.into_iter().collect(),
1089 );
1090 }
1091 }
1092 }
1093 let batch_result = batch
1094 .execute()
1095 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1096 let (ids, entity_kind) = match query.entity_type {
1097 InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1098 InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1099 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1100 };
1101 for id in &ids {
1102 self.stamp_xmin_if_in_txn(&query.table, *id);
1103 }
1104 if query.returning.is_some() {
1105 returning_field_snaps = graph_insert_returning_snapshots(
1106 self.inner.db.store().as_ref(),
1107 &query.table,
1108 &ids,
1109 );
1110 }
1111 self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1112 let store = self.inner.db.store();
1113 entity_outputs.extend(ids.iter().map(|id| {
1114 crate::application::entity::CreateEntityOutput {
1115 id: *id,
1116 entity: store.get(&query.table, *id),
1117 }
1118 }));
1119 inserted_count = ids.len() as u64;
1120 } else {
1121 for row_values in &effective_rows {
1122 if row_values.len() != query.columns.len() {
1123 return Err(RedDBError::Query(format!(
1124 "INSERT column count ({}) does not match value count ({})",
1125 query.columns.len(),
1126 row_values.len()
1127 )));
1128 }
1129
1130 match query.entity_type {
1131 InsertEntityType::Row => {
1132 if query.returning.is_some() {
1133 return Err(RedDBError::Query(
1134 "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1135 .to_string(),
1136 ));
1137 }
1138 let (fields, mut metadata) =
1139 split_insert_metadata(self, &query.columns, row_values)?;
1140 merge_with_clauses(
1141 &mut metadata,
1142 query.ttl_ms,
1143 query.expires_at_ms,
1144 &query.with_metadata,
1145 );
1146 self.insert_timeseries_point(&query.table, fields, metadata)?;
1147 }
1148 InsertEntityType::Node | InsertEntityType::Edge => {
1149 unreachable!("NODE and EDGE are handled by the prepared graph path")
1150 }
1151 InsertEntityType::Vector => {
1152 let (vector_values, mut metadata) =
1153 split_insert_metadata(self, &query.columns, row_values)?;
1154 merge_with_clauses(
1155 &mut metadata,
1156 query.ttl_ms,
1157 query.expires_at_ms,
1158 &query.with_metadata,
1159 );
1160 let (columns, values) = pairwise_columns_values(&vector_values);
1161 let dense = find_column_value_vec_f32_any(
1162 &columns,
1163 &values,
1164 &["dense", "embedding"],
1165 )?;
1166 merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1167 let content =
1168 find_column_value_opt_string(&columns, &values, "content");
1169 if query.returning.is_some() {
1170 returning_field_snaps.push(vector_values.clone());
1171 }
1172 let input = CreateVectorInput {
1173 collection: query.table.clone(),
1174 dense,
1175 content,
1176 metadata,
1177 link_row: None,
1178 link_node: None,
1179 };
1180 entity_outputs.push(self.create_vector(input)?);
1181 }
1182 InsertEntityType::Document => {
1183 let (document_values, mut metadata) =
1184 split_insert_metadata(self, &query.columns, row_values)?;
1185 merge_with_clauses(
1186 &mut metadata,
1187 query.ttl_ms,
1188 query.expires_at_ms,
1189 &query.with_metadata,
1190 );
1191 let (columns, values) = pairwise_columns_values(&document_values);
1192 let body_str = find_column_value_string(&columns, &values, "body")?;
1193 let body: crate::json::Value = crate::json::from_str(&body_str)
1194 .map_err(|e| {
1195 RedDBError::Query(format!("invalid JSON body: {e}"))
1196 })?;
1197 let input = CreateDocumentInput {
1198 collection: query.table.clone(),
1199 body,
1200 metadata,
1201 node_links: Vec::new(),
1202 vector_links: Vec::new(),
1203 };
1204 let output = self.create_document(input)?;
1205 if query.returning.is_some() {
1206 let fields = output
1207 .entity
1208 .as_ref()
1209 .map(entity_row_fields_snapshot)
1210 .filter(|fields| !fields.is_empty())
1211 .unwrap_or(document_values);
1212 returning_field_snaps.push(fields);
1213 }
1214 entity_outputs.push(output);
1215 }
1216 InsertEntityType::Kv => {
1217 let (kv_values, mut metadata) =
1218 split_insert_metadata(self, &query.columns, row_values)?;
1219 merge_with_clauses(
1220 &mut metadata,
1221 query.ttl_ms,
1222 query.expires_at_ms,
1223 &query.with_metadata,
1224 );
1225 let (columns, values) = pairwise_columns_values(&kv_values);
1226 let key = find_column_value_string(&columns, &values, "key")?;
1227 let value = find_column_value(&columns, &values, "value")?;
1228 if query.returning.is_some() {
1229 returning_field_snaps.push(kv_values.clone());
1230 }
1231 let input = CreateKvInput {
1232 collection: query.table.clone(),
1233 key,
1234 value,
1235 metadata,
1236 };
1237 entity_outputs.push(self.create_kv(input)?);
1238 }
1239 }
1240
1241 inserted_count += 1;
1242 }
1243 }
1244
1245 if let Some(items) = query.returning.as_ref() {
1246 if !entity_outputs.is_empty() {
1247 returning_result = Some(build_returning_result(
1248 items,
1249 &returning_field_snaps,
1250 Some(&entity_outputs),
1251 ));
1252 }
1253 }
1254 }
1255
1256 if let Some(ref embed_config) = query.auto_embed {
1258 let store = self.inner.db.store();
1259 let provider = crate::ai::parse_provider(&embed_config.provider)?;
1260 let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1261 let api_key = if is_local_provider {
1266 String::new()
1267 } else {
1268 crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1269 };
1270 let model = embed_config.model.clone().unwrap_or_else(|| {
1271 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1272 .ok()
1273 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1274 });
1275
1276 let manager = store
1278 .get_collection(&query.table)
1279 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1280 let entities = manager.query_all(|_| true);
1281 let recent: Vec<_> = entities
1282 .into_iter()
1283 .rev()
1284 .take(effective_rows.len())
1285 .collect();
1286
1287 let entity_combos: Vec<(usize, String)> = recent
1289 .iter()
1290 .enumerate()
1291 .filter_map(|(i, entity)| {
1292 if let EntityData::Row(ref row) = entity.data {
1293 if let Some(ref named) = row.named {
1294 let texts: Vec<String> = embed_config
1295 .fields
1296 .iter()
1297 .filter_map(|field| match named.get(field) {
1298 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1299 _ => None,
1300 })
1301 .collect();
1302 if !texts.is_empty() {
1303 return Some((i, texts.join(" ")));
1304 }
1305 }
1306 }
1307 None
1308 })
1309 .collect();
1310
1311 if !entity_combos.is_empty() {
1312 let batch_texts: Vec<String> =
1314 entity_combos.iter().map(|(_, t)| t.clone()).collect();
1315
1316 let embeddings = if is_local_provider {
1326 let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1327 &self.inner.db,
1328 &model,
1329 batch_texts,
1330 )?;
1331 response.embeddings
1332 } else {
1333 let batch_client =
1334 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1335
1336 match tokio::runtime::Handle::try_current() {
1337 Ok(handle) => tokio::task::block_in_place(|| {
1338 handle.block_on(batch_client.embed_batch(
1339 &provider,
1340 &model,
1341 &api_key,
1342 batch_texts,
1343 ))
1344 }),
1345 Err(_) => {
1346 return Err(RedDBError::Query(
1347 "AUTO EMBED requires a Tokio runtime context".to_string(),
1348 ));
1349 }
1350 }
1351 .map_err(|e| RedDBError::Query(e.to_string()))?
1352 };
1353
1354 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1356 if dense.is_empty() {
1357 continue;
1358 }
1359 self.create_vector(CreateVectorInput {
1360 collection: query.table.clone(),
1361 dense,
1362 content: Some(combined.clone()),
1363 metadata: Vec::new(),
1364 link_row: None,
1365 link_node: None,
1366 })?;
1367 }
1368 }
1369 }
1370
1371 if inserted_count > 0 {
1372 self.note_table_write(&query.table);
1373 }
1374
1375 let mut result = RuntimeQueryResult::dml_result(
1376 raw_query.to_string(),
1377 inserted_count,
1378 "insert",
1379 "runtime-dml",
1380 );
1381 if let Some(returning) = returning_result {
1382 result.result = returning;
1383 }
1384 Ok(result)
1385 }
1386
1387 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1388 let Some(auth_store) = self.inner.auth_store.read().clone() else {
1389 return Ok(());
1390 };
1391 if !auth_store.iam_authorization_enabled() {
1392 return Ok(());
1393 }
1394 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1395 return Ok(());
1396 };
1397
1398 let tenant = crate::runtime::impl_core::current_tenant();
1399 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1400 let request = crate::auth::ColumnAccessRequest {
1401 action: "insert".to_string(),
1402 schema: None,
1403 table: query.table.clone(),
1404 columns: query.columns.clone(),
1405 };
1406 let ctx = crate::auth::policies::EvalContext {
1407 principal_tenant: tenant.clone(),
1408 current_tenant: tenant,
1409 peer_ip: None,
1410 mfa_present: false,
1411 now_ms: crate::auth::now_ms(),
1412 principal_is_admin_role: role == crate::auth::Role::Admin,
1413 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1414 principal_is_platform_scoped: principal.tenant.is_none(),
1415 };
1416
1417 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1418 let table_allowed = matches!(
1419 outcome.table_decision,
1420 crate::auth::policies::Decision::Allow { .. }
1421 | crate::auth::policies::Decision::AdminBypass
1422 );
1423 if !table_allowed {
1424 return Err(RedDBError::Query(format!(
1425 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1426 outcome.table_resource.kind, outcome.table_resource.name
1427 )));
1428 }
1429 if let Some(denied) = outcome.first_denied_column() {
1430 return Err(RedDBError::Query(format!(
1431 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1432 denied.resource.kind, denied.resource.name
1433 )));
1434 }
1435
1436 Ok(())
1437 }
1438
1439 pub(crate) fn insert_timeseries_point(
1440 &self,
1441 collection: &str,
1442 fields: Vec<(String, Value)>,
1443 mut metadata: Vec<(String, MetadataValue)>,
1444 ) -> RedDBResult<EntityId> {
1445 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1446
1447 let (columns, values) = pairwise_columns_values(&fields);
1448 validate_timeseries_insert_columns(&columns)?;
1449
1450 let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1459 let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1460 if let Some(event_name) = event_name_opt.as_deref() {
1461 let store_for_schema = self.inner.db.store();
1462 if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1463 .is_some()
1464 {
1465 let payload_json = payload_opt.as_deref().unwrap_or("{}");
1466 super::analytics_schema_registry::validate(
1467 store_for_schema.as_ref(),
1468 event_name,
1469 payload_json,
1470 )
1471 .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1472 }
1473 }
1474
1475 let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1480 Some(m) => m,
1481 None => event_name_opt.clone().ok_or_else(|| {
1482 RedDBError::Query(
1483 "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1484 )
1485 })?,
1486 };
1487 let value = match find_column_value_opt_string(&columns, &values, "value") {
1491 Some(s) => s.parse::<f64>().unwrap_or(1.0),
1492 None => columns
1493 .iter()
1494 .position(|c| c.eq_ignore_ascii_case("value"))
1495 .and_then(|i| match &values[i] {
1496 Value::Float(f) => Some(*f),
1497 Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1498 Value::UnsignedInteger(n) => Some(*n as f64),
1499 _ => None,
1500 })
1501 .unwrap_or(1.0),
1502 };
1503 let timestamp_ns =
1504 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1505 let mut tags = find_timeseries_tags(&columns, &values)?;
1506 if let Some(ref name) = event_name_opt {
1507 tags.entry("event_name".to_string())
1508 .or_insert_with(|| name.clone());
1509 }
1510 if let Some(ref payload) = payload_opt {
1511 tags.entry("payload".to_string())
1512 .or_insert_with(|| payload.clone());
1513 }
1514
1515 let mut entity = UnifiedEntity::new(
1516 EntityId::new(0),
1517 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1518 series: collection.to_string(),
1519 metric: metric.clone(),
1520 })),
1521 EntityData::TimeSeries(crate::storage::TimeSeriesData {
1522 metric,
1523 timestamp_ns,
1524 value,
1525 tags,
1526 }),
1527 );
1528 let writer_xid = match self.current_xid() {
1532 Some(xid) => xid,
1533 None => {
1534 let mgr = self.snapshot_manager();
1535 let xid = mgr.begin();
1536 mgr.commit(xid);
1537 xid
1538 }
1539 };
1540 entity.set_xmin(writer_xid);
1541
1542 let store = self.inner.db.store();
1543 let id = store
1544 .insert_auto(collection, entity)
1545 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1546
1547 if !metadata.is_empty() {
1548 let _ = store.set_metadata(
1549 collection,
1550 id,
1551 Metadata::with_fields(metadata.into_iter().collect()),
1552 );
1553 }
1554
1555 self.cdc_emit(
1556 crate::replication::cdc::ChangeOperation::Insert,
1557 collection,
1558 id.raw(),
1559 "timeseries",
1560 );
1561
1562 Ok(id)
1563 }
1564
1565 pub fn execute_update(
1570 &self,
1571 raw_query: &str,
1572 query: &UpdateQuery,
1573 ) -> RedDBResult<RuntimeQueryResult> {
1574 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1575 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1579 return Err(RedDBError::InvalidOperation(format!(
1580 "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1581 query.table
1582 )));
1583 }
1584 crate::runtime::collection_contract::CollectionContractGate::check(
1590 self,
1591 &query.table,
1592 crate::runtime::collection_contract::MutationKind::Update,
1593 )?;
1594 ensure_update_target_contract(self, &query.table, query.target)?;
1595 ensure_graph_identity_update_target_allowed(query)?;
1596
1597 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1603 let augmented_query: UpdateQuery;
1604 let effective_query: &UpdateQuery = if rls_gated {
1605 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1606 self,
1607 &query.table,
1608 crate::storage::query::ast::PolicyAction::Update,
1609 );
1610 let Some(policy) = rls_filter else {
1611 let mut response = RuntimeQueryResult::dml_result(
1614 raw_query.to_string(),
1615 0,
1616 "update",
1617 "runtime-dml-rls",
1618 );
1619 if let Some(items) = query.returning.clone() {
1620 response.result = build_returning_result(&items, &[], None);
1621 }
1622 return Ok(response);
1623 };
1624 let mut augmented = query.clone();
1625 augmented.filter = Some(match augmented.filter.take() {
1626 Some(existing) => {
1627 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1628 }
1629 None => policy,
1630 });
1631 augmented_query = augmented;
1632 &augmented_query
1633 } else {
1634 query
1635 };
1636
1637 if let Some(items) = effective_query.returning.clone() {
1642 let mut inner_query = effective_query.clone();
1643 inner_query.returning = None;
1644 let (mut response, touched_ids) =
1645 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1646
1647 let snapshots = if matches!(
1648 effective_query.target,
1649 UpdateTarget::Nodes | UpdateTarget::Edges
1650 ) {
1651 graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1652 } else {
1653 super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1654 .row_snapshots(&touched_ids)
1655 };
1656
1657 response.result = build_returning_result(&items, &snapshots, None);
1658 response.engine = "runtime-dml-returning";
1659 return Ok(response);
1660 }
1661
1662 self.execute_update_inner(raw_query, effective_query)
1663 }
1664
1665 fn execute_update_inner(
1667 &self,
1668 raw_query: &str,
1669 query: &UpdateQuery,
1670 ) -> RedDBResult<RuntimeQueryResult> {
1671 self.execute_update_inner_tracked(raw_query, query)
1672 .map(|(res, _)| res)
1673 }
1674
1675 fn execute_update_inner_tracked(
1676 &self,
1677 raw_query: &str,
1678 query: &UpdateQuery,
1679 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1680 let store = self.inner.db.store();
1681 let effective_filter = effective_update_filter(query);
1682 let compiled_plan = self.compile_update_plan(query)?;
1683 let mut touched_ids: Vec<EntityId> = Vec::new();
1684 let limit_cap = query.limit.map(|l| l as usize);
1685 let manager = store
1686 .get_collection(&query.table)
1687 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1688 let scan_limit = if query.order_by.is_empty() {
1689 limit_cap
1690 } else {
1691 None
1692 };
1693 let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1694 self,
1695 &query.table,
1696 effective_filter.as_ref(),
1697 scan_limit,
1698 query.target,
1699 )
1700 .find_target_ids()?;
1701 let ids_to_update = if query.order_by.is_empty() {
1702 ids_to_update
1703 } else {
1704 ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1705 };
1706
1707 if update_needs_rmw_lock(query) {
1708 return self.execute_update_inner_tracked_locked(
1709 raw_query,
1710 query,
1711 &compiled_plan,
1712 &ids_to_update,
1713 effective_filter.as_ref(),
1714 );
1715 }
1716
1717 let mut affected: u64 = 0;
1718 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1719 let mut applied_chunk = Vec::with_capacity(chunk.len());
1720 for entity in manager.get_many(chunk).into_iter().flatten() {
1721 let assignments =
1722 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1723 let applied = self.apply_materialized_update_for_entity(
1724 query.table.clone(),
1725 entity,
1726 &compiled_plan,
1727 assignments,
1728 )?;
1729 touched_ids.push(applied.id);
1730 applied_chunk.push(applied);
1731 }
1732 self.persist_update_chunk(&applied_chunk)?;
1733 affected += applied_chunk.len() as u64;
1734 let lsns = self.flush_update_chunk(&applied_chunk)?;
1735 if !query.suppress_events {
1736 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1737 }
1738 }
1739
1740 if affected > 0 {
1741 self.note_table_write(&query.table);
1742 }
1743
1744 Ok((
1745 RuntimeQueryResult::dml_result(
1746 raw_query.to_string(),
1747 affected,
1748 "update",
1749 "runtime-dml",
1750 ),
1751 touched_ids,
1752 ))
1753 }
1754
1755 fn execute_update_inner_tracked_locked(
1756 &self,
1757 raw_query: &str,
1758 query: &UpdateQuery,
1759 compiled_plan: &CompiledUpdatePlan,
1760 ids_to_update: &[EntityId],
1761 effective_filter: Option<&Filter>,
1762 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1763 let store = self.inner.db.store();
1764 let mut touched_ids = Vec::new();
1765 let mut lock_entries = Vec::new();
1766
1767 for id in ids_to_update {
1768 let Some(candidate) = store.get(&query.table, *id) else {
1769 continue;
1770 };
1771 let logical_id = candidate.logical_id();
1772 let lock_key = format!("row:{}", logical_id.raw());
1773 let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1774 lock_entries.push((lock_key, logical_id, rmw_lock));
1775 }
1776
1777 lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1778 lock_entries.dedup_by(|left, right| left.0 == right.0);
1779 let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1780
1781 let mut applied_chunk = Vec::new();
1782 for (_, logical_id, _) in &lock_entries {
1783 let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1784 else {
1785 continue;
1786 };
1787 if let Some(filter) = effective_filter {
1788 if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1789 Some(self.inner.db.as_ref()),
1790 &entity,
1791 filter,
1792 &query.table,
1793 &query.table,
1794 ) {
1795 continue;
1796 }
1797 }
1798
1799 let assignments =
1800 self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1801 let applied = self.apply_materialized_update_for_entity(
1802 query.table.clone(),
1803 entity,
1804 compiled_plan,
1805 assignments,
1806 )?;
1807 touched_ids.push(applied.id);
1808 applied_chunk.push(applied);
1809 }
1810
1811 let affected = applied_chunk.len() as u64;
1812 if !applied_chunk.is_empty() {
1813 self.persist_update_chunk(&applied_chunk)?;
1814 let lsns = self.flush_update_chunk(&applied_chunk)?;
1815 if !query.suppress_events {
1816 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1817 }
1818 }
1819
1820 if affected > 0 {
1821 self.note_table_write(&query.table);
1822 }
1823
1824 Ok((
1825 RuntimeQueryResult::dml_result(
1826 raw_query.to_string(),
1827 affected,
1828 "update",
1829 "runtime-dml",
1830 ),
1831 touched_ids,
1832 ))
1833 }
1834
1835 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1836 let mut static_field_assignments = Vec::new();
1837 let mut static_metadata_assignments = Vec::new();
1838 let mut dynamic_assignments = Vec::new();
1839 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1840 let mut row_modified_columns = Vec::new();
1841
1842 for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1843 let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1844 let metadata_key = resolve_sql_ttl_metadata_key(column);
1845 if compound_op.is_some() && metadata_key.is_some() {
1846 return Err(RedDBError::Query(format!(
1847 "compound assignment is only supported for row fields: {column}"
1848 )));
1849 }
1850 if compound_op.is_none() {
1851 if let Ok(value) = fold_expr_to_value(expr.clone()) {
1852 if let Some(metadata_key) = metadata_key {
1853 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1854 let (canonical_key, canonical_value) =
1855 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1856 static_metadata_assignments
1857 .push((canonical_key.to_string(), canonical_value));
1858 } else {
1859 let value = self.resolve_crypto_sentinel(value)?;
1860 static_field_assignments.push((
1861 column.clone(),
1862 normalize_row_update_assignment_with_plan(
1863 &query.table,
1864 column,
1865 value,
1866 row_contract_plan.as_ref(),
1867 )?,
1868 ));
1869 row_modified_columns.push(column.clone());
1870 }
1871 continue;
1872 }
1873 }
1874
1875 dynamic_assignments.push(CompiledUpdateAssignment {
1876 column: column.clone(),
1877 expr: expr.clone(),
1878 compound_op,
1879 metadata_key,
1880 row_rule: if metadata_key.is_none() {
1881 if let Some(plan) = row_contract_plan.as_ref() {
1882 if plan.timestamps_enabled
1883 && (column == "created_at" || column == "updated_at")
1884 {
1885 return Err(RedDBError::Query(format!(
1886 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1887 query.table, column
1888 )));
1889 }
1890 if let Some(rule) = plan.declared_rules.get(column) {
1891 Some(rule.clone())
1892 } else if plan.strict_schema {
1893 return Err(RedDBError::Query(format!(
1894 "collection '{}' is strict and does not allow undeclared fields: {}",
1895 query.table, column
1896 )));
1897 } else {
1898 None
1899 }
1900 } else {
1901 None
1902 }
1903 } else {
1904 None
1905 },
1906 });
1907 if metadata_key.is_none() {
1908 row_modified_columns.push(column.clone());
1909 }
1910 }
1911
1912 let row_modified_columns = dedupe_update_columns(row_modified_columns);
1913 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1914 row_modified_columns.iter().any(|column| {
1915 plan.unique_columns
1916 .keys()
1917 .any(|unique| unique.eq_ignore_ascii_case(column))
1918 })
1919 });
1920
1921 if let Some(ttl_ms) = query.ttl_ms {
1922 static_metadata_assignments
1923 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1924 }
1925 if let Some(expires_at_ms) = query.expires_at_ms {
1926 static_metadata_assignments.push((
1927 "_expires_at".to_string(),
1928 metadata_u64_to_value(expires_at_ms),
1929 ));
1930 }
1931 for (key, val) in &query.with_metadata {
1932 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1933 }
1934
1935 Ok(CompiledUpdatePlan {
1936 static_field_assignments,
1937 static_metadata_assignments,
1938 dynamic_assignments,
1939 row_contract_plan,
1940 row_modified_columns,
1941 row_touches_unique_columns,
1942 })
1943 }
1944
1945 fn materialize_update_assignments_for_entity(
1946 &self,
1947 query: &UpdateQuery,
1948 entity: &UnifiedEntity,
1949 compiled_plan: &CompiledUpdatePlan,
1950 ) -> RedDBResult<MaterializedUpdateAssignments> {
1951 let mut assignments = MaterializedUpdateAssignments::default();
1952 let mut record: Option<UnifiedRecord> = None;
1953
1954 for assignment in &compiled_plan.dynamic_assignments {
1955 if assignment.compound_op.is_some()
1956 && !matches!(
1957 entity.data,
1958 EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1959 )
1960 {
1961 return Err(RedDBError::Query(format!(
1962 "compound assignment is only supported for row or graph UPDATE column '{}'",
1963 assignment.column
1964 )));
1965 }
1966 if record.is_none() {
1967 record = runtime_any_record_from_entity_ref(entity);
1968 }
1969 let Some(record) = record.as_ref() else {
1970 return Err(RedDBError::Query(format!(
1971 "UPDATE could not materialize runtime record for entity {} in '{}'",
1972 entity.id.raw(),
1973 query.table
1974 )));
1975 };
1976 let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1977 Some(self.inner.db.as_ref()),
1978 &assignment.expr,
1979 record,
1980 Some(query.table.as_str()),
1981 Some(query.table.as_str()),
1982 )
1983 .ok_or_else(|| {
1984 RedDBError::Query(format!(
1985 "failed to evaluate UPDATE expression for column '{}'",
1986 assignment.column
1987 ))
1988 })?;
1989 let value = if let Some(op) = assignment.compound_op {
1990 evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1991 } else {
1992 rhs
1993 };
1994
1995 if let Some(metadata_key) = assignment.metadata_key {
1996 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1997 let (canonical_key, canonical_value) =
1998 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1999 assignments
2000 .dynamic_metadata_assignments
2001 .push((canonical_key.to_string(), canonical_value));
2002 } else {
2003 assignments.dynamic_field_assignments.push((
2004 assignment.column.clone(),
2005 normalize_row_update_value_for_rule(
2006 &query.table,
2007 self.resolve_crypto_sentinel(value)?,
2008 assignment.row_rule.as_ref(),
2009 )?,
2010 ));
2011 }
2012 }
2013
2014 Ok(assignments)
2015 }
2016
2017 fn apply_materialized_update_for_entity(
2018 &self,
2019 collection: String,
2020 entity: UnifiedEntity,
2021 compiled_plan: &CompiledUpdatePlan,
2022 assignments: MaterializedUpdateAssignments,
2023 ) -> RedDBResult<AppliedEntityMutation> {
2024 if matches!(entity.data, EntityData::Row(_)) {
2025 return self.apply_loaded_sql_update_row_core(
2026 collection,
2027 entity,
2028 &compiled_plan.static_field_assignments,
2029 assignments.dynamic_field_assignments,
2030 &compiled_plan.static_metadata_assignments,
2031 assignments.dynamic_metadata_assignments,
2032 compiled_plan.row_contract_plan.as_ref(),
2033 &compiled_plan.row_modified_columns,
2034 compiled_plan.row_touches_unique_columns,
2035 );
2036 }
2037
2038 ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2039
2040 let operations = build_patch_operations_from_materialized_assignments(
2041 &entity,
2042 compiled_plan,
2043 assignments,
2044 );
2045 self.apply_loaded_patch_entity_core(
2046 collection,
2047 entity,
2048 crate::json::Value::Null,
2049 operations,
2050 )
2051 }
2052
2053 pub fn execute_delete(
2055 &self,
2056 raw_query: &str,
2057 query: &DeleteQuery,
2058 ) -> RedDBResult<RuntimeQueryResult> {
2059 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2060 if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2063 return Err(RedDBError::InvalidOperation(format!(
2064 "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2065 query.table
2066 )));
2067 }
2068 crate::runtime::collection_contract::CollectionContractGate::check(
2072 self,
2073 &query.table,
2074 crate::runtime::collection_contract::MutationKind::Delete,
2075 )?;
2076
2077 if let Some(items) = query.returning.clone() {
2084 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2085 RedDBError::Query(
2086 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2087 )
2088 })?;
2089 let captured = self.execute_query(&select_sql)?;
2090
2091 let mut inner_query = query.clone();
2092 inner_query.returning = None;
2093 let _ = self.execute_delete(raw_query, &inner_query)?;
2094
2095 let snapshots: Vec<Vec<(String, Value)>> = captured
2096 .result
2097 .records
2098 .iter()
2099 .map(|rec| {
2100 rec.iter_fields()
2101 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2102 .collect()
2103 })
2104 .collect();
2105 let affected = snapshots.len() as u64;
2106 let result = build_returning_result(&items, &snapshots, None);
2107
2108 let mut response = RuntimeQueryResult::dml_result(
2109 raw_query.to_string(),
2110 affected,
2111 "delete",
2112 "runtime-dml-returning",
2113 );
2114 response.result = result;
2115 return Ok(response);
2116 }
2117 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2124 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2125 self,
2126 &query.table,
2127 crate::storage::query::ast::PolicyAction::Delete,
2128 );
2129 let Some(policy) = rls_filter else {
2130 return Ok(RuntimeQueryResult::dml_result(
2131 raw_query.to_string(),
2132 0,
2133 "delete",
2134 "runtime-dml-rls",
2135 ));
2136 };
2137 let mut augmented = query.clone();
2142 augmented.filter = Some(match augmented.filter.take() {
2143 Some(existing) => {
2144 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2145 }
2146 None => policy,
2147 });
2148 return self.execute_delete_inner(raw_query, &augmented);
2149 }
2150 self.execute_delete_inner(raw_query, query)
2151 }
2152
2153 fn execute_delete_inner(
2154 &self,
2155 raw_query: &str,
2156 query: &DeleteQuery,
2157 ) -> RedDBResult<RuntimeQueryResult> {
2158 let effective_filter = effective_delete_filter(query);
2159
2160 let scan = super::dml_target_scan::DmlTargetScan::new(
2164 self,
2165 &query.table,
2166 effective_filter.as_ref(),
2167 None,
2168 );
2169 let ids_to_delete = scan.find_target_ids()?;
2170
2171 let needs_delete_events =
2174 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2175 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2176 scan.row_json_pre_images(&ids_to_delete)
2177 } else {
2178 HashMap::new()
2179 };
2180
2181 let mut affected: u64 = 0;
2182 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2183 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2184 affected += count;
2185 if needs_delete_events && !lsns.is_empty() {
2186 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2190 self.emit_delete_events_for_collection(
2191 &query.table,
2192 deleted_chunk,
2193 &lsns,
2194 &pre_images,
2195 )?;
2196 }
2197 }
2198 pre_images.clear();
2199
2200 if affected > 0 {
2201 self.note_table_write(&query.table);
2202 }
2203
2204 Ok(RuntimeQueryResult::dml_result(
2205 raw_query.to_string(),
2206 affected,
2207 "delete",
2208 "runtime-dml",
2209 ))
2210 }
2211}
2212
2213fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2219 if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2220 return Ok(());
2221 }
2222 for (column, _) in &query.assignment_exprs {
2223 if is_immutable_graph_identity_field(column) {
2224 return Err(RedDBError::Query(format!(
2225 "immutable graph field '{column}' cannot be updated"
2226 )));
2227 }
2228 }
2229 Ok(())
2230}
2231
2232fn ensure_graph_identity_update_allowed(
2233 entity: &UnifiedEntity,
2234 compiled_plan: &CompiledUpdatePlan,
2235 assignments: &MaterializedUpdateAssignments,
2236) -> RedDBResult<()> {
2237 if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2238 return Ok(());
2239 }
2240
2241 for (column, _) in compiled_plan
2242 .static_field_assignments
2243 .iter()
2244 .chain(assignments.dynamic_field_assignments.iter())
2245 {
2246 if is_immutable_graph_identity_field(column) {
2247 return Err(RedDBError::Query(format!(
2248 "immutable graph field '{column}' cannot be updated"
2249 )));
2250 }
2251 }
2252
2253 Ok(())
2254}
2255
2256fn is_immutable_graph_identity_field(column: &str) -> bool {
2257 ["rid", "label", "from_rid", "to_rid", "from", "to"]
2258 .iter()
2259 .any(|reserved| column.eq_ignore_ascii_case(reserved))
2260}
2261
2262fn build_patch_operations_from_materialized_assignments(
2263 entity: &UnifiedEntity,
2264 compiled_plan: &CompiledUpdatePlan,
2265 assignments: MaterializedUpdateAssignments,
2266) -> Vec<PatchEntityOperation> {
2267 let mut operations = Vec::with_capacity(
2268 compiled_plan.static_field_assignments.len()
2269 + compiled_plan.static_metadata_assignments.len()
2270 + assignments.dynamic_field_assignments.len()
2271 + assignments.dynamic_metadata_assignments.len(),
2272 );
2273
2274 for (column, value) in &compiled_plan.static_field_assignments {
2275 operations.push(PatchEntityOperation {
2276 op: PatchEntityOperationType::Set,
2277 path: update_patch_path_for_entity(entity, column),
2278 value: Some(storage_value_to_json(value)),
2279 });
2280 }
2281
2282 for (column, value) in assignments.dynamic_field_assignments {
2283 operations.push(PatchEntityOperation {
2284 op: PatchEntityOperationType::Set,
2285 path: update_patch_path_for_entity(entity, &column),
2286 value: Some(storage_value_to_json(&value)),
2287 });
2288 }
2289
2290 for (key, value) in &compiled_plan.static_metadata_assignments {
2291 operations.push(PatchEntityOperation {
2292 op: PatchEntityOperationType::Set,
2293 path: vec!["metadata".to_string(), key.clone()],
2294 value: Some(metadata_value_to_json(value)),
2295 });
2296 }
2297
2298 for (key, value) in assignments.dynamic_metadata_assignments {
2299 operations.push(PatchEntityOperation {
2300 op: PatchEntityOperationType::Set,
2301 path: vec!["metadata".to_string(), key],
2302 value: Some(metadata_value_to_json(&value)),
2303 });
2304 }
2305
2306 operations
2307}
2308
2309fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2310 if matches!(
2311 (&entity.kind, &entity.data),
2312 (
2313 crate::storage::EntityKind::GraphNode(_),
2314 EntityData::Node(_)
2315 )
2316 ) && column.eq_ignore_ascii_case("node_type")
2317 {
2318 return vec!["node_type".to_string()];
2319 }
2320 if matches!(
2321 (&entity.kind, &entity.data),
2322 (
2323 crate::storage::EntityKind::GraphEdge(_),
2324 EntityData::Edge(_)
2325 )
2326 ) && column.eq_ignore_ascii_case("weight")
2327 {
2328 return vec!["weight".to_string()];
2329 }
2330 vec!["fields".to_string(), column.to_string()]
2331}
2332
2333fn delete_to_select_sql(sql: &str) -> Option<String> {
2343 let trimmed = sql.trim_start();
2344 let lowered = trimmed.to_ascii_lowercase();
2345 if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2346 return None;
2347 }
2348 let from_idx = lowered.find(" from ")?;
2350 let after_from = &trimmed[from_idx + " from ".len()..];
2351 let after_from_lc = &lowered[from_idx + " from ".len()..];
2352
2353 let mut body = after_from.to_string();
2358 if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2359 body.truncate(pos);
2360 }
2361 Some(format!("SELECT * FROM {}", body.trim_end()))
2362}
2363
2364fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
2369 let bytes = haystack.as_bytes();
2370 let nlen = needle.len();
2371 let mut i = 0usize;
2372 let mut in_string = false;
2373 while i < bytes.len() {
2374 let c = bytes[i];
2375 if c == b'\'' {
2376 in_string = !in_string;
2377 i += 1;
2378 continue;
2379 }
2380 if !in_string
2381 && i + nlen <= bytes.len()
2382 && &bytes[i..i + nlen] == needle.as_bytes()
2383 && (i == 0 || bytes[i - 1].is_ascii_whitespace())
2384 && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
2385 {
2386 return Some(i);
2387 }
2388 i += 1;
2389 }
2390 None
2391}
2392
2393fn build_returning_result(
2400 items: &[ReturningItem],
2401 snapshots: &[Vec<(String, Value)>],
2402 outputs: Option<&[CreateEntityOutput]>,
2403) -> UnifiedResult {
2404 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2405 let public_item_outputs = outputs.is_some_and(|outs| {
2406 outs.first()
2407 .and_then(|out| out.entity.as_ref())
2408 .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2409 });
2410
2411 let mut columns: Vec<String> = if project_all {
2412 let mut cols: Vec<String> = Vec::new();
2413 if public_item_outputs {
2414 cols.extend(
2415 [
2416 "rid",
2417 "collection",
2418 "kind",
2419 "tenant",
2420 "created_at",
2421 "updated_at",
2422 ]
2423 .into_iter()
2424 .map(str::to_string),
2425 );
2426 } else if outputs.is_some() {
2427 cols.push("red_entity_id".to_string());
2428 }
2429 if let Some(first) = snapshots.first() {
2430 for (name, _) in first {
2431 cols.push(name.clone());
2432 }
2433 }
2434 cols
2435 } else {
2436 items
2437 .iter()
2438 .filter_map(|it| match it {
2439 ReturningItem::Column(c) => Some(c.clone()),
2440 ReturningItem::All => None,
2441 })
2442 .collect()
2443 };
2444 {
2446 let mut seen = std::collections::HashSet::new();
2447 columns.retain(|c| seen.insert(c.clone()));
2448 }
2449
2450 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2451 for (idx, snap) in snapshots.iter().enumerate() {
2452 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2453 if let Some(outs) = outputs {
2454 if let Some(out) = outs.get(idx) {
2455 if let Some(entity) = out.entity.as_ref() {
2456 if let Some(kind) = public_returning_item_kind(entity) {
2457 values.insert(
2458 Arc::clone(&sys_key_rid()),
2459 Value::UnsignedInteger(out.id.raw()),
2460 );
2461 values.insert(
2462 Arc::clone(&sys_key_collection()),
2463 Value::text(entity.kind.collection().to_string()),
2464 );
2465 values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2466 values.insert(
2467 Arc::clone(&sys_key_created_at()),
2468 Value::UnsignedInteger(entity.created_at),
2469 );
2470 values.insert(
2471 Arc::clone(&sys_key_updated_at()),
2472 Value::UnsignedInteger(entity.updated_at),
2473 );
2474 values.insert(
2479 Arc::clone(&sys_key_red_entity_id()),
2480 Value::UnsignedInteger(out.id.raw()),
2481 );
2482 } else {
2483 values.insert(
2484 Arc::clone(&sys_key_red_entity_id()),
2485 Value::Integer(out.id.raw() as i64),
2486 );
2487 }
2488 } else {
2489 values.insert(
2490 Arc::clone(&sys_key_red_entity_id()),
2491 Value::Integer(out.id.raw() as i64),
2492 );
2493 }
2494 }
2495 }
2496 for (name, val) in snap {
2497 values.insert(Arc::from(name.as_str()), val.clone());
2498 }
2499 if !values.contains_key("tenant") {
2500 let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2501 values.insert(Arc::clone(&sys_key_tenant()), tenant);
2502 }
2503 let mut rec = UnifiedRecord::default();
2504 for col in &columns {
2506 if let Some(v) = values.get(col.as_str()) {
2507 rec.set_arc(Arc::from(col.as_str()), v.clone());
2508 }
2509 }
2510 records.push(rec);
2511 }
2512
2513 UnifiedResult {
2514 columns,
2515 records,
2516 stats: Default::default(),
2517 pre_serialized_json: None,
2518 }
2519}
2520
2521fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2522 match (&entity.kind, &entity.data) {
2523 (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2524 Some("node")
2525 }
2526 (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2527 Some("edge")
2528 }
2529 (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2530 _ => None,
2531 }
2532}
2533
2534fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2535 let Some(row) = entity.data.as_row() else {
2536 return "row";
2537 };
2538
2539 let is_kv = row.named.as_ref().is_some_and(|named| {
2540 (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2541 || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2542 });
2543 if is_kv {
2544 return "kv";
2545 }
2546
2547 let is_document = row
2548 .named
2549 .as_ref()
2550 .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2551 || row.columns.iter().any(runtime_returning_documentish_value);
2552 if is_document {
2553 "document"
2554 } else {
2555 "row"
2556 }
2557}
2558
2559fn runtime_returning_documentish_value(value: &Value) -> bool {
2560 matches!(value, Value::Json(_) | Value::Blob(_))
2561}
2562
2563fn row_insert_returning_snapshots(
2564 outputs: &[CreateEntityOutput],
2565 fallback: Vec<Vec<(String, Value)>>,
2566) -> Vec<Vec<(String, Value)>> {
2567 outputs
2568 .iter()
2569 .enumerate()
2570 .map(|(idx, out)| {
2571 out.entity
2572 .as_ref()
2573 .map(entity_row_fields_snapshot)
2574 .filter(|snap| !snap.is_empty())
2575 .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2576 })
2577 .collect()
2578}
2579
2580fn graph_insert_returning_snapshots(
2581 store: &crate::storage::unified::UnifiedStore,
2582 collection: &str,
2583 ids: &[EntityId],
2584) -> Vec<Vec<(String, Value)>> {
2585 let Some(manager) = store.get_collection(collection) else {
2586 return Vec::new();
2587 };
2588
2589 ids.iter()
2590 .filter_map(|id| manager.get(*id))
2591 .filter_map(|entity| {
2592 let mut record = runtime_any_record_from_entity_ref(&entity)?;
2593 record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2594 Some(record)
2595 })
2596 .map(|record| {
2597 record
2598 .iter_fields()
2599 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2600 .collect()
2601 })
2602 .collect()
2603}
2604
2605fn graph_update_returning_snapshots(
2606 runtime: &RedDBRuntime,
2607 collection: &str,
2608 ids: &[EntityId],
2609) -> Vec<Vec<(String, Value)>> {
2610 let store = runtime.db().store();
2611 let Some(manager) = store.get_collection(collection) else {
2612 return Vec::new();
2613 };
2614
2615 manager
2616 .get_many(ids)
2617 .into_iter()
2618 .flatten()
2619 .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2620 .map(|record| {
2621 record
2622 .iter_fields()
2623 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2624 .collect()
2625 })
2626 .collect()
2627}
2628
2629fn ensure_update_target_contract(
2630 runtime: &RedDBRuntime,
2631 collection: &str,
2632 target: UpdateTarget,
2633) -> RedDBResult<()> {
2634 let Some(contract) = runtime.db().collection_contract(collection) else {
2635 return Ok(());
2636 };
2637 if update_target_contract_is_advisory(&contract)
2638 || update_target_allows_model(contract.declared_model, update_target_model(target))
2639 {
2640 return Ok(());
2641 }
2642 Err(RedDBError::InvalidOperation(format!(
2643 "collection '{}' is declared as '{}' and does not allow '{}' updates",
2644 collection,
2645 update_model_name(contract.declared_model),
2646 update_model_name(update_target_model(target))
2647 )))
2648}
2649
2650fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2651 matches!(
2652 (&contract.origin, &contract.schema_mode),
2653 (
2654 crate::physical::ContractOrigin::Implicit,
2655 crate::catalog::SchemaMode::Dynamic,
2656 )
2657 )
2658}
2659
2660fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2661 match target {
2662 UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2663 UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2664 UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2665 UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2666 }
2667}
2668
2669fn update_target_allows_model(
2670 declared_model: crate::catalog::CollectionModel,
2671 requested_model: crate::catalog::CollectionModel,
2672) -> bool {
2673 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2674}
2675
2676fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2677 match model {
2678 crate::catalog::CollectionModel::Table => "table",
2679 crate::catalog::CollectionModel::Document => "document",
2680 crate::catalog::CollectionModel::Graph => "graph",
2681 crate::catalog::CollectionModel::Vector => "vector",
2682 crate::catalog::CollectionModel::Hll => "hll",
2683 crate::catalog::CollectionModel::Sketch => "sketch",
2684 crate::catalog::CollectionModel::Filter => "filter",
2685 crate::catalog::CollectionModel::Kv => "kv",
2686 crate::catalog::CollectionModel::Config => "config",
2687 crate::catalog::CollectionModel::Vault => "vault",
2688 crate::catalog::CollectionModel::Mixed => "mixed",
2689 crate::catalog::CollectionModel::TimeSeries => "timeseries",
2690 crate::catalog::CollectionModel::Queue => "queue",
2691 crate::catalog::CollectionModel::Metrics => "metrics",
2692 }
2693}
2694
2695fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2696 let db = runtime.db();
2697 if let Some(contract) = db.collection_contract(collection) {
2698 let advisory_implicit_dynamic = matches!(
2699 (&contract.origin, &contract.schema_mode),
2700 (
2701 crate::physical::ContractOrigin::Implicit,
2702 crate::catalog::SchemaMode::Dynamic,
2703 )
2704 );
2705 if advisory_implicit_dynamic
2706 || matches!(
2707 contract.declared_model,
2708 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2709 )
2710 {
2711 return Ok(());
2712 }
2713 return Err(RedDBError::InvalidOperation(format!(
2714 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2715 collection, contract.declared_model
2716 )));
2717 }
2718
2719 let now = std::time::SystemTime::now()
2720 .duration_since(std::time::UNIX_EPOCH)
2721 .unwrap_or_default()
2722 .as_millis();
2723 db.save_collection_contract(crate::physical::CollectionContract {
2724 name: collection.to_string(),
2725 declared_model: crate::catalog::CollectionModel::Graph,
2726 schema_mode: crate::catalog::SchemaMode::Dynamic,
2727 origin: crate::physical::ContractOrigin::Implicit,
2728 version: 1,
2729 created_at_unix_ms: now,
2730 updated_at_unix_ms: now,
2731 default_ttl_ms: db.collection_default_ttl_ms(collection),
2732 vector_dimension: None,
2733 vector_metric: None,
2734 context_index_fields: Vec::new(),
2735 declared_columns: Vec::new(),
2736 table_def: None,
2737 timestamps_enabled: false,
2738 context_index_enabled: false,
2739 metrics_raw_retention_ms: None,
2740 metrics_rollup_policies: Vec::new(),
2741 metrics_tenant_identity: None,
2742 metrics_namespace: None,
2743 append_only: false,
2744 subscriptions: Vec::new(),
2745 session_key: None,
2746 session_gap_ms: None,
2747 retention_duration_ms: None,
2748 })
2749 .map(|_| ())
2750 .map_err(|err| RedDBError::Internal(err.to_string()))
2751}
2752
2753fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2754 query
2755 .assignment_exprs
2756 .iter()
2757 .enumerate()
2758 .any(|(idx, (column, expr))| {
2759 query
2760 .compound_assignment_ops
2761 .get(idx)
2762 .is_some_and(|op| op.is_some())
2763 || expr_references_update_column(expr, &query.table, column)
2764 })
2765}
2766
2767fn evaluate_compound_update_assignment(
2768 column: &str,
2769 record: &UnifiedRecord,
2770 op: BinOp,
2771 rhs: Value,
2772) -> RedDBResult<Value> {
2773 let lhs = record.get(column).ok_or_else(|| {
2774 RedDBError::Query(format!(
2775 "compound assignment requires existing numeric field '{column}'"
2776 ))
2777 })?;
2778 if matches!(lhs, Value::Null) {
2779 return Err(RedDBError::Query(format!(
2780 "compound assignment requires non-null numeric field '{column}'"
2781 )));
2782 }
2783 apply_compound_numeric_op(column, op, lhs, &rhs)
2784}
2785
2786fn apply_compound_numeric_op(
2787 column: &str,
2788 op: BinOp,
2789 lhs: &Value,
2790 rhs: &Value,
2791) -> RedDBResult<Value> {
2792 let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2793 return Err(RedDBError::Query(format!(
2794 "compound assignment requires numeric field '{column}'"
2795 )));
2796 };
2797 let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2798 return Err(RedDBError::Query(format!(
2799 "compound assignment requires numeric right-hand value for field '{column}'"
2800 )));
2801 };
2802
2803 if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2804 let a = lhs_number.as_f64();
2805 let b = rhs_number.as_f64();
2806 let out = match op {
2807 BinOp::Add => a + b,
2808 BinOp::Sub => a - b,
2809 BinOp::Mul => a * b,
2810 BinOp::Div => {
2811 if b == 0.0 {
2812 return Err(RedDBError::Query(format!(
2813 "division by zero in compound assignment for field '{column}'"
2814 )));
2815 }
2816 a / b
2817 }
2818 BinOp::Mod => {
2819 if b == 0.0 {
2820 return Err(RedDBError::Query(format!(
2821 "modulo by zero in compound assignment for field '{column}'"
2822 )));
2823 }
2824 a % b
2825 }
2826 _ => {
2827 return Err(RedDBError::Query(format!(
2828 "unsupported compound assignment operator for field '{column}'"
2829 )));
2830 }
2831 };
2832 if !out.is_finite() {
2833 return Err(RedDBError::Query(format!(
2834 "numeric overflow in compound assignment for field '{column}'"
2835 )));
2836 }
2837 return Ok(Value::Float(out));
2838 }
2839
2840 let a = lhs_number.as_i128();
2841 let b = rhs_number.as_i128();
2842 let out = match op {
2843 BinOp::Add => a.checked_add(b),
2844 BinOp::Sub => a.checked_sub(b),
2845 BinOp::Mul => a.checked_mul(b),
2846 BinOp::Mod => {
2847 if b == 0 {
2848 return Err(RedDBError::Query(format!(
2849 "modulo by zero in compound assignment for field '{column}'"
2850 )));
2851 }
2852 a.checked_rem(b)
2853 }
2854 BinOp::Div => unreachable!("integer division is handled by the float branch"),
2855 _ => None,
2856 }
2857 .ok_or_else(|| {
2858 RedDBError::Query(format!(
2859 "numeric overflow in compound assignment for field '{column}'"
2860 ))
2861 })?;
2862
2863 if matches!(lhs, Value::UnsignedInteger(_)) {
2864 let value = u64::try_from(out).map_err(|_| {
2865 RedDBError::Query(format!(
2866 "numeric overflow in compound assignment for field '{column}'"
2867 ))
2868 })?;
2869 Ok(Value::UnsignedInteger(value))
2870 } else {
2871 let value = i64::try_from(out).map_err(|_| {
2872 RedDBError::Query(format!(
2873 "numeric overflow in compound assignment for field '{column}'"
2874 ))
2875 })?;
2876 Ok(Value::Integer(value))
2877 }
2878}
2879
2880#[derive(Clone, Copy)]
2881enum CompoundNumber {
2882 Integer(i128),
2883 Float(f64),
2884}
2885
2886impl CompoundNumber {
2887 fn from_value(value: &Value) -> Option<Self> {
2888 match value {
2889 Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2890 Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2891 Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2892 Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2893 _ => None,
2894 }
2895 }
2896
2897 fn is_float(self) -> bool {
2898 matches!(self, Self::Float(_))
2899 }
2900
2901 fn as_f64(self) -> f64 {
2902 match self {
2903 Self::Integer(value) => value as f64,
2904 Self::Float(value) => value,
2905 }
2906 }
2907
2908 fn as_i128(self) -> i128 {
2909 match self {
2910 Self::Integer(value) => value,
2911 Self::Float(_) => unreachable!("float compound number used as integer"),
2912 }
2913 }
2914}
2915
2916fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2917 match expr {
2918 Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2919 Expr::Column { field, .. } => {
2920 field_ref_matches_update_column(field, table_name, target_column)
2921 }
2922 Expr::BinaryOp { lhs, rhs, .. } => {
2923 expr_references_update_column(lhs, table_name, target_column)
2924 || expr_references_update_column(rhs, table_name, target_column)
2925 }
2926 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2927 expr_references_update_column(operand, table_name, target_column)
2928 }
2929 Expr::FunctionCall { args, .. } => args
2930 .iter()
2931 .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2932 Expr::Case {
2933 branches, else_, ..
2934 } => {
2935 branches.iter().any(|(cond, value)| {
2936 expr_references_update_column(cond, table_name, target_column)
2937 || expr_references_update_column(value, table_name, target_column)
2938 }) || else_
2939 .as_deref()
2940 .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2941 }
2942 Expr::IsNull { operand, .. } => {
2943 expr_references_update_column(operand, table_name, target_column)
2944 }
2945 Expr::InList { target, values, .. } => {
2946 expr_references_update_column(target, table_name, target_column)
2947 || values
2948 .iter()
2949 .any(|value| expr_references_update_column(value, table_name, target_column))
2950 }
2951 Expr::Between {
2952 target, low, high, ..
2953 } => {
2954 expr_references_update_column(target, table_name, target_column)
2955 || expr_references_update_column(low, table_name, target_column)
2956 || expr_references_update_column(high, table_name, target_column)
2957 }
2958 Expr::WindowFunctionCall { args, window, .. } => {
2959 args.iter()
2960 .any(|arg| expr_references_update_column(arg, table_name, target_column))
2961 || window
2962 .partition_by
2963 .iter()
2964 .any(|e| expr_references_update_column(e, table_name, target_column))
2965 || window
2966 .order_by
2967 .iter()
2968 .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
2969 }
2970 }
2971}
2972
2973fn field_ref_matches_update_column(
2974 field: &FieldRef,
2975 table_name: &str,
2976 target_column: &str,
2977) -> bool {
2978 match field {
2979 FieldRef::TableColumn { table, column } => {
2980 column.eq_ignore_ascii_case(target_column)
2981 && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2982 }
2983 FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2984 false
2985 }
2986 }
2987}
2988
2989fn resolve_update_entity_by_logical_id(
2990 runtime: &RedDBRuntime,
2991 table: &str,
2992 logical_id: EntityId,
2993) -> Option<UnifiedEntity> {
2994 let store = runtime.inner.db.store();
2995 if let Some(entity) =
2996 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2997 .resolve_logical_id(&store, table, logical_id)
2998 {
2999 return Some(entity);
3000 }
3001 store.get(table, logical_id)
3004}
3005
3006fn update_cdc_item_kind(
3007 runtime: &RedDBRuntime,
3008 collection: &str,
3009 entity: &UnifiedEntity,
3010) -> &'static str {
3011 match &entity.data {
3012 EntityData::Node(_) => return "node",
3013 EntityData::Edge(_) => return "edge",
3014 _ => {}
3015 }
3016
3017 match runtime
3018 .db()
3019 .collection_contract(collection)
3020 .map(|contract| contract.declared_model)
3021 {
3022 Some(crate::catalog::CollectionModel::Document) => "document",
3023 Some(crate::catalog::CollectionModel::Kv)
3024 | Some(crate::catalog::CollectionModel::Vault) => "kv",
3025 _ => "row",
3026 }
3027}
3028
3029fn ordered_update_target_ids(
3030 manager: &Arc<crate::storage::SegmentManager>,
3031 entity_ids: &[EntityId],
3032 order_by: &[OrderByClause],
3033 limit: Option<usize>,
3034) -> Vec<EntityId> {
3035 let mut entities: Vec<UnifiedEntity> =
3036 manager.get_many(entity_ids).into_iter().flatten().collect();
3037 entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3038 if let Some(limit) = limit {
3039 entities.truncate(limit);
3040 }
3041 entities.into_iter().map(|entity| entity.id).collect()
3042}
3043
3044fn compare_update_order(
3045 left: &UnifiedEntity,
3046 right: &UnifiedEntity,
3047 order_by: &[OrderByClause],
3048) -> Ordering {
3049 for clause in order_by {
3050 let left_value = update_order_value(left, &clause.field);
3051 let right_value = update_order_value(right, &clause.field);
3052 let ordering = compare_update_order_values(
3053 left_value.as_ref(),
3054 right_value.as_ref(),
3055 clause.nulls_first,
3056 );
3057 if ordering != Ordering::Equal {
3058 return if clause.ascending {
3059 ordering
3060 } else {
3061 ordering.reverse()
3062 };
3063 }
3064 }
3065 left.logical_id().raw().cmp(&right.logical_id().raw())
3066}
3067
3068fn compare_update_order_values(
3069 left: Option<&Value>,
3070 right: Option<&Value>,
3071 nulls_first: bool,
3072) -> Ordering {
3073 match (left, right) {
3074 (None, None) => Ordering::Equal,
3075 (None, Some(_)) => {
3076 if nulls_first {
3077 Ordering::Less
3078 } else {
3079 Ordering::Greater
3080 }
3081 }
3082 (Some(_), None) => {
3083 if nulls_first {
3084 Ordering::Greater
3085 } else {
3086 Ordering::Less
3087 }
3088 }
3089 (Some(left), Some(right)) => {
3090 crate::storage::query::value_compare::total_compare_values(left, right)
3091 }
3092 }
3093}
3094
3095fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3096 let FieldRef::TableColumn { table, column } = field else {
3097 return None;
3098 };
3099 if !table.is_empty() {
3100 return None;
3101 }
3102 if column.eq_ignore_ascii_case("rid") {
3103 return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3104 }
3105 match &entity.data {
3106 EntityData::Row(row) => row.get_field(column).cloned(),
3107 EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3108 .and_then(|record| record.get(column).cloned()),
3109 _ => None,
3110 }
3111}
3112
3113fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
3114 if columns.is_empty() {
3115 return columns;
3116 }
3117
3118 let mut unique = Vec::with_capacity(columns.len());
3119 for column in columns.drain(..) {
3120 if !unique
3121 .iter()
3122 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
3123 {
3124 unique.push(column);
3125 }
3126 }
3127 unique
3128}
3129
3130const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];
3135
3136fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
3137 if column.eq_ignore_ascii_case("_ttl") {
3138 Some(SQL_TTL_METADATA_COLUMNS[0])
3139 } else if column.eq_ignore_ascii_case("_ttl_ms") {
3140 Some(SQL_TTL_METADATA_COLUMNS[1])
3141 } else if column.eq_ignore_ascii_case("_expires_at") {
3142 Some(SQL_TTL_METADATA_COLUMNS[2])
3143 } else {
3144 None
3145 }
3146}
3147
3148fn canonicalize_sql_ttl_metadata(
3153 key: &'static str,
3154 value: MetadataValue,
3155) -> (&'static str, MetadataValue) {
3156 if key != "_ttl" {
3157 return (key, value);
3158 }
3159 let scaled = match value {
3160 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3161 MetadataValue::Timestamp(ms_or_s) => {
3162 MetadataValue::Timestamp(ms_or_s)
3165 }
3166 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3167 other => other,
3168 };
3169 ("_ttl_ms", scaled)
3170}
3171
3172pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3176
3177impl RedDBRuntime {
3178 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3184 match value {
3185 Value::Password(marked) => {
3186 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3187 Ok(Value::Password(crate::auth::store::hash_password(plain)))
3188 } else {
3189 Ok(Value::Password(marked))
3190 }
3191 }
3192 Value::Secret(bytes) => {
3193 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3194 if !self.secret_auto_encrypt() {
3195 return Err(RedDBError::Query(
3196 "SECRET() literal rejected: red.config.secret.auto_encrypt \
3197 is false. Insert pre-encrypted bytes directly instead."
3198 .to_string(),
3199 ));
3200 }
3201 let key = self.secret_aes_key().ok_or_else(|| {
3202 RedDBError::Query(
3203 "SECRET() column encryption requires a bootstrapped \
3204 vault (red.secret.aes_key is missing). Start the server \
3205 with --vault to enable."
3206 .to_string(),
3207 )
3208 })?;
3209 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3210 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3211 } else {
3212 Ok(Value::Secret(bytes))
3213 }
3214 }
3215 other => Ok(other),
3216 }
3217 }
3218}
3219
3220fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3223 let nonce_bytes = crate::auth::store::random_bytes(12);
3224 let mut nonce = [0u8; 12];
3225 nonce.copy_from_slice(&nonce_bytes[..12]);
3226 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3227 let mut out = Vec::with_capacity(12 + ct.len());
3228 out.extend_from_slice(&nonce);
3229 out.extend_from_slice(&ct);
3230 out
3231}
3232
3233pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3237 if payload.len() < 12 {
3238 return None;
3239 }
3240 let mut nonce = [0u8; 12];
3241 nonce.copy_from_slice(&payload[..12]);
3242 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3243}
3244
3245fn split_insert_metadata(
3246 runtime: &RedDBRuntime,
3247 columns: &[String],
3248 values: &[Value],
3249) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3250 let mut fields = Vec::new();
3251 let mut metadata = Vec::new();
3252
3253 for (column, value) in columns.iter().zip(values.iter()) {
3254 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3256 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3257 let (canonical_key, canonical_value) =
3258 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3259 metadata.push((canonical_key.to_string(), canonical_value));
3260 continue;
3261 }
3262 fields.push((
3263 column.clone(),
3264 runtime.resolve_crypto_sentinel(value.clone())?,
3265 ));
3266 }
3267
3268 Ok((fields, metadata))
3269}
3270
3271fn merge_with_clauses(
3273 metadata: &mut Vec<(String, MetadataValue)>,
3274 ttl_ms: Option<u64>,
3275 expires_at_ms: Option<u64>,
3276 with_metadata: &[(String, Value)],
3277) {
3278 if let Some(ms) = ttl_ms {
3279 metadata.push((
3280 "_ttl_ms".to_string(),
3281 if ms <= i64::MAX as u64 {
3282 MetadataValue::Int(ms as i64)
3283 } else {
3284 MetadataValue::Timestamp(ms)
3285 },
3286 ));
3287 }
3288 if let Some(ms) = expires_at_ms {
3289 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3290 }
3291 for (key, value) in with_metadata {
3292 let meta_value = match value {
3293 Value::Text(s) => MetadataValue::String(s.to_string()),
3294 Value::Integer(n) => MetadataValue::Int(*n),
3295 Value::Float(n) => MetadataValue::Float(*n),
3296 Value::Boolean(b) => MetadataValue::Bool(*b),
3297 _ => MetadataValue::String(value.to_string()),
3298 };
3299 metadata.push((key.clone(), meta_value));
3300 }
3301}
3302
3303fn merge_vector_metadata_column(
3304 metadata: &mut Vec<(String, MetadataValue)>,
3305 columns: &[String],
3306 values: &[Value],
3307) -> RedDBResult<()> {
3308 let Some(value) = columns
3309 .iter()
3310 .position(|column| column.eq_ignore_ascii_case("metadata"))
3311 .map(|index| &values[index])
3312 else {
3313 return Ok(());
3314 };
3315 let json = match value {
3316 Value::Null => return Ok(()),
3317 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3318 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3319 })?,
3320 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3321 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3322 })?,
3323 other => {
3324 return Err(RedDBError::Query(format!(
3325 "column 'metadata' expected JSON object, got {other:?}"
3326 )))
3327 }
3328 };
3329 let parsed = metadata_from_json(&json)?;
3330 for (key, value) in parsed.iter() {
3331 metadata.push((key.clone(), value.clone()));
3332 }
3333 Ok(())
3334}
3335
3336fn apply_collection_default_ttl_metadata(
3337 runtime: &RedDBRuntime,
3338 collection: &str,
3339 metadata: &mut Vec<(String, MetadataValue)>,
3340) {
3341 if has_internal_ttl_metadata(metadata) {
3342 return;
3343 }
3344
3345 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3346 return;
3347 };
3348
3349 metadata.push((
3350 "_ttl_ms".to_string(),
3351 if default_ttl_ms <= i64::MAX as u64 {
3352 MetadataValue::Int(default_ttl_ms as i64)
3353 } else {
3354 MetadataValue::Timestamp(default_ttl_ms)
3355 },
3356 ));
3357}
3358
3359fn ensure_non_tree_reserved_metadata_entries(
3360 metadata: &[(String, MetadataValue)],
3361) -> RedDBResult<()> {
3362 for (key, _) in metadata {
3363 ensure_non_tree_reserved_metadata_key(key)?;
3364 }
3365 Ok(())
3366}
3367
3368fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3369 if key.starts_with(TREE_METADATA_PREFIX) {
3370 return Err(RedDBError::Query(format!(
3371 "metadata key '{}' is reserved for managed trees",
3372 key
3373 )));
3374 }
3375 Ok(())
3376}
3377
3378fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3379 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3380 return Err(RedDBError::Query(format!(
3381 "edge label '{}' is reserved for managed trees",
3382 TREE_CHILD_EDGE_LABEL
3383 )));
3384 }
3385 Ok(())
3386}
3387
3388fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3389 let mut columns = Vec::with_capacity(pairs.len());
3390 let mut values = Vec::with_capacity(pairs.len());
3391
3392 for (column, value) in pairs {
3393 columns.push(column.clone());
3394 values.push(value.clone());
3395 }
3396
3397 (columns, values)
3398}
3399
3400fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3402 for (i, col) in columns.iter().enumerate() {
3403 if col.eq_ignore_ascii_case(name) {
3404 return Ok(values[i].clone());
3405 }
3406 }
3407 Err(RedDBError::Query(format!(
3408 "required column '{name}' not found in INSERT"
3409 )))
3410}
3411
3412fn find_column_value_string(
3414 columns: &[String],
3415 values: &[Value],
3416 name: &str,
3417) -> RedDBResult<String> {
3418 let val = find_column_value(columns, values, name)?;
3419 match val {
3420 Value::Text(s) => Ok(s.to_string()),
3421 Value::Integer(n) => Ok(n.to_string()),
3422 Value::Float(n) => Ok(n.to_string()),
3423 other => Err(RedDBError::Query(format!(
3424 "column '{name}' expected text, got {other:?}"
3425 ))),
3426 }
3427}
3428
3429fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3430 let val = find_column_value(columns, values, name)?;
3431 match val {
3432 Value::Float(n) => Ok(n),
3433 Value::Integer(n) => Ok(n as f64),
3434 Value::UnsignedInteger(n) => Ok(n as f64),
3435 Value::Text(s) => s
3436 .parse::<f64>()
3437 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3438 other => Err(RedDBError::Query(format!(
3439 "column '{name}' expected number, got {other:?}"
3440 ))),
3441 }
3442}
3443
3444fn find_column_value_opt_string(
3446 columns: &[String],
3447 values: &[Value],
3448 name: &str,
3449) -> Option<String> {
3450 for (i, col) in columns.iter().enumerate() {
3451 if col.eq_ignore_ascii_case(name) {
3452 return match &values[i] {
3453 Value::Null => None,
3454 Value::Text(s) => Some(s.to_string()),
3455 Value::Integer(n) => Some(n.to_string()),
3456 Value::Float(n) => Some(n.to_string()),
3457 _ => None,
3458 };
3459 }
3460 }
3461 None
3462}
3463
3464fn resolve_edge_endpoint(
3471 store: &crate::storage::unified::UnifiedStore,
3472 collection: &str,
3473 columns: &[String],
3474 values: &[Value],
3475 name: &str,
3476) -> RedDBResult<u64> {
3477 let val = find_column_value(columns, values, name)?;
3478 match val {
3479 Value::Integer(n) => Ok(n as u64),
3480 Value::UnsignedInteger(n) => Ok(n),
3481 Value::Text(s) => {
3482 if let Ok(n) = s.parse::<u64>() {
3483 return Ok(n);
3484 }
3485 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3486 match matches.len() {
3487 0 => Err(RedDBError::Query(format!(
3488 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3489 ))),
3490 1 => Ok(matches[0].raw()),
3491 n => Err(RedDBError::Query(format!(
3492 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3493 ))),
3494 }
3495 }
3496 other => Err(RedDBError::Query(format!(
3497 "column '{name}' expected integer or node label, got {other:?}"
3498 ))),
3499 }
3500}
3501
3502fn resolve_edge_endpoint_any(
3503 store: &crate::storage::unified::UnifiedStore,
3504 collection: &str,
3505 columns: &[String],
3506 values: &[Value],
3507 names: &[&str],
3508) -> RedDBResult<u64> {
3509 for name in names {
3510 if columns
3511 .iter()
3512 .any(|column| column.eq_ignore_ascii_case(name))
3513 {
3514 return resolve_edge_endpoint(store, collection, columns, values, name);
3515 }
3516 }
3517
3518 Err(RedDBError::Query(format!(
3519 "required column '{}' not found in INSERT",
3520 names.first().copied().unwrap_or("from_rid")
3521 )))
3522}
3523
3524fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3526 let val = find_column_value(columns, values, name)?;
3527 match val {
3528 Value::Integer(n) => Ok(n as u64),
3529 Value::UnsignedInteger(n) => Ok(n),
3530 Value::Text(s) => s
3531 .parse::<u64>()
3532 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3533 other => Err(RedDBError::Query(format!(
3534 "column '{name}' expected integer, got {other:?}"
3535 ))),
3536 }
3537}
3538
3539fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3541 for (i, col) in columns.iter().enumerate() {
3542 if col.eq_ignore_ascii_case(name) {
3543 return match &values[i] {
3544 Value::Float(n) => Some(*n as f32),
3545 Value::Integer(n) => Some(*n as f32),
3546 Value::Null => None,
3547 _ => None,
3548 };
3549 }
3550 }
3551 None
3552}
3553
3554fn find_column_value_vec_f32(
3556 columns: &[String],
3557 values: &[Value],
3558 name: &str,
3559) -> RedDBResult<Vec<f32>> {
3560 let val = find_column_value(columns, values, name)?;
3561 match val {
3562 Value::Vector(v) => Ok(v),
3563 Value::Json(bytes) => {
3564 let s = std::str::from_utf8(&bytes).map_err(|_| {
3566 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3567 })?;
3568 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3569 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3570 })?;
3571 Ok(arr)
3572 }
3573 other => Err(RedDBError::Query(format!(
3574 "column '{name}' expected vector, got {other:?}"
3575 ))),
3576 }
3577}
3578
3579fn find_column_value_vec_f32_any(
3580 columns: &[String],
3581 values: &[Value],
3582 names: &[&str],
3583) -> RedDBResult<Vec<f32>> {
3584 for name in names {
3585 if columns
3586 .iter()
3587 .any(|column| column.eq_ignore_ascii_case(name))
3588 {
3589 return find_column_value_vec_f32(columns, values, name);
3590 }
3591 }
3592 Err(RedDBError::Query(format!(
3593 "required vector column '{}' not found in INSERT",
3594 names.join("' or '")
3595 )))
3596}
3597
3598fn extract_remaining_properties(
3600 columns: &[String],
3601 values: &[Value],
3602 exclude: &[&str],
3603) -> Vec<(String, Value)> {
3604 columns
3605 .iter()
3606 .zip(values.iter())
3607 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3608 .map(|(col, val)| (col.clone(), val.clone()))
3609 .collect()
3610}
3611
3612fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3613 let mut invalid = Vec::new();
3614 for column in columns {
3615 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3616 invalid.push(column.clone());
3617 }
3618 }
3619
3620 if invalid.is_empty() {
3621 Ok(())
3622 } else {
3623 Err(RedDBError::Query(format!(
3624 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3625 invalid.join(", ")
3626 )))
3627 }
3628}
3629
3630fn is_timeseries_insert_column(column: &str) -> bool {
3631 matches!(
3632 column.to_ascii_lowercase().as_str(),
3633 "metric"
3634 | "value"
3635 | "tags"
3636 | "timestamp"
3637 | "timestamp_ns"
3638 | "time"
3639 | "event_name"
3644 | "payload"
3645 )
3646}
3647
3648fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3649 let mut found = None;
3650
3651 for alias in ["timestamp_ns", "timestamp", "time"] {
3652 for (index, column) in columns.iter().enumerate() {
3653 if !column.eq_ignore_ascii_case(alias) {
3654 continue;
3655 }
3656
3657 if found.is_some() {
3658 return Err(RedDBError::Query(
3659 "timeseries INSERT accepts only one timestamp column".to_string(),
3660 ));
3661 }
3662
3663 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3664 }
3665 }
3666
3667 Ok(found)
3668}
3669
3670fn find_timeseries_tags(
3671 columns: &[String],
3672 values: &[Value],
3673) -> RedDBResult<std::collections::HashMap<String, String>> {
3674 for (index, column) in columns.iter().enumerate() {
3675 if column.eq_ignore_ascii_case("tags") {
3676 return parse_timeseries_tags(&values[index]);
3677 }
3678 }
3679 Ok(std::collections::HashMap::new())
3680}
3681
3682fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3683 match value {
3684 Value::Null => Ok(std::collections::HashMap::new()),
3685 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3686 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3687 other => Err(RedDBError::Query(format!(
3688 "timeseries tags must be a JSON object or JSON text, got {other:?}"
3689 ))),
3690 }
3691}
3692
3693fn parse_timeseries_tags_json(
3694 bytes: &[u8],
3695) -> RedDBResult<std::collections::HashMap<String, String>> {
3696 let json: crate::json::Value = crate::json::from_slice(bytes)
3697 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3698
3699 let object = match json {
3700 crate::json::Value::Object(object) => object,
3701 other => {
3702 return Err(RedDBError::Query(format!(
3703 "timeseries tags must be a JSON object, got {other:?}"
3704 )))
3705 }
3706 };
3707
3708 let mut tags = std::collections::HashMap::with_capacity(object.len());
3709 for (key, value) in object {
3710 tags.insert(key, json_tag_value_to_string(&value));
3711 }
3712 Ok(tags)
3713}
3714
3715fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3731 let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3732 buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3733 buf.push_str(&value.to_string_compact());
3734 buf
3735}
3736
3737fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3738 match value {
3739 Value::UnsignedInteger(value) => Ok(*value),
3740 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3741 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3742 Value::Text(value) => value.parse::<u64>().map_err(|_| {
3743 RedDBError::Query(format!(
3744 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3745 ))
3746 }),
3747 other => Err(RedDBError::Query(format!(
3748 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3749 ))),
3750 }
3751}
3752
3753fn current_unix_ns() -> u64 {
3754 std::time::SystemTime::now()
3755 .duration_since(std::time::UNIX_EPOCH)
3756 .unwrap_or_default()
3757 .as_nanos()
3758 .min(u128::from(u64::MAX)) as u64
3759}
3760
3761fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3762 use crate::json::{Map, Value as JV};
3763 match value {
3764 MetadataValue::Null => JV::Null,
3765 MetadataValue::Bool(value) => JV::Bool(*value),
3766 MetadataValue::Int(value) => JV::Number(*value as f64),
3767 MetadataValue::Float(value) => JV::Number(*value),
3768 MetadataValue::String(value) => JV::String(value.clone()),
3769 MetadataValue::Bytes(value) => JV::Array(
3770 value
3771 .iter()
3772 .map(|value| JV::Number(*value as f64))
3773 .collect(),
3774 ),
3775 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3776 MetadataValue::Array(values) => {
3777 JV::Array(values.iter().map(metadata_value_to_json).collect())
3778 }
3779 MetadataValue::Object(object) => {
3780 let entries = object
3781 .iter()
3782 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3783 .collect();
3784 JV::Object(entries)
3785 }
3786 MetadataValue::Geo { lat, lon } => {
3787 let mut object = Map::new();
3788 object.insert("lat".to_string(), JV::Number(*lat));
3789 object.insert("lon".to_string(), JV::Number(*lon));
3790 JV::Object(object)
3791 }
3792 MetadataValue::Reference(target) => {
3793 let mut object = Map::new();
3794 object.insert(
3795 "collection".to_string(),
3796 JV::String(target.collection().to_string()),
3797 );
3798 object.insert(
3799 "entity_id".to_string(),
3800 JV::Number(target.entity_id().raw() as f64),
3801 );
3802 JV::Object(object)
3803 }
3804 MetadataValue::References(values) => {
3805 let refs = values
3806 .iter()
3807 .map(|target| {
3808 let mut object = Map::new();
3809 object.insert(
3810 "collection".to_string(),
3811 JV::String(target.collection().to_string()),
3812 );
3813 object.insert(
3814 "entity_id".to_string(),
3815 JV::Number(target.entity_id().raw() as f64),
3816 );
3817 JV::Object(object)
3818 })
3819 .collect();
3820 JV::Array(refs)
3821 }
3822 }
3823}
3824
3825fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3826 match value {
3827 Value::Null => MetadataValue::Null,
3828 Value::Boolean(value) => MetadataValue::Bool(*value),
3829 Value::Integer(value) => MetadataValue::Int(*value),
3830 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3831 Value::Float(value) => MetadataValue::Float(*value),
3832 Value::Text(value) => MetadataValue::String(value.to_string()),
3833 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3834 Value::Timestamp(value) => {
3835 if *value >= 0 {
3836 metadata_u64_to_value(*value as u64)
3837 } else {
3838 MetadataValue::Int(*value)
3839 }
3840 }
3841 Value::TimestampMs(value) => {
3842 if *value >= 0 {
3843 metadata_u64_to_value(*value as u64)
3844 } else {
3845 MetadataValue::Int(*value)
3846 }
3847 }
3848 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3849 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3850 Value::Date(value) => MetadataValue::String(value.to_string()),
3851 Value::Time(value) => MetadataValue::String(value.to_string()),
3852 Value::Decimal(value) => MetadataValue::String(value.to_string()),
3853 Value::Ipv4(value) => MetadataValue::String(format!(
3854 "{}.{}.{}.{}",
3855 (value >> 24) & 0xFF,
3856 (value >> 16) & 0xFF,
3857 (value >> 8) & 0xFF,
3858 value & 0xFF
3859 )),
3860 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3861 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3862 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3863 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3864 lat: *lat as f64 / 1_000_000.0,
3865 lon: *lon as f64 / 1_000_000.0,
3866 },
3867 Value::BigInt(value) => MetadataValue::String(value.to_string()),
3868 Value::TableRef(value) => MetadataValue::String(value.clone()),
3869 Value::PageRef(value) => MetadataValue::Int(*value as i64),
3870 Value::Password(value) => MetadataValue::String(value.clone()),
3871 Value::Array(values) => {
3872 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3873 }
3874 _ => MetadataValue::String(value.to_string()),
3875 }
3876}
3877
3878fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3879 match value {
3880 Value::Null => Ok(MetadataValue::Null),
3881 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3882 Value::Integer(_) => Err(RedDBError::Query(format!(
3883 "column '{field}' must be non-negative for TTL metadata"
3884 ))),
3885 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3886 Value::Float(value) if value.is_finite() => {
3887 if value.fract().abs() >= f64::EPSILON {
3888 return Err(RedDBError::Query(format!(
3889 "column '{field}' must be an integer (TTL metadata must be an integer)"
3890 )));
3891 }
3892 if *value < 0.0 {
3893 return Err(RedDBError::Query(format!(
3894 "column '{field}' must be non-negative for TTL metadata"
3895 )));
3896 }
3897 if *value > u64::MAX as f64 {
3898 return Err(RedDBError::Query(format!(
3899 "column '{field}' value is too large"
3900 )));
3901 }
3902 Ok(metadata_u64_to_value(*value as u64))
3903 }
3904 Value::Float(_) => Err(RedDBError::Query(format!(
3905 "column '{field}' must be a finite number"
3906 ))),
3907 Value::Text(value) => {
3908 let value = value.trim();
3909 if let Ok(value) = value.parse::<u64>() {
3910 Ok(metadata_u64_to_value(value))
3911 } else if let Ok(value) = value.parse::<i64>() {
3912 if value < 0 {
3913 return Err(RedDBError::Query(format!(
3914 "column '{field}' must be non-negative for TTL metadata"
3915 )));
3916 }
3917 Ok(metadata_u64_to_value(value as u64))
3918 } else if let Ok(value) = value.parse::<f64>() {
3919 if !value.is_finite() {
3920 return Err(RedDBError::Query(format!(
3921 "column '{field}' must be a finite number"
3922 )));
3923 }
3924 if value.fract().abs() >= f64::EPSILON {
3925 return Err(RedDBError::Query(format!(
3926 "column '{field}' must be an integer (TTL metadata must be an integer)"
3927 )));
3928 }
3929 if value < 0.0 {
3930 return Err(RedDBError::Query(format!(
3931 "column '{field}' must be non-negative for TTL metadata"
3932 )));
3933 }
3934 if value > u64::MAX as f64 {
3935 return Err(RedDBError::Query(format!(
3936 "column '{field}' value is too large"
3937 )));
3938 }
3939 Ok(metadata_u64_to_value(value as u64))
3940 } else {
3941 Err(RedDBError::Query(format!(
3942 "column '{field}' expects a numeric value for TTL metadata"
3943 )))
3944 }
3945 }
3946 _ => Err(RedDBError::Query(format!(
3947 "column '{field}' expects a numeric value for TTL metadata"
3948 ))),
3949 }
3950}
3951
3952fn metadata_u64_to_value(value: u64) -> MetadataValue {
3953 if value <= i64::MAX as u64 {
3954 MetadataValue::Int(value as i64)
3955 } else {
3956 MetadataValue::Timestamp(value)
3957 }
3958}
3959
3960fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3966 let json = match value {
3967 Value::Null => return false,
3968 Value::Json(bytes) | Value::Blob(bytes) => {
3969 match crate::json::from_slice::<crate::json::Value>(bytes) {
3970 Ok(v) => v,
3971 Err(_) => return false,
3972 }
3973 }
3974 Value::Text(s) => {
3975 let trimmed = s.trim_start();
3976 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3977 return false;
3978 }
3979 match crate::json::from_str::<crate::json::Value>(s) {
3980 Ok(v) => v,
3981 Err(_) => return false,
3982 }
3983 }
3984 _ => return false,
3985 };
3986 let mut cursor = &json;
3987 for seg in tail.split('.') {
3988 match cursor {
3989 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3990 Some((_, v)) => cursor = v,
3991 None => return false,
3992 },
3993 _ => return false,
3994 }
3995 }
3996 !matches!(cursor, crate::json::Value::Null)
3997}
3998
3999fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
4010 let mut root = match current {
4011 Value::Null => crate::json::Value::Object(Default::default()),
4012 Value::Json(bytes) | Value::Blob(bytes) => {
4013 crate::json::from_slice(&bytes).map_err(|err| {
4014 RedDBError::Query(format!(
4015 "tenant auto-fill: root column is not valid JSON ({err})"
4016 ))
4017 })?
4018 }
4019 Value::Text(s) => {
4020 if s.trim().is_empty() {
4021 crate::json::Value::Object(Default::default())
4022 } else {
4023 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
4024 RedDBError::Query(format!(
4025 "tenant auto-fill: text root is not valid JSON ({err})"
4026 ))
4027 })?
4028 }
4029 }
4030 other => {
4031 return Err(RedDBError::Query(format!(
4032 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4033 )));
4034 }
4035 };
4036
4037 let segments: Vec<&str> = tail.split('.').collect();
4039 let mut cursor: &mut crate::json::Value = &mut root;
4040 for (i, seg) in segments.iter().enumerate() {
4041 let is_last = i + 1 == segments.len();
4042 let map = match cursor {
4043 crate::json::Value::Object(m) => m,
4044 _ => {
4045 return Err(RedDBError::Query(format!(
4046 "tenant auto-fill: segment '{seg}' is not inside an object"
4047 )));
4048 }
4049 };
4050 if is_last {
4051 map.insert(
4052 seg.to_string(),
4053 crate::json::Value::String(tenant_id.to_string()),
4054 );
4055 break;
4056 }
4057 cursor = map
4058 .entry(seg.to_string())
4059 .or_insert_with(|| crate::json::Value::Object(Default::default()));
4060 }
4061
4062 let bytes = crate::json::to_vec(&root).map_err(|err| {
4063 RedDBError::Query(format!(
4064 "tenant auto-fill: failed to re-serialize JSON ({err})"
4065 ))
4066 })?;
4067 Ok(Value::Json(bytes))
4068}
4069
4070#[cfg(test)]
4071mod tests {
4072 use crate::storage::schema::Value;
4073 use crate::storage::wal::{WalReader, WalRecord};
4074 use crate::{RedDBOptions, RedDBRuntime};
4075 use std::path::Path;
4076
4077 fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4078 WalReader::open(wal_path)
4079 .expect("wal opens")
4080 .iter()
4081 .map(|record| record.expect("wal record decodes").1)
4082 .filter_map(|record| match record {
4083 WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4084 _ => None,
4085 })
4086 .collect()
4087 }
4088
4089 fn action_contains_text(action: &[u8], needle: &str) -> bool {
4090 action
4091 .windows(needle.len())
4092 .any(|window| window == needle.as_bytes())
4093 }
4094
4095 fn assert_statement_writes_collections_in_one_new_wal_batch(
4096 rt: &RedDBRuntime,
4097 wal_path: &Path,
4098 statement: &str,
4099 source: &str,
4100 event_queue: &str,
4101 ) {
4102 let before_batches = store_commit_batches(wal_path).len();
4103
4104 rt.execute_query(statement).unwrap();
4105
4106 let batches = store_commit_batches(wal_path);
4107 let statement_batches = &batches[before_batches..];
4108 let source_batch = statement_batches
4109 .iter()
4110 .position(|actions| {
4111 actions.iter().any(|action| {
4112 action_contains_text(action, source)
4113 && !action_contains_text(action, event_queue)
4114 })
4115 })
4116 .expect("source collection write batch is present");
4117 let event_batch = statement_batches
4118 .iter()
4119 .position(|actions| {
4120 actions
4121 .iter()
4122 .any(|action| action_contains_text(action, event_queue))
4123 })
4124 .expect("event queue write batch is present");
4125
4126 assert_eq!(
4127 source_batch, event_batch,
4128 "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4129 );
4130 }
4131
4132 #[test]
4133 fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4134 let dir = tempfile::tempdir().unwrap();
4135 let db_path = dir.path().join("events_dual_write.rdb");
4136 let wal_path = db_path.with_extension("rdb-uwal");
4137 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4138
4139 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4140 .unwrap();
4141 assert_statement_writes_collections_in_one_new_wal_batch(
4142 &rt,
4143 &wal_path,
4144 "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4145 "users",
4146 "users_events",
4147 );
4148 }
4149
4150 #[test]
4151 fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4152 let dir = tempfile::tempdir().unwrap();
4153 let db_path = dir.path().join("events_update_atomic.rdb");
4154 let wal_path = db_path.with_extension("rdb-uwal");
4155 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4156
4157 rt.execute_query(
4158 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4159 )
4160 .unwrap();
4161 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4162 .unwrap();
4163
4164 assert_statement_writes_collections_in_one_new_wal_batch(
4165 &rt,
4166 &wal_path,
4167 "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4168 "users",
4169 "user_updates",
4170 );
4171 }
4172
4173 #[test]
4174 fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4175 let dir = tempfile::tempdir().unwrap();
4176 let db_path = dir.path().join("events_delete_atomic.rdb");
4177 let wal_path = db_path.with_extension("rdb-uwal");
4178 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4179
4180 rt.execute_query(
4181 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4182 )
4183 .unwrap();
4184 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4185 .unwrap();
4186
4187 assert_statement_writes_collections_in_one_new_wal_batch(
4188 &rt,
4189 &wal_path,
4190 "DELETE FROM users WHERE id = 1",
4191 "users",
4192 "user_deletes",
4193 );
4194 }
4195
4196 #[test]
4197 fn update_where_id_in_with_hash_index_updates_expected_rows() {
4198 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4199 rt.execute_query("CREATE TABLE users (id INT, score INT)")
4200 .unwrap();
4201 for id in 0..5 {
4202 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4203 .unwrap();
4204 }
4205 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4206 .unwrap();
4207
4208 let updated = rt
4209 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4210 .unwrap();
4211 assert_eq!(updated.affected_rows, 3);
4212
4213 let selected = rt
4214 .execute_query("SELECT id, score FROM users ORDER BY id")
4215 .unwrap();
4216 let scores: Vec<(i64, i64)> = selected
4217 .result
4218 .records
4219 .iter()
4220 .map(|record| {
4221 let id = match record.get("id").unwrap() {
4222 Value::Integer(value) => *value,
4223 other => panic!("expected integer id, got {other:?}"),
4224 };
4225 let score = match record.get("score").unwrap() {
4226 Value::Integer(value) => *value,
4227 other => panic!("expected integer score, got {other:?}"),
4228 };
4229 (id, score)
4230 })
4231 .collect();
4232 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4233 }
4234
4235 #[test]
4242 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4243 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4244 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4245 .unwrap();
4246 for id in 0..5 {
4247 rt.execute_query(&format!(
4248 "INSERT INTO items (id, score) VALUES ({id}, {})",
4249 id * 10
4250 ))
4251 .unwrap();
4252 }
4253 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4254 .unwrap();
4255
4256 let updated_one = rt
4260 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4261 .unwrap();
4262 assert_eq!(updated_one.affected_rows, 1);
4263
4264 let updated_many = rt
4268 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4269 .unwrap();
4270 assert_eq!(updated_many.affected_rows, 2);
4271
4272 let snapshot = rt
4273 .execute_query("SELECT id, score FROM items ORDER BY id")
4274 .unwrap();
4275 let pairs: Vec<(i64, i64)> = snapshot
4276 .result
4277 .records
4278 .iter()
4279 .map(|record| {
4280 let id = match record.get("id").unwrap() {
4281 Value::Integer(value) => *value,
4282 other => panic!("expected integer id, got {other:?}"),
4283 };
4284 let score = match record.get("score").unwrap() {
4285 Value::Integer(value) => *value,
4286 other => panic!("expected integer score, got {other:?}"),
4287 };
4288 (id, score)
4289 })
4290 .collect();
4291 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4292
4293 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4295 assert_eq!(updated_all.affected_rows, 5);
4296 let after = rt
4297 .execute_query("SELECT score FROM items ORDER BY id")
4298 .unwrap();
4299 let scores: Vec<i64> = after
4300 .result
4301 .records
4302 .iter()
4303 .map(|record| match record.get("score").unwrap() {
4304 Value::Integer(value) => *value,
4305 other => panic!("expected integer score, got {other:?}"),
4306 })
4307 .collect();
4308 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4309 }
4310
4311 #[test]
4317 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4318 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4319 rt.execute_query("CREATE TABLE items (id INT, score INT)")
4320 .unwrap();
4321 for id in 0..5 {
4322 rt.execute_query(&format!(
4323 "INSERT INTO items (id, score) VALUES ({id}, {})",
4324 id * 10
4325 ))
4326 .unwrap();
4327 }
4328 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4329 .unwrap();
4330
4331 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4334 assert_eq!(deleted_one.affected_rows, 1);
4335
4336 let deleted_many = rt
4340 .execute_query("DELETE FROM items WHERE score > 25")
4341 .unwrap();
4342 assert_eq!(deleted_many.affected_rows, 2);
4343
4344 let surviving = rt
4345 .execute_query("SELECT id FROM items ORDER BY id")
4346 .unwrap();
4347 let ids: Vec<i64> = surviving
4348 .result
4349 .records
4350 .iter()
4351 .map(|record| match record.get("id").unwrap() {
4352 Value::Integer(value) => *value,
4353 other => panic!("expected integer id, got {other:?}"),
4354 })
4355 .collect();
4356 assert_eq!(ids, vec![0, 1]);
4357
4358 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4360 assert_eq!(deleted_rest.affected_rows, 2);
4361 let empty = rt.execute_query("SELECT id FROM items").unwrap();
4362 assert!(empty.result.records.is_empty());
4363 }
4364
4365 #[test]
4370 fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4371 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4372 rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4373 .unwrap();
4374
4375 let inserted = rt
4378 .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4379 .unwrap();
4380 assert_eq!(inserted.affected_rows, 1);
4381
4382 let update_err = rt
4384 .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4385 .unwrap_err();
4386 let msg = format!("{update_err}");
4387 assert!(
4388 msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4389 "expected UPDATE rejection message, got: {msg}"
4390 );
4391
4392 let delete_err = rt
4394 .execute_query("DELETE FROM events WHERE id = 1")
4395 .unwrap_err();
4396 let msg = format!("{delete_err}");
4397 assert!(
4398 msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4399 "expected DELETE rejection message, got: {msg}"
4400 );
4401
4402 let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4405 assert_eq!(surviving.result.records.len(), 1);
4406 }
4407
4408 #[test]
4412 fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4413 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4414 rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4415 .unwrap();
4416
4417 rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4418 .unwrap();
4419 let updated = rt
4420 .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4421 .unwrap();
4422 assert_eq!(updated.affected_rows, 1);
4423 let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4424 assert_eq!(deleted.affected_rows, 1);
4425 }
4426
4427 #[test]
4428 fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4429 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4430 rt.execute_query(
4431 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4432 )
4433 .unwrap();
4434
4435 let inserted = rt
4436 .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4437 .unwrap();
4438 assert_eq!(inserted.affected_rows, 1);
4439
4440 let events = queue_payloads(&rt, "audit_log");
4441 assert_eq!(events.len(), 1);
4442 let event = events[0].as_object().expect("event payload object");
4443 assert!(event
4444 .get("event_id")
4445 .and_then(crate::json::Value::as_str)
4446 .is_some_and(|value| !value.is_empty()));
4447 assert_eq!(
4448 event.get("op").and_then(crate::json::Value::as_str),
4449 Some("insert")
4450 );
4451 assert_eq!(
4452 event.get("collection").and_then(crate::json::Value::as_str),
4453 Some("users")
4454 );
4455 assert_eq!(
4456 event.get("id").and_then(crate::json::Value::as_u64),
4457 Some(7)
4458 );
4459 assert!(event
4460 .get("ts")
4461 .and_then(crate::json::Value::as_u64)
4462 .is_some());
4463 assert!(event
4464 .get("lsn")
4465 .and_then(crate::json::Value::as_u64)
4466 .is_some());
4467 assert!(matches!(
4468 event.get("tenant"),
4469 Some(crate::json::Value::Null)
4470 ));
4471 assert!(matches!(
4472 event.get("before"),
4473 Some(crate::json::Value::Null)
4474 ));
4475 let after = event
4476 .get("after")
4477 .and_then(crate::json::Value::as_object)
4478 .expect("after object");
4479 assert_eq!(
4480 after.get("id").and_then(crate::json::Value::as_u64),
4481 Some(7)
4482 );
4483 assert_eq!(
4484 after.get("email").and_then(crate::json::Value::as_str),
4485 Some("a@example.com")
4486 );
4487 }
4488
4489 #[test]
4490 fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4491 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4492 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4493 .unwrap();
4494
4495 rt.execute_query(
4496 "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4497 )
4498 .unwrap();
4499
4500 let events = queue_payloads(&rt, "users_events");
4501 assert_eq!(events.len(), 2);
4502 let mut previous_lsn = 0;
4503 for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4504 let object = event.as_object().expect("event payload object");
4505 assert_eq!(
4506 object.get("op").and_then(crate::json::Value::as_str),
4507 Some("insert")
4508 );
4509 assert_eq!(
4510 object.get("id").and_then(crate::json::Value::as_u64),
4511 Some(expected_id)
4512 );
4513 let lsn = object
4514 .get("lsn")
4515 .and_then(crate::json::Value::as_u64)
4516 .expect("event lsn");
4517 assert!(
4518 lsn > previous_lsn,
4519 "event LSNs should increase in row order"
4520 );
4521 previous_lsn = lsn;
4522 let after = object
4523 .get("after")
4524 .and_then(crate::json::Value::as_object)
4525 .expect("after object");
4526 assert_eq!(
4527 after.get("id").and_then(crate::json::Value::as_u64),
4528 Some(expected_id)
4529 );
4530 }
4531 }
4532
4533 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4534 let result = rt
4535 .execute_query(&format!("QUEUE PEEK {queue} 10"))
4536 .expect("peek queue");
4537 result
4538 .result
4539 .records
4540 .iter()
4541 .map(
4542 |record| match record.get("payload").expect("payload column") {
4543 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4544 other => panic!("expected JSON queue payload, got {other:?}"),
4545 },
4546 )
4547 .collect()
4548 }
4549
4550 #[test]
4562 fn auto_index_id_fires_on_first_insert() {
4563 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4564 rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4565 .unwrap();
4566
4567 assert!(
4569 rt.index_store_ref()
4570 .find_index_for_column("bench_users", "id")
4571 .is_none(),
4572 "freshly created collection should not have an `id` index"
4573 );
4574
4575 rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4577 .unwrap();
4578
4579 let registered = rt
4581 .index_store_ref()
4582 .find_index_for_column("bench_users", "id")
4583 .expect("auto-index hook should have registered idx_id on first insert");
4584 assert_eq!(registered.name, "idx_id");
4585 assert_eq!(registered.collection, "bench_users");
4586 assert_eq!(registered.columns, vec!["id".to_string()]);
4587 assert!(matches!(
4588 registered.method,
4589 super::super::index_store::IndexMethodKind::Hash
4590 ));
4591
4592 for id in 2..=5 {
4595 rt.execute_query(&format!(
4596 "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4597 id * 10
4598 ))
4599 .unwrap();
4600 }
4601 for id in 1..=5 {
4602 let result = rt
4603 .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4604 .unwrap();
4605 assert_eq!(
4606 result.result.records.len(),
4607 1,
4608 "id={id} should match one row"
4609 );
4610 }
4611
4612 let deleted = rt
4617 .execute_query("DELETE FROM bench_users WHERE id = 3")
4618 .unwrap();
4619 assert_eq!(deleted.affected_rows, 1);
4620 }
4621
4622 #[test]
4627 fn auto_index_id_fires_on_first_bulk_insert() {
4628 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4629 rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4630 .unwrap();
4631
4632 rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4633 .unwrap();
4634
4635 let registered = rt
4636 .index_store_ref()
4637 .find_index_for_column("bench_bulk", "id")
4638 .expect("auto-index hook should fire on first bulk insert");
4639 assert_eq!(registered.name, "idx_id");
4640
4641 for id in 1..=3 {
4643 let result = rt
4644 .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4645 .unwrap();
4646 assert_eq!(result.result.records.len(), 1);
4647 }
4648 }
4649
4650 #[test]
4654 fn auto_index_id_skips_when_no_id_column() {
4655 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4656 rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4657 .unwrap();
4658 rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4659 .unwrap();
4660
4661 assert!(rt
4662 .index_store_ref()
4663 .find_index_for_column("plain", "id")
4664 .is_none());
4665 assert!(rt
4666 .index_store_ref()
4667 .find_index_for_column("plain", "uid")
4668 .is_none());
4669 }
4670
4671 #[test]
4676 fn auto_index_id_skips_when_index_already_exists() {
4677 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4678 rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4679 .unwrap();
4680 rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4682 .unwrap();
4683 rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4684 .unwrap();
4685
4686 let registered = rt
4687 .index_store_ref()
4688 .find_index_for_column("pre", "id")
4689 .expect("user index should still be there");
4690 assert_eq!(
4691 registered.name, "user_idx",
4692 "auto-index hook must not overwrite an existing index"
4693 );
4694 }
4695
4696 #[test]
4700 fn auto_index_id_dropped_with_collection() {
4701 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4702 rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4703 .unwrap();
4704 rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4705 .unwrap();
4706 assert!(rt
4707 .index_store_ref()
4708 .find_index_for_column("ephemeral", "id")
4709 .is_some());
4710
4711 rt.execute_query("DROP TABLE ephemeral").unwrap();
4712
4713 assert!(
4714 rt.index_store_ref()
4715 .find_index_for_column("ephemeral", "id")
4716 .is_none(),
4717 "implicit `idx_id` must be reaped when its collection drops"
4718 );
4719 }
4720
4721 #[test]
4726 fn auto_index_id_disabled_by_config() {
4727 let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4728 let rt = RedDBRuntime::with_options(opts).unwrap();
4729
4730 rt.execute_query("CREATE TABLE off (id INT, score INT)")
4731 .unwrap();
4732 rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4733 .unwrap();
4734
4735 assert!(
4736 rt.index_store_ref()
4737 .find_index_for_column("off", "id")
4738 .is_none(),
4739 "with auto_index_id=false, no implicit index should be created"
4740 );
4741 }
4742
4743 #[test]
4746 fn update_single_row_emits_update_event() {
4747 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4748 rt.execute_query(
4749 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4750 )
4751 .unwrap();
4752 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4753 .unwrap();
4754
4755 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4756 .unwrap();
4757
4758 let events = queue_payloads(&rt, "audit_log");
4759 assert_eq!(events.len(), 1, "expected exactly 1 update event");
4760 let event = events[0].as_object().expect("event payload object");
4761 assert_eq!(
4762 event.get("op").and_then(crate::json::Value::as_str),
4763 Some("update")
4764 );
4765 assert_eq!(
4766 event.get("collection").and_then(crate::json::Value::as_str),
4767 Some("users")
4768 );
4769 assert!(event
4770 .get("event_id")
4771 .and_then(crate::json::Value::as_str)
4772 .is_some_and(|v| !v.is_empty()));
4773 let before = event
4774 .get("before")
4775 .and_then(crate::json::Value::as_object)
4776 .expect("before must be an object");
4777 let after = event
4778 .get("after")
4779 .and_then(crate::json::Value::as_object)
4780 .expect("after must be an object");
4781 assert_eq!(
4782 before.get("name").and_then(crate::json::Value::as_str),
4783 Some("Alice"),
4784 "before.name should be the old value"
4785 );
4786 assert_eq!(
4787 after.get("name").and_then(crate::json::Value::as_str),
4788 Some("Bob"),
4789 "after.name should be the new value"
4790 );
4791 }
4792
4793 #[test]
4794 fn update_event_only_includes_changed_fields() {
4795 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4796 rt.execute_query(
4797 "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4798 )
4799 .unwrap();
4800 rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4801 .unwrap();
4802
4803 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4804 .unwrap();
4805
4806 let events = queue_payloads(&rt, "evts");
4807 assert_eq!(events.len(), 1);
4808 let event = events[0].as_object().unwrap();
4809 let before = event
4810 .get("before")
4811 .and_then(crate::json::Value::as_object)
4812 .unwrap();
4813 let after = event
4814 .get("after")
4815 .and_then(crate::json::Value::as_object)
4816 .unwrap();
4817 assert!(
4819 before.contains_key("name"),
4820 "before must include changed field"
4821 );
4822 assert!(
4823 after.contains_key("name"),
4824 "after must include changed field"
4825 );
4826 assert!(
4828 !before.contains_key("email"),
4829 "before must not include unchanged email"
4830 );
4831 assert!(
4832 !after.contains_key("email"),
4833 "after must not include unchanged email"
4834 );
4835 }
4836
4837 #[test]
4838 fn multi_row_update_emits_one_event_per_row() {
4839 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4840 rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4841 .unwrap();
4842 rt.execute_query(
4843 "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4844 )
4845 .unwrap();
4846
4847 rt.execute_query("UPDATE items SET status = 'done'")
4848 .unwrap();
4849
4850 let events = queue_payloads(&rt, "evts");
4851 assert_eq!(events.len(), 3, "expected one update event per row");
4852 for event in &events {
4853 let obj = event.as_object().unwrap();
4854 assert_eq!(
4855 obj.get("op").and_then(crate::json::Value::as_str),
4856 Some("update")
4857 );
4858 }
4859 }
4860
4861 #[test]
4862 fn delete_single_row_emits_delete_event() {
4863 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4864 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4865 .unwrap();
4866 rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4867 .unwrap();
4868
4869 rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4870
4871 let events = queue_payloads(&rt, "del_log");
4872 assert_eq!(events.len(), 1);
4873 let event = events[0].as_object().expect("event payload object");
4874 assert_eq!(
4875 event.get("op").and_then(crate::json::Value::as_str),
4876 Some("delete")
4877 );
4878 assert_eq!(
4879 event.get("collection").and_then(crate::json::Value::as_str),
4880 Some("users")
4881 );
4882 assert!(event
4883 .get("event_id")
4884 .and_then(crate::json::Value::as_str)
4885 .is_some_and(|v| !v.is_empty()));
4886 let before = event
4887 .get("before")
4888 .and_then(crate::json::Value::as_object)
4889 .expect("before must be an object for delete");
4890 assert_eq!(
4891 before.get("id").and_then(crate::json::Value::as_u64),
4892 Some(42)
4893 );
4894 assert_eq!(
4895 before.get("name").and_then(crate::json::Value::as_str),
4896 Some("Alice")
4897 );
4898 assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
4899 }
4900
4901 #[test]
4902 fn multi_row_delete_emits_one_event_per_row() {
4903 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4904 rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
4905 .unwrap();
4906 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
4907 .unwrap();
4908
4909 rt.execute_query("DELETE FROM items").unwrap();
4910
4911 let events = queue_payloads(&rt, "del_log");
4912 assert_eq!(events.len(), 3, "expected one delete event per deleted row");
4913 for event in &events {
4914 let obj = event.as_object().unwrap();
4915 assert_eq!(
4916 obj.get("op").and_then(crate::json::Value::as_str),
4917 Some("delete")
4918 );
4919 assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
4920 }
4921 }
4922
4923 #[test]
4924 fn ops_filter_update_does_not_emit_on_insert_or_delete() {
4925 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4926 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
4927 .unwrap();
4928
4929 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4930 .unwrap();
4931 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
4932
4933 let events = queue_payloads(&rt, "evts");
4934 assert!(
4935 events.is_empty(),
4936 "UPDATE-only filter must not emit INSERT or DELETE events"
4937 );
4938 }
4939
4940 #[test]
4943 fn suppress_events_on_insert_emits_no_events() {
4944 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4945 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4946 .unwrap();
4947
4948 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4949 .unwrap();
4950
4951 let events = queue_payloads(&rt, "evts");
4952 assert!(
4953 events.is_empty(),
4954 "SUPPRESS EVENTS must prevent INSERT events"
4955 );
4956 }
4957
4958 #[test]
4959 fn suppress_events_on_update_emits_no_events() {
4960 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4961 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4962 .unwrap();
4963 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4964 .unwrap();
4965 let _ = queue_payloads(&rt, "evts");
4967 rt.execute_query("QUEUE PURGE evts").unwrap();
4969
4970 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
4971 .unwrap();
4972
4973 let events = queue_payloads(&rt, "evts");
4974 assert!(
4975 events.is_empty(),
4976 "SUPPRESS EVENTS must prevent UPDATE events"
4977 );
4978 }
4979
4980 #[test]
4981 fn suppress_events_on_delete_emits_no_events() {
4982 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4983 rt.execute_query(
4984 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
4985 )
4986 .unwrap();
4987 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4988 .unwrap();
4989
4990 rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
4991 .unwrap();
4992
4993 let events = queue_payloads(&rt, "evts");
4994 assert!(
4995 events.is_empty(),
4996 "SUPPRESS EVENTS must prevent DELETE events"
4997 );
4998 }
4999
5000 #[test]
5001 fn normal_insert_after_suppress_still_emits() {
5002 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5003 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5004 .unwrap();
5005
5006 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5007 .unwrap();
5008 rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
5009 .unwrap();
5010
5011 let events = queue_payloads(&rt, "evts");
5012 assert_eq!(
5013 events.len(),
5014 1,
5015 "only the non-suppressed INSERT should emit"
5016 );
5017 assert_eq!(
5018 events[0].get("id").and_then(crate::json::Value::as_u64),
5019 Some(2)
5020 );
5021 }
5022}