1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11 CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12 CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13 RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16 build_row_update_contract_plan, entity_row_fields_snapshot,
17 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18 RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27 sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28 sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
43 column: String,
44 expr: Expr,
45 compound_op: Option<BinOp>,
46 metadata_key: Option<&'static str>,
47 row_rule: Option<RowUpdateColumnRule>,
48}
49
50struct CompiledUpdatePlan {
51 static_field_assignments: Vec<(String, Value)>,
52 static_metadata_assignments: Vec<(String, MetadataValue)>,
53 dynamic_assignments: Vec<CompiledUpdateAssignment>,
54 row_contract_plan: Option<RowUpdateContractPlan>,
55 row_modified_columns: Vec<String>,
56 row_touches_unique_columns: bool,
57}
58
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
61 dynamic_field_assignments: Vec<(String, Value)>,
62 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
80 let Some(tenant_col) = self.tenant_column(&query.table) else {
81 return Ok(None);
82 };
83 if query
85 .columns
86 .iter()
87 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
88 {
89 return Ok(None);
90 }
91
92 if let Some(dot_pos) = tenant_col.find('.') {
98 let (root, tail) = tenant_col.split_at(dot_pos);
99 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
101 }
102
103 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
104 return Err(RedDBError::Query(format!(
105 "INSERT into tenant-scoped table '{}' requires an active tenant — \
106 run SET TENANT '<id>' first or name column '{}' explicitly",
107 query.table, tenant_col
108 )));
109 };
110
111 let mut augmented = query.clone();
112 augmented.columns.push(tenant_col);
113 let lit = Value::text(tenant_id.clone());
114 for row in augmented.values.iter_mut() {
115 row.push(lit.clone());
116 }
117 for row in augmented.value_exprs.iter_mut() {
118 row.push(crate::storage::query::ast::Expr::Literal {
119 value: lit.clone(),
120 span: crate::storage::query::ast::Span::synthetic(),
121 });
122 }
123 Ok(Some(augmented))
124 }
125
126 fn inject_dotted_tenant(
136 &self,
137 query: &InsertQuery,
138 root: &str,
139 tail: &str,
140 ) -> RedDBResult<Option<InsertQuery>> {
141 let active_tenant = crate::runtime::impl_core::current_tenant();
142 let mut augmented = query.clone();
143 let root_idx = augmented
144 .columns
145 .iter()
146 .position(|c| c.eq_ignore_ascii_case(root));
147
148 if let Some(idx) = root_idx {
149 for row in augmented.values.iter_mut() {
155 let Some(slot) = row.get_mut(idx) else {
156 continue;
157 };
158 if dotted_tail_already_set(slot, tail) {
159 continue;
160 }
161 let Some(tenant_id) = &active_tenant else {
162 return Err(RedDBError::Query(format!(
163 "INSERT into tenant-scoped table '{}' requires an active tenant — \
164 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
165 query.table, root, tail
166 )));
167 };
168 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
169 }
170 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
174 if let Some(slot) = row.get_mut(idx) {
175 let new_value = augmented
176 .values
177 .get(row_idx)
178 .and_then(|v| v.get(idx))
179 .cloned()
180 .unwrap_or(Value::Null);
181 *slot = crate::storage::query::ast::Expr::Literal {
182 value: new_value,
183 span: crate::storage::query::ast::Span::synthetic(),
184 };
185 }
186 }
187 } else {
188 let Some(tenant_id) = &active_tenant else {
192 return Err(RedDBError::Query(format!(
193 "INSERT into tenant-scoped table '{}' requires an active tenant — \
194 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
195 query.table, root, tail
196 )));
197 };
198 augmented.columns.push(root.to_string());
200 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
201 for row in augmented.values.iter_mut() {
202 row.push(fresh.clone());
203 }
204 for row in augmented.value_exprs.iter_mut() {
205 row.push(crate::storage::query::ast::Expr::Literal {
206 value: fresh.clone(),
207 span: crate::storage::query::ast::Span::synthetic(),
208 });
209 }
210 }
211
212 Ok(Some(augmented))
213 }
214
215 fn delete_entities_batch(
218 &self,
219 collection: &str,
220 ids: &[EntityId],
221 ) -> RedDBResult<(u64, Vec<u64>)> {
222 if ids.is_empty() {
223 return Ok((0, vec![]));
224 }
225
226 let store = self.db().store();
227 let Some(manager) = store.get_collection(collection) else {
228 return Ok((0, vec![]));
229 };
230
231 let active_xid = self.current_xid();
232 let conn_id = crate::runtime::impl_core::current_connection_id();
233 let mut autocommit_xid = None;
234 let mut tombstoned_ids = Vec::new();
235 let mut tombstoned_entities = Vec::new();
236 let mut physical_delete_ids = Vec::new();
237 let table_row_resolver =
238 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
239
240 for &id in ids {
241 let Some(mut entity) = manager.get(id) else {
242 continue;
243 };
244 if matches!(entity.data, EntityData::Row(_)) {
245 let previous_xmax = entity.xmax;
246 if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
247 if table_row_resolver.resolve_candidate(&entity).is_none() {
248 continue;
249 }
250 } else if entity.xmax != 0 {
251 continue;
252 }
253
254 let xid = match active_xid {
255 Some(xid) => xid,
256 None => match autocommit_xid {
257 Some(xid) => xid,
258 None => {
259 let mgr = self.snapshot_manager();
260 let xid = mgr.begin();
261 autocommit_xid = Some(xid);
262 xid
263 }
264 },
265 };
266 entity.set_xmax(xid);
267 if manager.update(entity.clone()).is_ok() {
268 if active_xid.is_some() {
269 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
270 }
271 tombstoned_entities.push(entity);
272 tombstoned_ids.push(id);
273 }
274 } else {
275 physical_delete_ids.push(id);
276 }
277 }
278
279 if let Some(xid) = autocommit_xid {
280 self.snapshot_manager().commit(xid);
281 }
282
283 let mut affected = tombstoned_ids.len() as u64;
284 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
285 if active_xid.is_some() {
286 store
287 .persist_entities_to_pager(collection, &tombstoned_entities)
288 .map_err(|err| RedDBError::Internal(err.to_string()))?;
289 } else {
290 store
291 .persist_entities_to_pager(collection, &tombstoned_entities)
292 .map_err(|err| RedDBError::Internal(err.to_string()))?;
293 for id in &tombstoned_ids {
294 store.context_index().remove_entity(*id);
295 let lsn = self.cdc_emit(
296 crate::replication::cdc::ChangeOperation::Delete,
297 collection,
298 id.raw(),
299 "entity",
300 );
301 lsns.push(lsn);
302 }
303 }
304
305 let deleted_ids = store
306 .delete_batch(collection, &physical_delete_ids)
307 .map_err(|err| RedDBError::Internal(err.to_string()))?;
308 affected += deleted_ids.len() as u64;
309 for id in &deleted_ids {
310 store.context_index().remove_entity(*id);
311 let lsn = self.cdc_emit(
312 crate::replication::cdc::ChangeOperation::Delete,
313 collection,
314 id.raw(),
315 "entity",
316 );
317 lsns.push(lsn);
318 }
319
320 Ok((affected, lsns))
321 }
322
323 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
326 if applied.is_empty() {
327 return Ok(Vec::new());
328 }
329
330 let store = self.db().store();
331 if applied.iter().any(|item| item.context_index_dirty) {
332 store.context_index().index_entities(
333 &applied[0].collection,
334 applied
335 .iter()
336 .filter(|item| item.context_index_dirty)
337 .map(|item| &item.entity),
338 );
339 }
340
341 for item in applied {
342 self.refresh_update_secondary_indexes(item)?;
343 }
344
345 let mut lsns = Vec::with_capacity(applied.len());
346 for item in applied {
347 let lsn = self.cdc_emit_prebuilt(
348 crate::replication::cdc::ChangeOperation::Update,
349 &item.collection,
350 &item.entity,
351 update_cdc_item_kind(self, &item.collection, &item.entity),
352 item.metadata.as_ref(),
353 false,
354 );
355 lsns.push(lsn);
356 }
357 Ok(lsns)
358 }
359
360 fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
361 self.persist_applied_entity_mutations(applied)
362 }
363
364 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
365 if applied.pre_mutation_fields.is_empty() {
366 return Ok(());
367 }
368 let post = entity_row_fields_snapshot(&applied.entity);
369 if post.is_empty() {
370 return Ok(());
371 }
372
373 let indexed_cols = self
374 .index_store_ref()
375 .indexed_columns_set(&applied.collection);
376 if indexed_cols.is_empty() {
377 return Ok(());
378 }
379
380 if let Some(old_version) = applied.replaced_entity.as_ref() {
381 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
382 .pre_mutation_fields
383 .iter()
384 .filter(|(col, _)| indexed_cols.contains(col))
385 .cloned()
386 .collect();
387 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
388 .iter()
389 .filter(|(col, _)| indexed_cols.contains(col))
390 .cloned()
391 .collect();
392 if !old_index_fields.is_empty() {
393 self.index_store_ref()
394 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
395 .map_err(crate::RedDBError::Internal)?;
396 }
397 if !new_index_fields.is_empty() {
398 self.index_store_ref()
399 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
400 .map_err(crate::RedDBError::Internal)?;
401 }
402 return Ok(());
403 }
404
405 let damage =
406 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
407 if damage
408 .touched_columns()
409 .into_iter()
410 .any(|col| indexed_cols.contains(col))
411 {
412 self.index_store_ref()
413 .index_entity_update(
414 &applied.collection,
415 applied.id,
416 &applied.pre_mutation_fields,
417 &post,
418 )
419 .map_err(crate::RedDBError::Internal)?;
420 }
421 Ok(())
422 }
423
424 pub fn execute_insert(
429 &self,
430 raw_query: &str,
431 query: &InsertQuery,
432 ) -> RedDBResult<RuntimeQueryResult> {
433 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
434 crate::runtime::collection_contract::CollectionContractGate::check(
440 self,
441 &query.table,
442 crate::runtime::collection_contract::MutationKind::Insert,
443 )?;
444 let augmented_owned;
453 let query = match self.maybe_inject_tenant_column(query)? {
454 Some(new_q) => {
455 augmented_owned = new_q;
456 &augmented_owned
457 }
458 None => query,
459 };
460 self.check_insert_column_policy(query)?;
461
462 let mut inserted_count: u64 = 0;
463 let effective_rows =
464 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
465
466 let store = self.inner.db.store();
468 let _ = store.get_or_create_collection(&query.table);
469 let declared_model = self
470 .db()
471 .collection_contract_arc(&query.table)
472 .map(|contract| contract.declared_model);
473
474 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
475 if query.returning.is_some() {
476 Some(Vec::with_capacity(effective_rows.len()))
477 } else {
478 None
479 };
480 let mut returning_result: Option<UnifiedResult> = None;
481
482 if matches!(query.entity_type, InsertEntityType::Row)
483 && !matches!(
484 declared_model,
485 Some(crate::catalog::CollectionModel::TimeSeries)
486 )
487 {
488 let mut rows = Vec::with_capacity(effective_rows.len());
489 for row_values in &effective_rows {
490 if row_values.len() != query.columns.len() {
491 return Err(RedDBError::Query(format!(
492 "INSERT column count ({}) does not match value count ({})",
493 query.columns.len(),
494 row_values.len()
495 )));
496 }
497 let (fields, mut metadata) =
498 split_insert_metadata(self, &query.columns, row_values)?;
499 merge_with_clauses(
500 &mut metadata,
501 query.ttl_ms,
502 query.expires_at_ms,
503 &query.with_metadata,
504 );
505 if let Some(snaps) = returning_snapshots.as_mut() {
506 snaps.push(fields.clone());
507 }
508 rows.push(CreateRowInput {
509 collection: query.table.clone(),
510 fields,
511 metadata,
512 node_links: Vec::new(),
513 vector_links: Vec::new(),
514 });
515 }
516 let outputs = self.create_rows_batch(CreateRowsBatchInput {
517 collection: query.table.clone(),
518 rows,
519 suppress_events: query.suppress_events,
520 })?;
521 inserted_count = outputs.len() as u64;
522
523 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
531 let time_col = &spec.time_column;
532 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
534 for row in &effective_rows {
535 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
536 if *n >= 0 {
537 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
538 }
539 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
540 let _ = self.inner.db.hypertables().route(&query.table, *n);
541 }
542 }
543 }
544 }
545
546 if let (Some(items), Some(snaps)) =
547 (query.returning.as_ref(), returning_snapshots.take())
548 {
549 let snaps = row_insert_returning_snapshots(&outputs, snaps);
550 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
551 }
552 } else {
553 let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
560 Vec::with_capacity(effective_rows.len());
561 let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
562 {
563 Vec::with_capacity(effective_rows.len())
564 } else {
565 Vec::new()
566 };
567 if matches!(
568 query.entity_type,
569 InsertEntityType::Node | InsertEntityType::Edge
570 ) {
571 enum PreparedGraphInsert {
572 Node {
573 fields: Vec<(String, Value)>,
574 input: CreateNodeInput,
575 },
576 Edge {
577 fields: Vec<(String, Value)>,
578 input: CreateEdgeInput,
579 },
580 }
581
582 let mut prepared = Vec::with_capacity(effective_rows.len());
583 for row_values in &effective_rows {
584 if row_values.len() != query.columns.len() {
585 return Err(RedDBError::Query(format!(
586 "INSERT column count ({}) does not match value count ({})",
587 query.columns.len(),
588 row_values.len()
589 )));
590 }
591
592 match query.entity_type {
593 InsertEntityType::Node => {
594 let (node_values, mut metadata) =
595 split_insert_metadata(self, &query.columns, row_values)?;
596 merge_with_clauses(
597 &mut metadata,
598 query.ttl_ms,
599 query.expires_at_ms,
600 &query.with_metadata,
601 );
602 ensure_non_tree_reserved_metadata_entries(&metadata)?;
603 apply_collection_default_ttl_metadata(
604 self,
605 &query.table,
606 &mut metadata,
607 );
608 let (columns, values) = pairwise_columns_values(&node_values);
609 let label = find_column_value_string(&columns, &values, "label")?;
610 let node_type =
611 find_column_value_opt_string(&columns, &values, "node_type");
612 let properties = extract_remaining_properties(
613 &columns,
614 &values,
615 &["label", "node_type"],
616 );
617 crate::reserved_fields::ensure_no_reserved_public_item_fields(
618 properties.iter().map(|(key, _)| key.as_str()),
619 &format!("node '{}'", query.table),
620 )?;
621 prepared.push(PreparedGraphInsert::Node {
622 fields: node_values,
623 input: CreateNodeInput {
624 collection: query.table.clone(),
625 label,
626 node_type,
627 properties,
628 metadata,
629 embeddings: Vec::new(),
630 table_links: Vec::new(),
631 node_links: Vec::new(),
632 },
633 });
634 }
635 InsertEntityType::Edge => {
636 let (edge_values, mut metadata) =
637 split_insert_metadata(self, &query.columns, row_values)?;
638 merge_with_clauses(
639 &mut metadata,
640 query.ttl_ms,
641 query.expires_at_ms,
642 &query.with_metadata,
643 );
644 ensure_non_tree_reserved_metadata_entries(&metadata)?;
645 apply_collection_default_ttl_metadata(
646 self,
647 &query.table,
648 &mut metadata,
649 );
650 let (columns, values) = pairwise_columns_values(&edge_values);
651 let label = find_column_value_string(&columns, &values, "label")?;
652 ensure_non_tree_structural_edge_label(&label)?;
653 let from_id = resolve_edge_endpoint_any(
654 self.inner.db.store().as_ref(),
655 &query.table,
656 &columns,
657 &values,
658 &["from_rid", "from"],
659 )?;
660 let to_id = resolve_edge_endpoint_any(
661 self.inner.db.store().as_ref(),
662 &query.table,
663 &columns,
664 &values,
665 &["to_rid", "to"],
666 )?;
667 let weight = find_column_value_f32_opt(&columns, &values, "weight");
668 let properties = extract_remaining_properties(
669 &columns,
670 &values,
671 &["label", "from_rid", "to_rid", "from", "to", "weight"],
672 );
673 crate::reserved_fields::ensure_no_reserved_public_item_fields(
674 properties.iter().map(|(key, _)| key.as_str()),
675 &format!("edge '{}'", query.table),
676 )?;
677 prepared.push(PreparedGraphInsert::Edge {
678 fields: edge_values,
679 input: CreateEdgeInput {
680 collection: query.table.clone(),
681 label,
682 from: EntityId::new(from_id),
683 to: EntityId::new(to_id),
684 weight,
685 properties,
686 metadata,
687 },
688 });
689 }
690 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
691 }
692 }
693
694 ensure_graph_insert_contract(self, &query.table)?;
695 let mut batch = self.inner.db.batch();
696 for item in prepared {
697 match item {
698 PreparedGraphInsert::Node { fields, input } => {
699 if query.returning.is_some() {
700 returning_field_snaps.push(fields);
701 }
702 let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
703 batch = batch.add_node_with_type(
704 input.collection,
705 input.label,
706 node_type,
707 input.properties.into_iter().collect(),
708 input.metadata.into_iter().collect(),
709 );
710 }
711 PreparedGraphInsert::Edge { fields, input } => {
712 if query.returning.is_some() {
713 returning_field_snaps.push(fields);
714 }
715 batch = batch.add_edge(
716 input.collection,
717 input.label,
718 input.from,
719 input.to,
720 input.weight.unwrap_or(1.0),
721 input.properties.into_iter().collect(),
722 input.metadata.into_iter().collect(),
723 );
724 }
725 }
726 }
727 let batch_result = batch
728 .execute()
729 .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
730 let (ids, entity_kind) = match query.entity_type {
731 InsertEntityType::Node => (batch_result.nodes, "graph_node"),
732 InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
733 _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
734 };
735 for id in &ids {
736 self.stamp_xmin_if_in_txn(&query.table, *id);
737 }
738 if query.returning.is_some() {
739 returning_field_snaps = graph_insert_returning_snapshots(
740 self.inner.db.store().as_ref(),
741 &query.table,
742 &ids,
743 );
744 }
745 self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
746 let store = self.inner.db.store();
747 entity_outputs.extend(ids.iter().map(|id| {
748 crate::application::entity::CreateEntityOutput {
749 id: *id,
750 entity: store.get(&query.table, *id),
751 }
752 }));
753 inserted_count = ids.len() as u64;
754 } else {
755 for row_values in &effective_rows {
756 if row_values.len() != query.columns.len() {
757 return Err(RedDBError::Query(format!(
758 "INSERT column count ({}) does not match value count ({})",
759 query.columns.len(),
760 row_values.len()
761 )));
762 }
763
764 match query.entity_type {
765 InsertEntityType::Row => {
766 if query.returning.is_some() {
767 return Err(RedDBError::Query(
768 "RETURNING is not yet supported for this INSERT path (TimeSeries)"
769 .to_string(),
770 ));
771 }
772 let (fields, mut metadata) =
773 split_insert_metadata(self, &query.columns, row_values)?;
774 merge_with_clauses(
775 &mut metadata,
776 query.ttl_ms,
777 query.expires_at_ms,
778 &query.with_metadata,
779 );
780 self.insert_timeseries_point(&query.table, fields, metadata)?;
781 }
782 InsertEntityType::Node | InsertEntityType::Edge => {
783 unreachable!("NODE and EDGE are handled by the prepared graph path")
784 }
785 InsertEntityType::Vector => {
786 let (vector_values, mut metadata) =
787 split_insert_metadata(self, &query.columns, row_values)?;
788 merge_with_clauses(
789 &mut metadata,
790 query.ttl_ms,
791 query.expires_at_ms,
792 &query.with_metadata,
793 );
794 let (columns, values) = pairwise_columns_values(&vector_values);
795 let dense = find_column_value_vec_f32_any(
796 &columns,
797 &values,
798 &["dense", "embedding"],
799 )?;
800 merge_vector_metadata_column(&mut metadata, &columns, &values)?;
801 let content =
802 find_column_value_opt_string(&columns, &values, "content");
803 if query.returning.is_some() {
804 returning_field_snaps.push(vector_values.clone());
805 }
806 let input = CreateVectorInput {
807 collection: query.table.clone(),
808 dense,
809 content,
810 metadata,
811 link_row: None,
812 link_node: None,
813 };
814 entity_outputs.push(self.create_vector(input)?);
815 }
816 InsertEntityType::Document => {
817 let (document_values, mut metadata) =
818 split_insert_metadata(self, &query.columns, row_values)?;
819 merge_with_clauses(
820 &mut metadata,
821 query.ttl_ms,
822 query.expires_at_ms,
823 &query.with_metadata,
824 );
825 let (columns, values) = pairwise_columns_values(&document_values);
826 let body_str = find_column_value_string(&columns, &values, "body")?;
827 let body: crate::json::Value = crate::json::from_str(&body_str)
828 .map_err(|e| {
829 RedDBError::Query(format!("invalid JSON body: {e}"))
830 })?;
831 let input = CreateDocumentInput {
832 collection: query.table.clone(),
833 body,
834 metadata,
835 node_links: Vec::new(),
836 vector_links: Vec::new(),
837 };
838 let output = self.create_document(input)?;
839 if query.returning.is_some() {
840 let fields = output
841 .entity
842 .as_ref()
843 .map(entity_row_fields_snapshot)
844 .filter(|fields| !fields.is_empty())
845 .unwrap_or(document_values);
846 returning_field_snaps.push(fields);
847 }
848 entity_outputs.push(output);
849 }
850 InsertEntityType::Kv => {
851 let (kv_values, mut metadata) =
852 split_insert_metadata(self, &query.columns, row_values)?;
853 merge_with_clauses(
854 &mut metadata,
855 query.ttl_ms,
856 query.expires_at_ms,
857 &query.with_metadata,
858 );
859 let (columns, values) = pairwise_columns_values(&kv_values);
860 let key = find_column_value_string(&columns, &values, "key")?;
861 let value = find_column_value(&columns, &values, "value")?;
862 if query.returning.is_some() {
863 returning_field_snaps.push(kv_values.clone());
864 }
865 let input = CreateKvInput {
866 collection: query.table.clone(),
867 key,
868 value,
869 metadata,
870 };
871 entity_outputs.push(self.create_kv(input)?);
872 }
873 }
874
875 inserted_count += 1;
876 }
877 }
878
879 if let Some(items) = query.returning.as_ref() {
880 if !entity_outputs.is_empty() {
881 returning_result = Some(build_returning_result(
882 items,
883 &returning_field_snaps,
884 Some(&entity_outputs),
885 ));
886 }
887 }
888 }
889
890 if let Some(ref embed_config) = query.auto_embed {
892 let store = self.inner.db.store();
893 let provider = crate::ai::parse_provider(&embed_config.provider)?;
894 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
895 let model = embed_config.model.clone().unwrap_or_else(|| {
896 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
897 .ok()
898 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
899 });
900
901 let manager = store
903 .get_collection(&query.table)
904 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
905 let entities = manager.query_all(|_| true);
906 let recent: Vec<_> = entities
907 .into_iter()
908 .rev()
909 .take(effective_rows.len())
910 .collect();
911
912 let entity_combos: Vec<(usize, String)> = recent
914 .iter()
915 .enumerate()
916 .filter_map(|(i, entity)| {
917 if let EntityData::Row(ref row) = entity.data {
918 if let Some(ref named) = row.named {
919 let texts: Vec<String> = embed_config
920 .fields
921 .iter()
922 .filter_map(|field| match named.get(field) {
923 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
924 _ => None,
925 })
926 .collect();
927 if !texts.is_empty() {
928 return Some((i, texts.join(" ")));
929 }
930 }
931 }
932 None
933 })
934 .collect();
935
936 if !entity_combos.is_empty() {
937 let batch_texts: Vec<String> =
939 entity_combos.iter().map(|(_, t)| t.clone()).collect();
940
941 let batch_client =
942 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
943
944 let embeddings = match tokio::runtime::Handle::try_current() {
945 Ok(handle) => tokio::task::block_in_place(|| {
946 handle.block_on(batch_client.embed_batch(
947 &provider,
948 &model,
949 &api_key,
950 batch_texts,
951 ))
952 }),
953 Err(_) => {
954 return Err(RedDBError::Query(
955 "AUTO EMBED requires a Tokio runtime context".to_string(),
956 ));
957 }
958 }
959 .map_err(|e| RedDBError::Query(e.to_string()))?;
960
961 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
963 if dense.is_empty() {
964 continue;
965 }
966 self.create_vector(CreateVectorInput {
967 collection: query.table.clone(),
968 dense,
969 content: Some(combined.clone()),
970 metadata: Vec::new(),
971 link_row: None,
972 link_node: None,
973 })?;
974 }
975 }
976 }
977
978 if inserted_count > 0 {
979 self.note_table_write(&query.table);
980 }
981
982 let mut result = RuntimeQueryResult::dml_result(
983 raw_query.to_string(),
984 inserted_count,
985 "insert",
986 "runtime-dml",
987 );
988 if let Some(returning) = returning_result {
989 result.result = returning;
990 }
991 Ok(result)
992 }
993
994 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
995 let Some(auth_store) = self.inner.auth_store.read().clone() else {
996 return Ok(());
997 };
998 if !auth_store.iam_authorization_enabled() {
999 return Ok(());
1000 }
1001 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1002 return Ok(());
1003 };
1004
1005 let tenant = crate::runtime::impl_core::current_tenant();
1006 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1007 let request = crate::auth::ColumnAccessRequest {
1008 action: "insert".to_string(),
1009 schema: None,
1010 table: query.table.clone(),
1011 columns: query.columns.clone(),
1012 };
1013 let ctx = crate::auth::policies::EvalContext {
1014 principal_tenant: tenant.clone(),
1015 current_tenant: tenant,
1016 peer_ip: None,
1017 mfa_present: false,
1018 now_ms: crate::auth::now_ms(),
1019 principal_is_admin_role: role == crate::auth::Role::Admin,
1020 };
1021
1022 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1023 let table_allowed = matches!(
1024 outcome.table_decision,
1025 crate::auth::policies::Decision::Allow { .. }
1026 | crate::auth::policies::Decision::AdminBypass
1027 );
1028 if !table_allowed {
1029 return Err(RedDBError::Query(format!(
1030 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1031 outcome.table_resource.kind, outcome.table_resource.name
1032 )));
1033 }
1034 if let Some(denied) = outcome.first_denied_column() {
1035 return Err(RedDBError::Query(format!(
1036 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1037 denied.resource.kind, denied.resource.name
1038 )));
1039 }
1040
1041 Ok(())
1042 }
1043
1044 pub(crate) fn insert_timeseries_point(
1045 &self,
1046 collection: &str,
1047 fields: Vec<(String, Value)>,
1048 mut metadata: Vec<(String, MetadataValue)>,
1049 ) -> RedDBResult<EntityId> {
1050 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1051
1052 let (columns, values) = pairwise_columns_values(&fields);
1053 validate_timeseries_insert_columns(&columns)?;
1054
1055 let metric = find_column_value_string(&columns, &values, "metric")?;
1056 let value = find_column_value_f64(&columns, &values, "value")?;
1057 let timestamp_ns =
1058 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1059 let tags = find_timeseries_tags(&columns, &values)?;
1060
1061 let mut entity = UnifiedEntity::new(
1062 EntityId::new(0),
1063 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1064 series: collection.to_string(),
1065 metric: metric.clone(),
1066 })),
1067 EntityData::TimeSeries(crate::storage::TimeSeriesData {
1068 metric,
1069 timestamp_ns,
1070 value,
1071 tags,
1072 }),
1073 );
1074 let writer_xid = match self.current_xid() {
1078 Some(xid) => xid,
1079 None => {
1080 let mgr = self.snapshot_manager();
1081 let xid = mgr.begin();
1082 mgr.commit(xid);
1083 xid
1084 }
1085 };
1086 entity.set_xmin(writer_xid);
1087
1088 let store = self.inner.db.store();
1089 let id = store
1090 .insert_auto(collection, entity)
1091 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1092
1093 if !metadata.is_empty() {
1094 let _ = store.set_metadata(
1095 collection,
1096 id,
1097 Metadata::with_fields(metadata.into_iter().collect()),
1098 );
1099 }
1100
1101 self.cdc_emit(
1102 crate::replication::cdc::ChangeOperation::Insert,
1103 collection,
1104 id.raw(),
1105 "timeseries",
1106 );
1107
1108 Ok(id)
1109 }
1110
1111 pub fn execute_update(
1116 &self,
1117 raw_query: &str,
1118 query: &UpdateQuery,
1119 ) -> RedDBResult<RuntimeQueryResult> {
1120 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1121 crate::runtime::collection_contract::CollectionContractGate::check(
1127 self,
1128 &query.table,
1129 crate::runtime::collection_contract::MutationKind::Update,
1130 )?;
1131 ensure_update_target_contract(self, &query.table, query.target)?;
1132
1133 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1139 let augmented_query: UpdateQuery;
1140 let effective_query: &UpdateQuery = if rls_gated {
1141 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1142 self,
1143 &query.table,
1144 crate::storage::query::ast::PolicyAction::Update,
1145 );
1146 let Some(policy) = rls_filter else {
1147 let mut response = RuntimeQueryResult::dml_result(
1150 raw_query.to_string(),
1151 0,
1152 "update",
1153 "runtime-dml-rls",
1154 );
1155 if let Some(items) = query.returning.clone() {
1156 response.result = build_returning_result(&items, &[], None);
1157 }
1158 return Ok(response);
1159 };
1160 let mut augmented = query.clone();
1161 augmented.filter = Some(match augmented.filter.take() {
1162 Some(existing) => {
1163 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1164 }
1165 None => policy,
1166 });
1167 augmented_query = augmented;
1168 &augmented_query
1169 } else {
1170 query
1171 };
1172
1173 if let Some(items) = effective_query.returning.clone() {
1178 let mut inner_query = effective_query.clone();
1179 inner_query.returning = None;
1180 let (mut response, touched_ids) =
1181 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1182
1183 let snapshots = if matches!(
1184 effective_query.target,
1185 UpdateTarget::Nodes | UpdateTarget::Edges
1186 ) {
1187 graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1188 } else {
1189 super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1190 .row_snapshots(&touched_ids)
1191 };
1192
1193 response.result = build_returning_result(&items, &snapshots, None);
1194 response.engine = "runtime-dml-returning";
1195 return Ok(response);
1196 }
1197
1198 self.execute_update_inner(raw_query, effective_query)
1199 }
1200
1201 fn execute_update_inner(
1203 &self,
1204 raw_query: &str,
1205 query: &UpdateQuery,
1206 ) -> RedDBResult<RuntimeQueryResult> {
1207 self.execute_update_inner_tracked(raw_query, query)
1208 .map(|(res, _)| res)
1209 }
1210
1211 fn execute_update_inner_tracked(
1212 &self,
1213 raw_query: &str,
1214 query: &UpdateQuery,
1215 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1216 let store = self.inner.db.store();
1217 let effective_filter = effective_update_filter(query);
1218 let compiled_plan = self.compile_update_plan(query)?;
1219 let mut touched_ids: Vec<EntityId> = Vec::new();
1220 let limit_cap = query.limit.map(|l| l as usize);
1221 let manager = store
1222 .get_collection(&query.table)
1223 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1224 let scan_limit = if query.order_by.is_empty() {
1225 limit_cap
1226 } else {
1227 None
1228 };
1229 let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1230 self,
1231 &query.table,
1232 effective_filter.as_ref(),
1233 scan_limit,
1234 query.target,
1235 )
1236 .find_target_ids()?;
1237 let ids_to_update = if query.order_by.is_empty() {
1238 ids_to_update
1239 } else {
1240 ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1241 };
1242
1243 if update_needs_rmw_lock(query) {
1244 return self.execute_update_inner_tracked_locked(
1245 raw_query,
1246 query,
1247 &compiled_plan,
1248 &ids_to_update,
1249 effective_filter.as_ref(),
1250 );
1251 }
1252
1253 let mut affected: u64 = 0;
1254 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1255 let mut applied_chunk = Vec::with_capacity(chunk.len());
1256 for entity in manager.get_many(chunk).into_iter().flatten() {
1257 let assignments =
1258 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1259 let applied = self.apply_materialized_update_for_entity(
1260 query.table.clone(),
1261 entity,
1262 &compiled_plan,
1263 assignments,
1264 )?;
1265 touched_ids.push(applied.id);
1266 applied_chunk.push(applied);
1267 }
1268 self.persist_update_chunk(&applied_chunk)?;
1269 affected += applied_chunk.len() as u64;
1270 let lsns = self.flush_update_chunk(&applied_chunk)?;
1271 if !query.suppress_events {
1272 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1273 }
1274 }
1275
1276 if affected > 0 {
1277 self.note_table_write(&query.table);
1278 }
1279
1280 Ok((
1281 RuntimeQueryResult::dml_result(
1282 raw_query.to_string(),
1283 affected,
1284 "update",
1285 "runtime-dml",
1286 ),
1287 touched_ids,
1288 ))
1289 }
1290
1291 fn execute_update_inner_tracked_locked(
1292 &self,
1293 raw_query: &str,
1294 query: &UpdateQuery,
1295 compiled_plan: &CompiledUpdatePlan,
1296 ids_to_update: &[EntityId],
1297 effective_filter: Option<&Filter>,
1298 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1299 let store = self.inner.db.store();
1300 let mut touched_ids = Vec::new();
1301 let mut lock_entries = Vec::new();
1302
1303 for id in ids_to_update {
1304 let Some(candidate) = store.get(&query.table, *id) else {
1305 continue;
1306 };
1307 let logical_id = candidate.logical_id();
1308 let lock_key = format!("row:{}", logical_id.raw());
1309 let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1310 lock_entries.push((lock_key, logical_id, rmw_lock));
1311 }
1312
1313 lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1314 lock_entries.dedup_by(|left, right| left.0 == right.0);
1315 let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1316
1317 let mut applied_chunk = Vec::new();
1318 for (_, logical_id, _) in &lock_entries {
1319 let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1320 else {
1321 continue;
1322 };
1323 if let Some(filter) = effective_filter {
1324 if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1325 Some(self.inner.db.as_ref()),
1326 &entity,
1327 filter,
1328 &query.table,
1329 &query.table,
1330 ) {
1331 continue;
1332 }
1333 }
1334
1335 let assignments =
1336 self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1337 let applied = self.apply_materialized_update_for_entity(
1338 query.table.clone(),
1339 entity,
1340 compiled_plan,
1341 assignments,
1342 )?;
1343 touched_ids.push(applied.id);
1344 applied_chunk.push(applied);
1345 }
1346
1347 let affected = applied_chunk.len() as u64;
1348 if !applied_chunk.is_empty() {
1349 self.persist_update_chunk(&applied_chunk)?;
1350 let lsns = self.flush_update_chunk(&applied_chunk)?;
1351 if !query.suppress_events {
1352 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1353 }
1354 }
1355
1356 if affected > 0 {
1357 self.note_table_write(&query.table);
1358 }
1359
1360 Ok((
1361 RuntimeQueryResult::dml_result(
1362 raw_query.to_string(),
1363 affected,
1364 "update",
1365 "runtime-dml",
1366 ),
1367 touched_ids,
1368 ))
1369 }
1370
1371 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1372 let mut static_field_assignments = Vec::new();
1373 let mut static_metadata_assignments = Vec::new();
1374 let mut dynamic_assignments = Vec::new();
1375 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1376 let mut row_modified_columns = Vec::new();
1377
1378 for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1379 let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1380 let metadata_key = resolve_sql_ttl_metadata_key(column);
1381 if compound_op.is_some() && metadata_key.is_some() {
1382 return Err(RedDBError::Query(format!(
1383 "compound assignment is only supported for row fields: {column}"
1384 )));
1385 }
1386 if compound_op.is_none() {
1387 if let Ok(value) = fold_expr_to_value(expr.clone()) {
1388 if let Some(metadata_key) = metadata_key {
1389 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1390 let (canonical_key, canonical_value) =
1391 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1392 static_metadata_assignments
1393 .push((canonical_key.to_string(), canonical_value));
1394 } else {
1395 let value = self.resolve_crypto_sentinel(value)?;
1396 static_field_assignments.push((
1397 column.clone(),
1398 normalize_row_update_assignment_with_plan(
1399 &query.table,
1400 column,
1401 value,
1402 row_contract_plan.as_ref(),
1403 )?,
1404 ));
1405 row_modified_columns.push(column.clone());
1406 }
1407 continue;
1408 }
1409 }
1410
1411 dynamic_assignments.push(CompiledUpdateAssignment {
1412 column: column.clone(),
1413 expr: expr.clone(),
1414 compound_op,
1415 metadata_key,
1416 row_rule: if metadata_key.is_none() {
1417 if let Some(plan) = row_contract_plan.as_ref() {
1418 if plan.timestamps_enabled
1419 && (column == "created_at" || column == "updated_at")
1420 {
1421 return Err(RedDBError::Query(format!(
1422 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1423 query.table, column
1424 )));
1425 }
1426 if let Some(rule) = plan.declared_rules.get(column) {
1427 Some(rule.clone())
1428 } else if plan.strict_schema {
1429 return Err(RedDBError::Query(format!(
1430 "collection '{}' is strict and does not allow undeclared fields: {}",
1431 query.table, column
1432 )));
1433 } else {
1434 None
1435 }
1436 } else {
1437 None
1438 }
1439 } else {
1440 None
1441 },
1442 });
1443 if metadata_key.is_none() {
1444 row_modified_columns.push(column.clone());
1445 }
1446 }
1447
1448 let row_modified_columns = dedupe_update_columns(row_modified_columns);
1449 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1450 row_modified_columns.iter().any(|column| {
1451 plan.unique_columns
1452 .keys()
1453 .any(|unique| unique.eq_ignore_ascii_case(column))
1454 })
1455 });
1456
1457 if let Some(ttl_ms) = query.ttl_ms {
1458 static_metadata_assignments
1459 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1460 }
1461 if let Some(expires_at_ms) = query.expires_at_ms {
1462 static_metadata_assignments.push((
1463 "_expires_at".to_string(),
1464 metadata_u64_to_value(expires_at_ms),
1465 ));
1466 }
1467 for (key, val) in &query.with_metadata {
1468 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1469 }
1470
1471 Ok(CompiledUpdatePlan {
1472 static_field_assignments,
1473 static_metadata_assignments,
1474 dynamic_assignments,
1475 row_contract_plan,
1476 row_modified_columns,
1477 row_touches_unique_columns,
1478 })
1479 }
1480
1481 fn materialize_update_assignments_for_entity(
1482 &self,
1483 query: &UpdateQuery,
1484 entity: &UnifiedEntity,
1485 compiled_plan: &CompiledUpdatePlan,
1486 ) -> RedDBResult<MaterializedUpdateAssignments> {
1487 let mut assignments = MaterializedUpdateAssignments::default();
1488 let mut record: Option<UnifiedRecord> = None;
1489
1490 for assignment in &compiled_plan.dynamic_assignments {
1491 if assignment.compound_op.is_some()
1492 && !matches!(
1493 entity.data,
1494 EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1495 )
1496 {
1497 return Err(RedDBError::Query(format!(
1498 "compound assignment is only supported for row or graph UPDATE column '{}'",
1499 assignment.column
1500 )));
1501 }
1502 if record.is_none() {
1503 record = runtime_any_record_from_entity_ref(entity);
1504 }
1505 let Some(record) = record.as_ref() else {
1506 return Err(RedDBError::Query(format!(
1507 "UPDATE could not materialize runtime record for entity {} in '{}'",
1508 entity.id.raw(),
1509 query.table
1510 )));
1511 };
1512 let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1513 Some(self.inner.db.as_ref()),
1514 &assignment.expr,
1515 record,
1516 Some(query.table.as_str()),
1517 Some(query.table.as_str()),
1518 )
1519 .ok_or_else(|| {
1520 RedDBError::Query(format!(
1521 "failed to evaluate UPDATE expression for column '{}'",
1522 assignment.column
1523 ))
1524 })?;
1525 let value = if let Some(op) = assignment.compound_op {
1526 evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1527 } else {
1528 rhs
1529 };
1530
1531 if let Some(metadata_key) = assignment.metadata_key {
1532 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1533 let (canonical_key, canonical_value) =
1534 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1535 assignments
1536 .dynamic_metadata_assignments
1537 .push((canonical_key.to_string(), canonical_value));
1538 } else {
1539 assignments.dynamic_field_assignments.push((
1540 assignment.column.clone(),
1541 normalize_row_update_value_for_rule(
1542 &query.table,
1543 self.resolve_crypto_sentinel(value)?,
1544 assignment.row_rule.as_ref(),
1545 )?,
1546 ));
1547 }
1548 }
1549
1550 Ok(assignments)
1551 }
1552
1553 fn apply_materialized_update_for_entity(
1554 &self,
1555 collection: String,
1556 entity: UnifiedEntity,
1557 compiled_plan: &CompiledUpdatePlan,
1558 assignments: MaterializedUpdateAssignments,
1559 ) -> RedDBResult<AppliedEntityMutation> {
1560 if matches!(entity.data, EntityData::Row(_)) {
1561 return self.apply_loaded_sql_update_row_core(
1562 collection,
1563 entity,
1564 &compiled_plan.static_field_assignments,
1565 assignments.dynamic_field_assignments,
1566 &compiled_plan.static_metadata_assignments,
1567 assignments.dynamic_metadata_assignments,
1568 compiled_plan.row_contract_plan.as_ref(),
1569 &compiled_plan.row_modified_columns,
1570 compiled_plan.row_touches_unique_columns,
1571 );
1572 }
1573
1574 ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
1575
1576 let operations = build_patch_operations_from_materialized_assignments(
1577 &entity,
1578 compiled_plan,
1579 assignments,
1580 );
1581 self.apply_loaded_patch_entity_core(
1582 collection,
1583 entity,
1584 crate::json::Value::Null,
1585 operations,
1586 )
1587 }
1588
1589 pub fn execute_delete(
1591 &self,
1592 raw_query: &str,
1593 query: &DeleteQuery,
1594 ) -> RedDBResult<RuntimeQueryResult> {
1595 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1596 crate::runtime::collection_contract::CollectionContractGate::check(
1600 self,
1601 &query.table,
1602 crate::runtime::collection_contract::MutationKind::Delete,
1603 )?;
1604
1605 if let Some(items) = query.returning.clone() {
1612 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
1613 RedDBError::Query(
1614 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
1615 )
1616 })?;
1617 let captured = self.execute_query(&select_sql)?;
1618
1619 let mut inner_query = query.clone();
1620 inner_query.returning = None;
1621 let _ = self.execute_delete(raw_query, &inner_query)?;
1622
1623 let snapshots: Vec<Vec<(String, Value)>> = captured
1624 .result
1625 .records
1626 .iter()
1627 .map(|rec| {
1628 rec.iter_fields()
1629 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
1630 .collect()
1631 })
1632 .collect();
1633 let affected = snapshots.len() as u64;
1634 let result = build_returning_result(&items, &snapshots, None);
1635
1636 let mut response = RuntimeQueryResult::dml_result(
1637 raw_query.to_string(),
1638 affected,
1639 "delete",
1640 "runtime-dml-returning",
1641 );
1642 response.result = result;
1643 return Ok(response);
1644 }
1645 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
1652 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1653 self,
1654 &query.table,
1655 crate::storage::query::ast::PolicyAction::Delete,
1656 );
1657 let Some(policy) = rls_filter else {
1658 return Ok(RuntimeQueryResult::dml_result(
1659 raw_query.to_string(),
1660 0,
1661 "delete",
1662 "runtime-dml-rls",
1663 ));
1664 };
1665 let mut augmented = query.clone();
1670 augmented.filter = Some(match augmented.filter.take() {
1671 Some(existing) => {
1672 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1673 }
1674 None => policy,
1675 });
1676 return self.execute_delete_inner(raw_query, &augmented);
1677 }
1678 self.execute_delete_inner(raw_query, query)
1679 }
1680
1681 fn execute_delete_inner(
1682 &self,
1683 raw_query: &str,
1684 query: &DeleteQuery,
1685 ) -> RedDBResult<RuntimeQueryResult> {
1686 let effective_filter = effective_delete_filter(query);
1687
1688 let scan = super::dml_target_scan::DmlTargetScan::new(
1692 self,
1693 &query.table,
1694 effective_filter.as_ref(),
1695 None,
1696 );
1697 let ids_to_delete = scan.find_target_ids()?;
1698
1699 let needs_delete_events =
1702 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
1703 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
1704 scan.row_json_pre_images(&ids_to_delete)
1705 } else {
1706 HashMap::new()
1707 };
1708
1709 let mut affected: u64 = 0;
1710 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1711 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
1712 affected += count;
1713 if needs_delete_events && !lsns.is_empty() {
1714 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
1718 self.emit_delete_events_for_collection(
1719 &query.table,
1720 deleted_chunk,
1721 &lsns,
1722 &pre_images,
1723 )?;
1724 }
1725 }
1726 pre_images.clear();
1727
1728 if affected > 0 {
1729 self.note_table_write(&query.table);
1730 }
1731
1732 Ok(RuntimeQueryResult::dml_result(
1733 raw_query.to_string(),
1734 affected,
1735 "delete",
1736 "runtime-dml",
1737 ))
1738 }
1739}
1740
1741fn ensure_graph_identity_update_allowed(
1742 entity: &UnifiedEntity,
1743 compiled_plan: &CompiledUpdatePlan,
1744 assignments: &MaterializedUpdateAssignments,
1745) -> RedDBResult<()> {
1746 if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
1747 return Ok(());
1748 }
1749
1750 for (column, _) in compiled_plan
1751 .static_field_assignments
1752 .iter()
1753 .chain(assignments.dynamic_field_assignments.iter())
1754 {
1755 if is_immutable_graph_identity_field(column) {
1756 return Err(RedDBError::Query(format!(
1757 "immutable graph field '{column}' cannot be updated"
1758 )));
1759 }
1760 }
1761
1762 Ok(())
1763}
1764
1765fn is_immutable_graph_identity_field(column: &str) -> bool {
1766 ["rid", "label", "from_rid", "to_rid", "from", "to"]
1767 .iter()
1768 .any(|reserved| column.eq_ignore_ascii_case(reserved))
1769}
1770
1771fn build_patch_operations_from_materialized_assignments(
1772 entity: &UnifiedEntity,
1773 compiled_plan: &CompiledUpdatePlan,
1774 assignments: MaterializedUpdateAssignments,
1775) -> Vec<PatchEntityOperation> {
1776 let mut operations = Vec::with_capacity(
1777 compiled_plan.static_field_assignments.len()
1778 + compiled_plan.static_metadata_assignments.len()
1779 + assignments.dynamic_field_assignments.len()
1780 + assignments.dynamic_metadata_assignments.len(),
1781 );
1782
1783 for (column, value) in &compiled_plan.static_field_assignments {
1784 operations.push(PatchEntityOperation {
1785 op: PatchEntityOperationType::Set,
1786 path: update_patch_path_for_entity(entity, column),
1787 value: Some(storage_value_to_json(value)),
1788 });
1789 }
1790
1791 for (column, value) in assignments.dynamic_field_assignments {
1792 operations.push(PatchEntityOperation {
1793 op: PatchEntityOperationType::Set,
1794 path: update_patch_path_for_entity(entity, &column),
1795 value: Some(storage_value_to_json(&value)),
1796 });
1797 }
1798
1799 for (key, value) in &compiled_plan.static_metadata_assignments {
1800 operations.push(PatchEntityOperation {
1801 op: PatchEntityOperationType::Set,
1802 path: vec!["metadata".to_string(), key.clone()],
1803 value: Some(metadata_value_to_json(value)),
1804 });
1805 }
1806
1807 for (key, value) in assignments.dynamic_metadata_assignments {
1808 operations.push(PatchEntityOperation {
1809 op: PatchEntityOperationType::Set,
1810 path: vec!["metadata".to_string(), key],
1811 value: Some(metadata_value_to_json(&value)),
1812 });
1813 }
1814
1815 operations
1816}
1817
1818fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
1819 if matches!(
1820 (&entity.kind, &entity.data),
1821 (
1822 crate::storage::EntityKind::GraphNode(_),
1823 EntityData::Node(_)
1824 )
1825 ) && column.eq_ignore_ascii_case("node_type")
1826 {
1827 return vec!["node_type".to_string()];
1828 }
1829 if matches!(
1830 (&entity.kind, &entity.data),
1831 (
1832 crate::storage::EntityKind::GraphEdge(_),
1833 EntityData::Edge(_)
1834 )
1835 ) && column.eq_ignore_ascii_case("weight")
1836 {
1837 return vec!["weight".to_string()];
1838 }
1839 vec!["fields".to_string(), column.to_string()]
1840}
1841
1842fn delete_to_select_sql(sql: &str) -> Option<String> {
1852 let trimmed = sql.trim_start();
1853 let lowered = trimmed.to_ascii_lowercase();
1854 if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
1855 return None;
1856 }
1857 let from_idx = lowered.find(" from ")?;
1859 let after_from = &trimmed[from_idx + " from ".len()..];
1860 let after_from_lc = &lowered[from_idx + " from ".len()..];
1861
1862 let mut body = after_from.to_string();
1867 if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
1868 body.truncate(pos);
1869 }
1870 Some(format!("SELECT * FROM {}", body.trim_end()))
1871}
1872
1873fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
1878 let bytes = haystack.as_bytes();
1879 let nlen = needle.len();
1880 let mut i = 0usize;
1881 let mut in_string = false;
1882 while i < bytes.len() {
1883 let c = bytes[i];
1884 if c == b'\'' {
1885 in_string = !in_string;
1886 i += 1;
1887 continue;
1888 }
1889 if !in_string
1890 && i + nlen <= bytes.len()
1891 && &bytes[i..i + nlen] == needle.as_bytes()
1892 && (i == 0 || bytes[i - 1].is_ascii_whitespace())
1893 && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
1894 {
1895 return Some(i);
1896 }
1897 i += 1;
1898 }
1899 None
1900}
1901
1902fn build_returning_result(
1909 items: &[ReturningItem],
1910 snapshots: &[Vec<(String, Value)>],
1911 outputs: Option<&[CreateEntityOutput]>,
1912) -> UnifiedResult {
1913 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
1914 let public_item_outputs = outputs.is_some_and(|outs| {
1915 outs.first()
1916 .and_then(|out| out.entity.as_ref())
1917 .is_some_and(|entity| public_returning_item_kind(entity).is_some())
1918 });
1919
1920 let mut columns: Vec<String> = if project_all {
1921 let mut cols: Vec<String> = Vec::new();
1922 if public_item_outputs {
1923 cols.extend(
1924 [
1925 "rid",
1926 "collection",
1927 "kind",
1928 "tenant",
1929 "created_at",
1930 "updated_at",
1931 ]
1932 .into_iter()
1933 .map(str::to_string),
1934 );
1935 } else if outputs.is_some() {
1936 cols.push("red_entity_id".to_string());
1937 }
1938 if let Some(first) = snapshots.first() {
1939 for (name, _) in first {
1940 cols.push(name.clone());
1941 }
1942 }
1943 cols
1944 } else {
1945 items
1946 .iter()
1947 .filter_map(|it| match it {
1948 ReturningItem::Column(c) => Some(c.clone()),
1949 ReturningItem::All => None,
1950 })
1951 .collect()
1952 };
1953 {
1955 let mut seen = std::collections::HashSet::new();
1956 columns.retain(|c| seen.insert(c.clone()));
1957 }
1958
1959 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
1960 for (idx, snap) in snapshots.iter().enumerate() {
1961 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
1962 if let Some(outs) = outputs {
1963 if let Some(out) = outs.get(idx) {
1964 if let Some(entity) = out.entity.as_ref() {
1965 if let Some(kind) = public_returning_item_kind(entity) {
1966 values.insert(
1967 Arc::clone(&sys_key_rid()),
1968 Value::UnsignedInteger(out.id.raw()),
1969 );
1970 values.insert(
1971 Arc::clone(&sys_key_collection()),
1972 Value::text(entity.kind.collection().to_string()),
1973 );
1974 values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
1975 values.insert(
1976 Arc::clone(&sys_key_created_at()),
1977 Value::UnsignedInteger(entity.created_at),
1978 );
1979 values.insert(
1980 Arc::clone(&sys_key_updated_at()),
1981 Value::UnsignedInteger(entity.updated_at),
1982 );
1983 } else {
1984 values.insert(
1985 Arc::clone(&sys_key_red_entity_id()),
1986 Value::Integer(out.id.raw() as i64),
1987 );
1988 }
1989 } else {
1990 values.insert(
1991 Arc::clone(&sys_key_red_entity_id()),
1992 Value::Integer(out.id.raw() as i64),
1993 );
1994 }
1995 }
1996 }
1997 for (name, val) in snap {
1998 values.insert(Arc::from(name.as_str()), val.clone());
1999 }
2000 if !values.contains_key("tenant") {
2001 let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2002 values.insert(Arc::clone(&sys_key_tenant()), tenant);
2003 }
2004 let mut rec = UnifiedRecord::default();
2005 for col in &columns {
2007 if let Some(v) = values.get(col.as_str()) {
2008 rec.set_arc(Arc::from(col.as_str()), v.clone());
2009 }
2010 }
2011 records.push(rec);
2012 }
2013
2014 UnifiedResult {
2015 columns,
2016 records,
2017 stats: Default::default(),
2018 pre_serialized_json: None,
2019 }
2020}
2021
2022fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2023 match (&entity.kind, &entity.data) {
2024 (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2025 Some("node")
2026 }
2027 (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2028 Some("edge")
2029 }
2030 (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2031 _ => None,
2032 }
2033}
2034
2035fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2036 let Some(row) = entity.data.as_row() else {
2037 return "row";
2038 };
2039
2040 let is_kv = row.named.as_ref().is_some_and(|named| {
2041 (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2042 || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2043 });
2044 if is_kv {
2045 return "kv";
2046 }
2047
2048 let is_document = row
2049 .named
2050 .as_ref()
2051 .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2052 || row.columns.iter().any(runtime_returning_documentish_value);
2053 if is_document {
2054 "document"
2055 } else {
2056 "row"
2057 }
2058}
2059
2060fn runtime_returning_documentish_value(value: &Value) -> bool {
2061 matches!(value, Value::Json(_) | Value::Blob(_))
2062}
2063
2064fn row_insert_returning_snapshots(
2065 outputs: &[CreateEntityOutput],
2066 fallback: Vec<Vec<(String, Value)>>,
2067) -> Vec<Vec<(String, Value)>> {
2068 outputs
2069 .iter()
2070 .enumerate()
2071 .map(|(idx, out)| {
2072 out.entity
2073 .as_ref()
2074 .map(entity_row_fields_snapshot)
2075 .filter(|snap| !snap.is_empty())
2076 .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2077 })
2078 .collect()
2079}
2080
2081fn graph_insert_returning_snapshots(
2082 store: &crate::storage::unified::UnifiedStore,
2083 collection: &str,
2084 ids: &[EntityId],
2085) -> Vec<Vec<(String, Value)>> {
2086 let Some(manager) = store.get_collection(collection) else {
2087 return Vec::new();
2088 };
2089
2090 ids.iter()
2091 .filter_map(|id| manager.get(*id))
2092 .filter_map(|entity| {
2093 let mut record = runtime_any_record_from_entity_ref(&entity)?;
2094 record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2095 Some(record)
2096 })
2097 .map(|record| {
2098 record
2099 .iter_fields()
2100 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2101 .collect()
2102 })
2103 .collect()
2104}
2105
2106fn graph_update_returning_snapshots(
2107 runtime: &RedDBRuntime,
2108 collection: &str,
2109 ids: &[EntityId],
2110) -> Vec<Vec<(String, Value)>> {
2111 let store = runtime.db().store();
2112 let Some(manager) = store.get_collection(collection) else {
2113 return Vec::new();
2114 };
2115
2116 manager
2117 .get_many(ids)
2118 .into_iter()
2119 .flatten()
2120 .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2121 .map(|record| {
2122 record
2123 .iter_fields()
2124 .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2125 .collect()
2126 })
2127 .collect()
2128}
2129
2130fn ensure_update_target_contract(
2131 runtime: &RedDBRuntime,
2132 collection: &str,
2133 target: UpdateTarget,
2134) -> RedDBResult<()> {
2135 let Some(contract) = runtime.db().collection_contract(collection) else {
2136 return Ok(());
2137 };
2138 if update_target_contract_is_advisory(&contract)
2139 || update_target_allows_model(contract.declared_model, update_target_model(target))
2140 {
2141 return Ok(());
2142 }
2143 Err(RedDBError::InvalidOperation(format!(
2144 "collection '{}' is declared as '{}' and does not allow '{}' updates",
2145 collection,
2146 update_model_name(contract.declared_model),
2147 update_model_name(update_target_model(target))
2148 )))
2149}
2150
2151fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2152 matches!(
2153 (&contract.origin, &contract.schema_mode),
2154 (
2155 crate::physical::ContractOrigin::Implicit,
2156 crate::catalog::SchemaMode::Dynamic,
2157 )
2158 )
2159}
2160
2161fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2162 match target {
2163 UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2164 UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2165 UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2166 UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2167 }
2168}
2169
2170fn update_target_allows_model(
2171 declared_model: crate::catalog::CollectionModel,
2172 requested_model: crate::catalog::CollectionModel,
2173) -> bool {
2174 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2175}
2176
2177fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2178 match model {
2179 crate::catalog::CollectionModel::Table => "table",
2180 crate::catalog::CollectionModel::Document => "document",
2181 crate::catalog::CollectionModel::Graph => "graph",
2182 crate::catalog::CollectionModel::Vector => "vector",
2183 crate::catalog::CollectionModel::Hll => "hll",
2184 crate::catalog::CollectionModel::Sketch => "sketch",
2185 crate::catalog::CollectionModel::Filter => "filter",
2186 crate::catalog::CollectionModel::Kv => "kv",
2187 crate::catalog::CollectionModel::Config => "config",
2188 crate::catalog::CollectionModel::Vault => "vault",
2189 crate::catalog::CollectionModel::Mixed => "mixed",
2190 crate::catalog::CollectionModel::TimeSeries => "timeseries",
2191 crate::catalog::CollectionModel::Queue => "queue",
2192 crate::catalog::CollectionModel::Metrics => "metrics",
2193 }
2194}
2195
2196fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2197 let db = runtime.db();
2198 if let Some(contract) = db.collection_contract(collection) {
2199 let advisory_implicit_dynamic = matches!(
2200 (&contract.origin, &contract.schema_mode),
2201 (
2202 crate::physical::ContractOrigin::Implicit,
2203 crate::catalog::SchemaMode::Dynamic,
2204 )
2205 );
2206 if advisory_implicit_dynamic
2207 || matches!(
2208 contract.declared_model,
2209 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2210 )
2211 {
2212 return Ok(());
2213 }
2214 return Err(RedDBError::InvalidOperation(format!(
2215 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2216 collection, contract.declared_model
2217 )));
2218 }
2219
2220 let now = std::time::SystemTime::now()
2221 .duration_since(std::time::UNIX_EPOCH)
2222 .unwrap_or_default()
2223 .as_millis();
2224 db.save_collection_contract(crate::physical::CollectionContract {
2225 name: collection.to_string(),
2226 declared_model: crate::catalog::CollectionModel::Graph,
2227 schema_mode: crate::catalog::SchemaMode::Dynamic,
2228 origin: crate::physical::ContractOrigin::Implicit,
2229 version: 1,
2230 created_at_unix_ms: now,
2231 updated_at_unix_ms: now,
2232 default_ttl_ms: db.collection_default_ttl_ms(collection),
2233 vector_dimension: None,
2234 vector_metric: None,
2235 context_index_fields: Vec::new(),
2236 declared_columns: Vec::new(),
2237 table_def: None,
2238 timestamps_enabled: false,
2239 context_index_enabled: false,
2240 metrics_raw_retention_ms: None,
2241 metrics_rollup_policies: Vec::new(),
2242 metrics_tenant_identity: None,
2243 metrics_namespace: None,
2244 append_only: false,
2245 subscriptions: Vec::new(),
2246 })
2247 .map(|_| ())
2248 .map_err(|err| RedDBError::Internal(err.to_string()))
2249}
2250
2251fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2252 query
2253 .assignment_exprs
2254 .iter()
2255 .enumerate()
2256 .any(|(idx, (column, expr))| {
2257 query
2258 .compound_assignment_ops
2259 .get(idx)
2260 .is_some_and(|op| op.is_some())
2261 || expr_references_update_column(expr, &query.table, column)
2262 })
2263}
2264
2265fn evaluate_compound_update_assignment(
2266 column: &str,
2267 record: &UnifiedRecord,
2268 op: BinOp,
2269 rhs: Value,
2270) -> RedDBResult<Value> {
2271 let lhs = record.get(column).ok_or_else(|| {
2272 RedDBError::Query(format!(
2273 "compound assignment requires existing numeric field '{column}'"
2274 ))
2275 })?;
2276 if matches!(lhs, Value::Null) {
2277 return Err(RedDBError::Query(format!(
2278 "compound assignment requires non-null numeric field '{column}'"
2279 )));
2280 }
2281 apply_compound_numeric_op(column, op, lhs, &rhs)
2282}
2283
2284fn apply_compound_numeric_op(
2285 column: &str,
2286 op: BinOp,
2287 lhs: &Value,
2288 rhs: &Value,
2289) -> RedDBResult<Value> {
2290 let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2291 return Err(RedDBError::Query(format!(
2292 "compound assignment requires numeric field '{column}'"
2293 )));
2294 };
2295 let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2296 return Err(RedDBError::Query(format!(
2297 "compound assignment requires numeric right-hand value for field '{column}'"
2298 )));
2299 };
2300
2301 if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2302 let a = lhs_number.as_f64();
2303 let b = rhs_number.as_f64();
2304 let out = match op {
2305 BinOp::Add => a + b,
2306 BinOp::Sub => a - b,
2307 BinOp::Mul => a * b,
2308 BinOp::Div => {
2309 if b == 0.0 {
2310 return Err(RedDBError::Query(format!(
2311 "division by zero in compound assignment for field '{column}'"
2312 )));
2313 }
2314 a / b
2315 }
2316 BinOp::Mod => {
2317 if b == 0.0 {
2318 return Err(RedDBError::Query(format!(
2319 "modulo by zero in compound assignment for field '{column}'"
2320 )));
2321 }
2322 a % b
2323 }
2324 _ => {
2325 return Err(RedDBError::Query(format!(
2326 "unsupported compound assignment operator for field '{column}'"
2327 )));
2328 }
2329 };
2330 if !out.is_finite() {
2331 return Err(RedDBError::Query(format!(
2332 "numeric overflow in compound assignment for field '{column}'"
2333 )));
2334 }
2335 return Ok(Value::Float(out));
2336 }
2337
2338 let a = lhs_number.as_i128();
2339 let b = rhs_number.as_i128();
2340 let out = match op {
2341 BinOp::Add => a.checked_add(b),
2342 BinOp::Sub => a.checked_sub(b),
2343 BinOp::Mul => a.checked_mul(b),
2344 BinOp::Mod => {
2345 if b == 0 {
2346 return Err(RedDBError::Query(format!(
2347 "modulo by zero in compound assignment for field '{column}'"
2348 )));
2349 }
2350 a.checked_rem(b)
2351 }
2352 BinOp::Div => unreachable!("integer division is handled by the float branch"),
2353 _ => None,
2354 }
2355 .ok_or_else(|| {
2356 RedDBError::Query(format!(
2357 "numeric overflow in compound assignment for field '{column}'"
2358 ))
2359 })?;
2360
2361 if matches!(lhs, Value::UnsignedInteger(_)) {
2362 let value = u64::try_from(out).map_err(|_| {
2363 RedDBError::Query(format!(
2364 "numeric overflow in compound assignment for field '{column}'"
2365 ))
2366 })?;
2367 Ok(Value::UnsignedInteger(value))
2368 } else {
2369 let value = i64::try_from(out).map_err(|_| {
2370 RedDBError::Query(format!(
2371 "numeric overflow in compound assignment for field '{column}'"
2372 ))
2373 })?;
2374 Ok(Value::Integer(value))
2375 }
2376}
2377
2378#[derive(Clone, Copy)]
2379enum CompoundNumber {
2380 Integer(i128),
2381 Float(f64),
2382}
2383
2384impl CompoundNumber {
2385 fn from_value(value: &Value) -> Option<Self> {
2386 match value {
2387 Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2388 Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2389 Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2390 Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2391 _ => None,
2392 }
2393 }
2394
2395 fn is_float(self) -> bool {
2396 matches!(self, Self::Float(_))
2397 }
2398
2399 fn as_f64(self) -> f64 {
2400 match self {
2401 Self::Integer(value) => value as f64,
2402 Self::Float(value) => value,
2403 }
2404 }
2405
2406 fn as_i128(self) -> i128 {
2407 match self {
2408 Self::Integer(value) => value,
2409 Self::Float(_) => unreachable!("float compound number used as integer"),
2410 }
2411 }
2412}
2413
2414fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2415 match expr {
2416 Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2417 Expr::Column { field, .. } => {
2418 field_ref_matches_update_column(field, table_name, target_column)
2419 }
2420 Expr::BinaryOp { lhs, rhs, .. } => {
2421 expr_references_update_column(lhs, table_name, target_column)
2422 || expr_references_update_column(rhs, table_name, target_column)
2423 }
2424 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2425 expr_references_update_column(operand, table_name, target_column)
2426 }
2427 Expr::FunctionCall { args, .. } => args
2428 .iter()
2429 .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2430 Expr::Case {
2431 branches, else_, ..
2432 } => {
2433 branches.iter().any(|(cond, value)| {
2434 expr_references_update_column(cond, table_name, target_column)
2435 || expr_references_update_column(value, table_name, target_column)
2436 }) || else_
2437 .as_deref()
2438 .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2439 }
2440 Expr::IsNull { operand, .. } => {
2441 expr_references_update_column(operand, table_name, target_column)
2442 }
2443 Expr::InList { target, values, .. } => {
2444 expr_references_update_column(target, table_name, target_column)
2445 || values
2446 .iter()
2447 .any(|value| expr_references_update_column(value, table_name, target_column))
2448 }
2449 Expr::Between {
2450 target, low, high, ..
2451 } => {
2452 expr_references_update_column(target, table_name, target_column)
2453 || expr_references_update_column(low, table_name, target_column)
2454 || expr_references_update_column(high, table_name, target_column)
2455 }
2456 }
2457}
2458
2459fn field_ref_matches_update_column(
2460 field: &FieldRef,
2461 table_name: &str,
2462 target_column: &str,
2463) -> bool {
2464 match field {
2465 FieldRef::TableColumn { table, column } => {
2466 column.eq_ignore_ascii_case(target_column)
2467 && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2468 }
2469 FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2470 false
2471 }
2472 }
2473}
2474
2475fn resolve_update_entity_by_logical_id(
2476 runtime: &RedDBRuntime,
2477 table: &str,
2478 logical_id: EntityId,
2479) -> Option<UnifiedEntity> {
2480 let store = runtime.inner.db.store();
2481 crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2482 .resolve_logical_id(&store, table, logical_id)
2483}
2484
2485fn update_cdc_item_kind(
2486 runtime: &RedDBRuntime,
2487 collection: &str,
2488 entity: &UnifiedEntity,
2489) -> &'static str {
2490 match &entity.data {
2491 EntityData::Node(_) => return "node",
2492 EntityData::Edge(_) => return "edge",
2493 _ => {}
2494 }
2495
2496 match runtime
2497 .db()
2498 .collection_contract(collection)
2499 .map(|contract| contract.declared_model)
2500 {
2501 Some(crate::catalog::CollectionModel::Document) => "document",
2502 Some(crate::catalog::CollectionModel::Kv)
2503 | Some(crate::catalog::CollectionModel::Vault) => "kv",
2504 _ => "row",
2505 }
2506}
2507
2508fn ordered_update_target_ids(
2509 manager: &Arc<crate::storage::SegmentManager>,
2510 entity_ids: &[EntityId],
2511 order_by: &[OrderByClause],
2512 limit: Option<usize>,
2513) -> Vec<EntityId> {
2514 let mut entities: Vec<UnifiedEntity> =
2515 manager.get_many(entity_ids).into_iter().flatten().collect();
2516 entities.sort_by(|left, right| compare_update_order(left, right, order_by));
2517 if let Some(limit) = limit {
2518 entities.truncate(limit);
2519 }
2520 entities.into_iter().map(|entity| entity.id).collect()
2521}
2522
2523fn compare_update_order(
2524 left: &UnifiedEntity,
2525 right: &UnifiedEntity,
2526 order_by: &[OrderByClause],
2527) -> Ordering {
2528 for clause in order_by {
2529 let left_value = update_order_value(left, &clause.field);
2530 let right_value = update_order_value(right, &clause.field);
2531 let ordering = compare_update_order_values(
2532 left_value.as_ref(),
2533 right_value.as_ref(),
2534 clause.nulls_first,
2535 );
2536 if ordering != Ordering::Equal {
2537 return if clause.ascending {
2538 ordering
2539 } else {
2540 ordering.reverse()
2541 };
2542 }
2543 }
2544 left.logical_id().raw().cmp(&right.logical_id().raw())
2545}
2546
2547fn compare_update_order_values(
2548 left: Option<&Value>,
2549 right: Option<&Value>,
2550 nulls_first: bool,
2551) -> Ordering {
2552 match (left, right) {
2553 (None, None) => Ordering::Equal,
2554 (None, Some(_)) => {
2555 if nulls_first {
2556 Ordering::Less
2557 } else {
2558 Ordering::Greater
2559 }
2560 }
2561 (Some(_), None) => {
2562 if nulls_first {
2563 Ordering::Greater
2564 } else {
2565 Ordering::Less
2566 }
2567 }
2568 (Some(left), Some(right)) => {
2569 crate::storage::query::value_compare::total_compare_values(left, right)
2570 }
2571 }
2572}
2573
2574fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
2575 let FieldRef::TableColumn { table, column } = field else {
2576 return None;
2577 };
2578 if !table.is_empty() {
2579 return None;
2580 }
2581 if column.eq_ignore_ascii_case("rid") {
2582 return Some(Value::UnsignedInteger(entity.logical_id().raw()));
2583 }
2584 match &entity.data {
2585 EntityData::Row(row) => row.get_field(column).cloned(),
2586 EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
2587 .and_then(|record| record.get(column).cloned()),
2588 _ => None,
2589 }
2590}
2591
2592fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
2593 if columns.is_empty() {
2594 return columns;
2595 }
2596
2597 let mut unique = Vec::with_capacity(columns.len());
2598 for column in columns.drain(..) {
2599 if !unique
2600 .iter()
2601 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
2602 {
2603 unique.push(column);
2604 }
2605 }
2606 unique
2607}
2608
2609const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];
2614
2615fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
2616 if column.eq_ignore_ascii_case("_ttl") {
2617 Some(SQL_TTL_METADATA_COLUMNS[0])
2618 } else if column.eq_ignore_ascii_case("_ttl_ms") {
2619 Some(SQL_TTL_METADATA_COLUMNS[1])
2620 } else if column.eq_ignore_ascii_case("_expires_at") {
2621 Some(SQL_TTL_METADATA_COLUMNS[2])
2622 } else {
2623 None
2624 }
2625}
2626
2627fn canonicalize_sql_ttl_metadata(
2632 key: &'static str,
2633 value: MetadataValue,
2634) -> (&'static str, MetadataValue) {
2635 if key != "_ttl" {
2636 return (key, value);
2637 }
2638 let scaled = match value {
2639 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
2640 MetadataValue::Timestamp(ms_or_s) => {
2641 MetadataValue::Timestamp(ms_or_s)
2644 }
2645 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
2646 other => other,
2647 };
2648 ("_ttl_ms", scaled)
2649}
2650
2651pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
2655
2656impl RedDBRuntime {
2657 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
2663 match value {
2664 Value::Password(marked) => {
2665 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
2666 Ok(Value::Password(crate::auth::store::hash_password(plain)))
2667 } else {
2668 Ok(Value::Password(marked))
2669 }
2670 }
2671 Value::Secret(bytes) => {
2672 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
2673 if !self.secret_auto_encrypt() {
2674 return Err(RedDBError::Query(
2675 "SECRET() literal rejected: red.config.secret.auto_encrypt \
2676 is false. Insert pre-encrypted bytes directly instead."
2677 .to_string(),
2678 ));
2679 }
2680 let key = self.secret_aes_key().ok_or_else(|| {
2681 RedDBError::Query(
2682 "SECRET() column encryption requires a bootstrapped \
2683 vault (red.secret.aes_key is missing). Start the server \
2684 with --vault to enable."
2685 .to_string(),
2686 )
2687 })?;
2688 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
2689 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
2690 } else {
2691 Ok(Value::Secret(bytes))
2692 }
2693 }
2694 other => Ok(other),
2695 }
2696 }
2697}
2698
2699fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
2702 let nonce_bytes = crate::auth::store::random_bytes(12);
2703 let mut nonce = [0u8; 12];
2704 nonce.copy_from_slice(&nonce_bytes[..12]);
2705 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
2706 let mut out = Vec::with_capacity(12 + ct.len());
2707 out.extend_from_slice(&nonce);
2708 out.extend_from_slice(&ct);
2709 out
2710}
2711
2712pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
2716 if payload.len() < 12 {
2717 return None;
2718 }
2719 let mut nonce = [0u8; 12];
2720 nonce.copy_from_slice(&payload[..12]);
2721 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
2722}
2723
2724fn split_insert_metadata(
2725 runtime: &RedDBRuntime,
2726 columns: &[String],
2727 values: &[Value],
2728) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
2729 let mut fields = Vec::new();
2730 let mut metadata = Vec::new();
2731
2732 for (column, value) in columns.iter().zip(values.iter()) {
2733 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
2735 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
2736 let (canonical_key, canonical_value) =
2737 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2738 metadata.push((canonical_key.to_string(), canonical_value));
2739 continue;
2740 }
2741 fields.push((
2742 column.clone(),
2743 runtime.resolve_crypto_sentinel(value.clone())?,
2744 ));
2745 }
2746
2747 Ok((fields, metadata))
2748}
2749
2750fn merge_with_clauses(
2752 metadata: &mut Vec<(String, MetadataValue)>,
2753 ttl_ms: Option<u64>,
2754 expires_at_ms: Option<u64>,
2755 with_metadata: &[(String, Value)],
2756) {
2757 if let Some(ms) = ttl_ms {
2758 metadata.push((
2759 "_ttl_ms".to_string(),
2760 if ms <= i64::MAX as u64 {
2761 MetadataValue::Int(ms as i64)
2762 } else {
2763 MetadataValue::Timestamp(ms)
2764 },
2765 ));
2766 }
2767 if let Some(ms) = expires_at_ms {
2768 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
2769 }
2770 for (key, value) in with_metadata {
2771 let meta_value = match value {
2772 Value::Text(s) => MetadataValue::String(s.to_string()),
2773 Value::Integer(n) => MetadataValue::Int(*n),
2774 Value::Float(n) => MetadataValue::Float(*n),
2775 Value::Boolean(b) => MetadataValue::Bool(*b),
2776 _ => MetadataValue::String(value.to_string()),
2777 };
2778 metadata.push((key.clone(), meta_value));
2779 }
2780}
2781
2782fn merge_vector_metadata_column(
2783 metadata: &mut Vec<(String, MetadataValue)>,
2784 columns: &[String],
2785 values: &[Value],
2786) -> RedDBResult<()> {
2787 let Some(value) = columns
2788 .iter()
2789 .position(|column| column.eq_ignore_ascii_case("metadata"))
2790 .map(|index| &values[index])
2791 else {
2792 return Ok(());
2793 };
2794 let json = match value {
2795 Value::Null => return Ok(()),
2796 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
2797 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2798 })?,
2799 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
2800 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2801 })?,
2802 other => {
2803 return Err(RedDBError::Query(format!(
2804 "column 'metadata' expected JSON object, got {other:?}"
2805 )))
2806 }
2807 };
2808 let parsed = metadata_from_json(&json)?;
2809 for (key, value) in parsed.iter() {
2810 metadata.push((key.clone(), value.clone()));
2811 }
2812 Ok(())
2813}
2814
2815fn apply_collection_default_ttl_metadata(
2816 runtime: &RedDBRuntime,
2817 collection: &str,
2818 metadata: &mut Vec<(String, MetadataValue)>,
2819) {
2820 if has_internal_ttl_metadata(metadata) {
2821 return;
2822 }
2823
2824 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
2825 return;
2826 };
2827
2828 metadata.push((
2829 "_ttl_ms".to_string(),
2830 if default_ttl_ms <= i64::MAX as u64 {
2831 MetadataValue::Int(default_ttl_ms as i64)
2832 } else {
2833 MetadataValue::Timestamp(default_ttl_ms)
2834 },
2835 ));
2836}
2837
2838fn ensure_non_tree_reserved_metadata_entries(
2839 metadata: &[(String, MetadataValue)],
2840) -> RedDBResult<()> {
2841 for (key, _) in metadata {
2842 ensure_non_tree_reserved_metadata_key(key)?;
2843 }
2844 Ok(())
2845}
2846
2847fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2848 if key.starts_with(TREE_METADATA_PREFIX) {
2849 return Err(RedDBError::Query(format!(
2850 "metadata key '{}' is reserved for managed trees",
2851 key
2852 )));
2853 }
2854 Ok(())
2855}
2856
2857fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2858 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2859 return Err(RedDBError::Query(format!(
2860 "edge label '{}' is reserved for managed trees",
2861 TREE_CHILD_EDGE_LABEL
2862 )));
2863 }
2864 Ok(())
2865}
2866
2867fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
2868 let mut columns = Vec::with_capacity(pairs.len());
2869 let mut values = Vec::with_capacity(pairs.len());
2870
2871 for (column, value) in pairs {
2872 columns.push(column.clone());
2873 values.push(value.clone());
2874 }
2875
2876 (columns, values)
2877}
2878
2879fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
2881 for (i, col) in columns.iter().enumerate() {
2882 if col.eq_ignore_ascii_case(name) {
2883 return Ok(values[i].clone());
2884 }
2885 }
2886 Err(RedDBError::Query(format!(
2887 "required column '{name}' not found in INSERT"
2888 )))
2889}
2890
2891fn find_column_value_string(
2893 columns: &[String],
2894 values: &[Value],
2895 name: &str,
2896) -> RedDBResult<String> {
2897 let val = find_column_value(columns, values, name)?;
2898 match val {
2899 Value::Text(s) => Ok(s.to_string()),
2900 Value::Integer(n) => Ok(n.to_string()),
2901 Value::Float(n) => Ok(n.to_string()),
2902 other => Err(RedDBError::Query(format!(
2903 "column '{name}' expected text, got {other:?}"
2904 ))),
2905 }
2906}
2907
2908fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
2909 let val = find_column_value(columns, values, name)?;
2910 match val {
2911 Value::Float(n) => Ok(n),
2912 Value::Integer(n) => Ok(n as f64),
2913 Value::UnsignedInteger(n) => Ok(n as f64),
2914 Value::Text(s) => s
2915 .parse::<f64>()
2916 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
2917 other => Err(RedDBError::Query(format!(
2918 "column '{name}' expected number, got {other:?}"
2919 ))),
2920 }
2921}
2922
2923fn find_column_value_opt_string(
2925 columns: &[String],
2926 values: &[Value],
2927 name: &str,
2928) -> Option<String> {
2929 for (i, col) in columns.iter().enumerate() {
2930 if col.eq_ignore_ascii_case(name) {
2931 return match &values[i] {
2932 Value::Null => None,
2933 Value::Text(s) => Some(s.to_string()),
2934 Value::Integer(n) => Some(n.to_string()),
2935 Value::Float(n) => Some(n.to_string()),
2936 _ => None,
2937 };
2938 }
2939 }
2940 None
2941}
2942
2943fn resolve_edge_endpoint(
2950 store: &crate::storage::unified::UnifiedStore,
2951 collection: &str,
2952 columns: &[String],
2953 values: &[Value],
2954 name: &str,
2955) -> RedDBResult<u64> {
2956 let val = find_column_value(columns, values, name)?;
2957 match val {
2958 Value::Integer(n) => Ok(n as u64),
2959 Value::UnsignedInteger(n) => Ok(n),
2960 Value::Text(s) => {
2961 if let Ok(n) = s.parse::<u64>() {
2962 return Ok(n);
2963 }
2964 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
2965 match matches.len() {
2966 0 => Err(RedDBError::Query(format!(
2967 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
2968 ))),
2969 1 => Ok(matches[0].raw()),
2970 n => Err(RedDBError::Query(format!(
2971 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
2972 ))),
2973 }
2974 }
2975 other => Err(RedDBError::Query(format!(
2976 "column '{name}' expected integer or node label, got {other:?}"
2977 ))),
2978 }
2979}
2980
2981fn resolve_edge_endpoint_any(
2982 store: &crate::storage::unified::UnifiedStore,
2983 collection: &str,
2984 columns: &[String],
2985 values: &[Value],
2986 names: &[&str],
2987) -> RedDBResult<u64> {
2988 for name in names {
2989 if columns
2990 .iter()
2991 .any(|column| column.eq_ignore_ascii_case(name))
2992 {
2993 return resolve_edge_endpoint(store, collection, columns, values, name);
2994 }
2995 }
2996
2997 Err(RedDBError::Query(format!(
2998 "required column '{}' not found in INSERT",
2999 names.first().copied().unwrap_or("from_rid")
3000 )))
3001}
3002
3003fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3005 let val = find_column_value(columns, values, name)?;
3006 match val {
3007 Value::Integer(n) => Ok(n as u64),
3008 Value::UnsignedInteger(n) => Ok(n),
3009 Value::Text(s) => s
3010 .parse::<u64>()
3011 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3012 other => Err(RedDBError::Query(format!(
3013 "column '{name}' expected integer, got {other:?}"
3014 ))),
3015 }
3016}
3017
3018fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3020 for (i, col) in columns.iter().enumerate() {
3021 if col.eq_ignore_ascii_case(name) {
3022 return match &values[i] {
3023 Value::Float(n) => Some(*n as f32),
3024 Value::Integer(n) => Some(*n as f32),
3025 Value::Null => None,
3026 _ => None,
3027 };
3028 }
3029 }
3030 None
3031}
3032
3033fn find_column_value_vec_f32(
3035 columns: &[String],
3036 values: &[Value],
3037 name: &str,
3038) -> RedDBResult<Vec<f32>> {
3039 let val = find_column_value(columns, values, name)?;
3040 match val {
3041 Value::Vector(v) => Ok(v),
3042 Value::Json(bytes) => {
3043 let s = std::str::from_utf8(&bytes).map_err(|_| {
3045 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3046 })?;
3047 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3048 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3049 })?;
3050 Ok(arr)
3051 }
3052 other => Err(RedDBError::Query(format!(
3053 "column '{name}' expected vector, got {other:?}"
3054 ))),
3055 }
3056}
3057
3058fn find_column_value_vec_f32_any(
3059 columns: &[String],
3060 values: &[Value],
3061 names: &[&str],
3062) -> RedDBResult<Vec<f32>> {
3063 for name in names {
3064 if columns
3065 .iter()
3066 .any(|column| column.eq_ignore_ascii_case(name))
3067 {
3068 return find_column_value_vec_f32(columns, values, name);
3069 }
3070 }
3071 Err(RedDBError::Query(format!(
3072 "required vector column '{}' not found in INSERT",
3073 names.join("' or '")
3074 )))
3075}
3076
3077fn extract_remaining_properties(
3079 columns: &[String],
3080 values: &[Value],
3081 exclude: &[&str],
3082) -> Vec<(String, Value)> {
3083 columns
3084 .iter()
3085 .zip(values.iter())
3086 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3087 .map(|(col, val)| (col.clone(), val.clone()))
3088 .collect()
3089}
3090
3091fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3092 let mut invalid = Vec::new();
3093 for column in columns {
3094 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3095 invalid.push(column.clone());
3096 }
3097 }
3098
3099 if invalid.is_empty() {
3100 Ok(())
3101 } else {
3102 Err(RedDBError::Query(format!(
3103 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3104 invalid.join(", ")
3105 )))
3106 }
3107}
3108
3109fn is_timeseries_insert_column(column: &str) -> bool {
3110 matches!(
3111 column.to_ascii_lowercase().as_str(),
3112 "metric" | "value" | "tags" | "timestamp" | "timestamp_ns" | "time"
3113 )
3114}
3115
3116fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3117 let mut found = None;
3118
3119 for alias in ["timestamp_ns", "timestamp", "time"] {
3120 for (index, column) in columns.iter().enumerate() {
3121 if !column.eq_ignore_ascii_case(alias) {
3122 continue;
3123 }
3124
3125 if found.is_some() {
3126 return Err(RedDBError::Query(
3127 "timeseries INSERT accepts only one timestamp column".to_string(),
3128 ));
3129 }
3130
3131 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3132 }
3133 }
3134
3135 Ok(found)
3136}
3137
3138fn find_timeseries_tags(
3139 columns: &[String],
3140 values: &[Value],
3141) -> RedDBResult<std::collections::HashMap<String, String>> {
3142 for (index, column) in columns.iter().enumerate() {
3143 if column.eq_ignore_ascii_case("tags") {
3144 return parse_timeseries_tags(&values[index]);
3145 }
3146 }
3147 Ok(std::collections::HashMap::new())
3148}
3149
3150fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3151 match value {
3152 Value::Null => Ok(std::collections::HashMap::new()),
3153 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3154 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3155 other => Err(RedDBError::Query(format!(
3156 "timeseries tags must be a JSON object or JSON text, got {other:?}"
3157 ))),
3158 }
3159}
3160
3161fn parse_timeseries_tags_json(
3162 bytes: &[u8],
3163) -> RedDBResult<std::collections::HashMap<String, String>> {
3164 let json: crate::json::Value = crate::json::from_slice(bytes)
3165 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3166
3167 let object = match json {
3168 crate::json::Value::Object(object) => object,
3169 other => {
3170 return Err(RedDBError::Query(format!(
3171 "timeseries tags must be a JSON object, got {other:?}"
3172 )))
3173 }
3174 };
3175
3176 let mut tags = std::collections::HashMap::with_capacity(object.len());
3177 for (key, value) in object {
3178 tags.insert(key, json_tag_value_to_string(&value));
3179 }
3180 Ok(tags)
3181}
3182
3183fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3184 match value {
3185 crate::json::Value::Null => "null".to_string(),
3186 crate::json::Value::Bool(value) => value.to_string(),
3187 crate::json::Value::Number(value) => value.to_string(),
3188 crate::json::Value::String(value) => value.clone(),
3189 other => other.to_string(),
3190 }
3191}
3192
3193fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3194 match value {
3195 Value::UnsignedInteger(value) => Ok(*value),
3196 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3197 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3198 Value::Text(value) => value.parse::<u64>().map_err(|_| {
3199 RedDBError::Query(format!(
3200 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3201 ))
3202 }),
3203 other => Err(RedDBError::Query(format!(
3204 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3205 ))),
3206 }
3207}
3208
3209fn current_unix_ns() -> u64 {
3210 std::time::SystemTime::now()
3211 .duration_since(std::time::UNIX_EPOCH)
3212 .unwrap_or_default()
3213 .as_nanos()
3214 .min(u128::from(u64::MAX)) as u64
3215}
3216
3217fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3218 use crate::json::{Map, Value as JV};
3219 match value {
3220 MetadataValue::Null => JV::Null,
3221 MetadataValue::Bool(value) => JV::Bool(*value),
3222 MetadataValue::Int(value) => JV::Number(*value as f64),
3223 MetadataValue::Float(value) => JV::Number(*value),
3224 MetadataValue::String(value) => JV::String(value.clone()),
3225 MetadataValue::Bytes(value) => JV::Array(
3226 value
3227 .iter()
3228 .map(|value| JV::Number(*value as f64))
3229 .collect(),
3230 ),
3231 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3232 MetadataValue::Array(values) => {
3233 JV::Array(values.iter().map(metadata_value_to_json).collect())
3234 }
3235 MetadataValue::Object(object) => {
3236 let entries = object
3237 .iter()
3238 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3239 .collect();
3240 JV::Object(entries)
3241 }
3242 MetadataValue::Geo { lat, lon } => {
3243 let mut object = Map::new();
3244 object.insert("lat".to_string(), JV::Number(*lat));
3245 object.insert("lon".to_string(), JV::Number(*lon));
3246 JV::Object(object)
3247 }
3248 MetadataValue::Reference(target) => {
3249 let mut object = Map::new();
3250 object.insert(
3251 "collection".to_string(),
3252 JV::String(target.collection().to_string()),
3253 );
3254 object.insert(
3255 "entity_id".to_string(),
3256 JV::Number(target.entity_id().raw() as f64),
3257 );
3258 JV::Object(object)
3259 }
3260 MetadataValue::References(values) => {
3261 let refs = values
3262 .iter()
3263 .map(|target| {
3264 let mut object = Map::new();
3265 object.insert(
3266 "collection".to_string(),
3267 JV::String(target.collection().to_string()),
3268 );
3269 object.insert(
3270 "entity_id".to_string(),
3271 JV::Number(target.entity_id().raw() as f64),
3272 );
3273 JV::Object(object)
3274 })
3275 .collect();
3276 JV::Array(refs)
3277 }
3278 }
3279}
3280
3281fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3282 match value {
3283 Value::Null => MetadataValue::Null,
3284 Value::Boolean(value) => MetadataValue::Bool(*value),
3285 Value::Integer(value) => MetadataValue::Int(*value),
3286 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3287 Value::Float(value) => MetadataValue::Float(*value),
3288 Value::Text(value) => MetadataValue::String(value.to_string()),
3289 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3290 Value::Timestamp(value) => {
3291 if *value >= 0 {
3292 metadata_u64_to_value(*value as u64)
3293 } else {
3294 MetadataValue::Int(*value)
3295 }
3296 }
3297 Value::TimestampMs(value) => {
3298 if *value >= 0 {
3299 metadata_u64_to_value(*value as u64)
3300 } else {
3301 MetadataValue::Int(*value)
3302 }
3303 }
3304 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3305 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3306 Value::Date(value) => MetadataValue::String(value.to_string()),
3307 Value::Time(value) => MetadataValue::String(value.to_string()),
3308 Value::Decimal(value) => MetadataValue::String(value.to_string()),
3309 Value::Ipv4(value) => MetadataValue::String(format!(
3310 "{}.{}.{}.{}",
3311 (value >> 24) & 0xFF,
3312 (value >> 16) & 0xFF,
3313 (value >> 8) & 0xFF,
3314 value & 0xFF
3315 )),
3316 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3317 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3318 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3319 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3320 lat: *lat as f64 / 1_000_000.0,
3321 lon: *lon as f64 / 1_000_000.0,
3322 },
3323 Value::BigInt(value) => MetadataValue::String(value.to_string()),
3324 Value::TableRef(value) => MetadataValue::String(value.clone()),
3325 Value::PageRef(value) => MetadataValue::Int(*value as i64),
3326 Value::Password(value) => MetadataValue::String(value.clone()),
3327 Value::Array(values) => {
3328 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3329 }
3330 _ => MetadataValue::String(value.to_string()),
3331 }
3332}
3333
3334fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3335 match value {
3336 Value::Null => Ok(MetadataValue::Null),
3337 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3338 Value::Integer(_) => Err(RedDBError::Query(format!(
3339 "column '{field}' must be non-negative for TTL metadata"
3340 ))),
3341 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3342 Value::Float(value) if value.is_finite() => {
3343 if value.fract().abs() >= f64::EPSILON {
3344 return Err(RedDBError::Query(format!(
3345 "column '{field}' must be an integer (TTL metadata must be an integer)"
3346 )));
3347 }
3348 if *value < 0.0 {
3349 return Err(RedDBError::Query(format!(
3350 "column '{field}' must be non-negative for TTL metadata"
3351 )));
3352 }
3353 if *value > u64::MAX as f64 {
3354 return Err(RedDBError::Query(format!(
3355 "column '{field}' value is too large"
3356 )));
3357 }
3358 Ok(metadata_u64_to_value(*value as u64))
3359 }
3360 Value::Float(_) => Err(RedDBError::Query(format!(
3361 "column '{field}' must be a finite number"
3362 ))),
3363 Value::Text(value) => {
3364 let value = value.trim();
3365 if let Ok(value) = value.parse::<u64>() {
3366 Ok(metadata_u64_to_value(value))
3367 } else if let Ok(value) = value.parse::<i64>() {
3368 if value < 0 {
3369 return Err(RedDBError::Query(format!(
3370 "column '{field}' must be non-negative for TTL metadata"
3371 )));
3372 }
3373 Ok(metadata_u64_to_value(value as u64))
3374 } else if let Ok(value) = value.parse::<f64>() {
3375 if !value.is_finite() {
3376 return Err(RedDBError::Query(format!(
3377 "column '{field}' must be a finite number"
3378 )));
3379 }
3380 if value.fract().abs() >= f64::EPSILON {
3381 return Err(RedDBError::Query(format!(
3382 "column '{field}' must be an integer (TTL metadata must be an integer)"
3383 )));
3384 }
3385 if value < 0.0 {
3386 return Err(RedDBError::Query(format!(
3387 "column '{field}' must be non-negative for TTL metadata"
3388 )));
3389 }
3390 if value > u64::MAX as f64 {
3391 return Err(RedDBError::Query(format!(
3392 "column '{field}' value is too large"
3393 )));
3394 }
3395 Ok(metadata_u64_to_value(value as u64))
3396 } else {
3397 Err(RedDBError::Query(format!(
3398 "column '{field}' expects a numeric value for TTL metadata"
3399 )))
3400 }
3401 }
3402 _ => Err(RedDBError::Query(format!(
3403 "column '{field}' expects a numeric value for TTL metadata"
3404 ))),
3405 }
3406}
3407
3408fn metadata_u64_to_value(value: u64) -> MetadataValue {
3409 if value <= i64::MAX as u64 {
3410 MetadataValue::Int(value as i64)
3411 } else {
3412 MetadataValue::Timestamp(value)
3413 }
3414}
3415
3416fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3422 let json = match value {
3423 Value::Null => return false,
3424 Value::Json(bytes) | Value::Blob(bytes) => {
3425 match crate::json::from_slice::<crate::json::Value>(bytes) {
3426 Ok(v) => v,
3427 Err(_) => return false,
3428 }
3429 }
3430 Value::Text(s) => {
3431 let trimmed = s.trim_start();
3432 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3433 return false;
3434 }
3435 match crate::json::from_str::<crate::json::Value>(s) {
3436 Ok(v) => v,
3437 Err(_) => return false,
3438 }
3439 }
3440 _ => return false,
3441 };
3442 let mut cursor = &json;
3443 for seg in tail.split('.') {
3444 match cursor {
3445 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3446 Some((_, v)) => cursor = v,
3447 None => return false,
3448 },
3449 _ => return false,
3450 }
3451 }
3452 !matches!(cursor, crate::json::Value::Null)
3453}
3454
3455fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
3466 let mut root = match current {
3467 Value::Null => crate::json::Value::Object(Default::default()),
3468 Value::Json(bytes) | Value::Blob(bytes) => {
3469 crate::json::from_slice(&bytes).map_err(|err| {
3470 RedDBError::Query(format!(
3471 "tenant auto-fill: root column is not valid JSON ({err})"
3472 ))
3473 })?
3474 }
3475 Value::Text(s) => {
3476 if s.trim().is_empty() {
3477 crate::json::Value::Object(Default::default())
3478 } else {
3479 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
3480 RedDBError::Query(format!(
3481 "tenant auto-fill: text root is not valid JSON ({err})"
3482 ))
3483 })?
3484 }
3485 }
3486 other => {
3487 return Err(RedDBError::Query(format!(
3488 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
3489 )));
3490 }
3491 };
3492
3493 let segments: Vec<&str> = tail.split('.').collect();
3495 let mut cursor: &mut crate::json::Value = &mut root;
3496 for (i, seg) in segments.iter().enumerate() {
3497 let is_last = i + 1 == segments.len();
3498 let map = match cursor {
3499 crate::json::Value::Object(m) => m,
3500 _ => {
3501 return Err(RedDBError::Query(format!(
3502 "tenant auto-fill: segment '{seg}' is not inside an object"
3503 )));
3504 }
3505 };
3506 if is_last {
3507 map.insert(
3508 seg.to_string(),
3509 crate::json::Value::String(tenant_id.to_string()),
3510 );
3511 break;
3512 }
3513 cursor = map
3514 .entry(seg.to_string())
3515 .or_insert_with(|| crate::json::Value::Object(Default::default()));
3516 }
3517
3518 let bytes = crate::json::to_vec(&root).map_err(|err| {
3519 RedDBError::Query(format!(
3520 "tenant auto-fill: failed to re-serialize JSON ({err})"
3521 ))
3522 })?;
3523 Ok(Value::Json(bytes))
3524}
3525
3526#[cfg(test)]
3527mod tests {
3528 use crate::storage::schema::Value;
3529 use crate::storage::wal::{WalReader, WalRecord};
3530 use crate::{RedDBOptions, RedDBRuntime};
3531 use std::path::Path;
3532
3533 fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
3534 WalReader::open(wal_path)
3535 .expect("wal opens")
3536 .iter()
3537 .map(|record| record.expect("wal record decodes").1)
3538 .filter_map(|record| match record {
3539 WalRecord::TxCommitBatch { actions, .. } => Some(actions),
3540 _ => None,
3541 })
3542 .collect()
3543 }
3544
3545 fn action_contains_text(action: &[u8], needle: &str) -> bool {
3546 action
3547 .windows(needle.len())
3548 .any(|window| window == needle.as_bytes())
3549 }
3550
3551 fn assert_statement_writes_collections_in_one_new_wal_batch(
3552 rt: &RedDBRuntime,
3553 wal_path: &Path,
3554 statement: &str,
3555 source: &str,
3556 event_queue: &str,
3557 ) {
3558 let before_batches = store_commit_batches(wal_path).len();
3559
3560 rt.execute_query(statement).unwrap();
3561
3562 let batches = store_commit_batches(wal_path);
3563 let statement_batches = &batches[before_batches..];
3564 let source_batch = statement_batches
3565 .iter()
3566 .position(|actions| {
3567 actions.iter().any(|action| {
3568 action_contains_text(action, source)
3569 && !action_contains_text(action, event_queue)
3570 })
3571 })
3572 .expect("source collection write batch is present");
3573 let event_batch = statement_batches
3574 .iter()
3575 .position(|actions| {
3576 actions
3577 .iter()
3578 .any(|action| action_contains_text(action, event_queue))
3579 })
3580 .expect("event queue write batch is present");
3581
3582 assert_eq!(
3583 source_batch, event_batch,
3584 "WITH EVENTS must persist the source write and queue event in the same WAL batch"
3585 );
3586 }
3587
3588 #[test]
3589 fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
3590 let dir = tempfile::tempdir().unwrap();
3591 let db_path = dir.path().join("events_dual_write.rdb");
3592 let wal_path = db_path.with_extension("rdb-uwal");
3593 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
3594
3595 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
3596 .unwrap();
3597 assert_statement_writes_collections_in_one_new_wal_batch(
3598 &rt,
3599 &wal_path,
3600 "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
3601 "users",
3602 "users_events",
3603 );
3604 }
3605
3606 #[test]
3607 fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
3608 let dir = tempfile::tempdir().unwrap();
3609 let db_path = dir.path().join("events_update_atomic.rdb");
3610 let wal_path = db_path.with_extension("rdb-uwal");
3611 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
3612
3613 rt.execute_query(
3614 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
3615 )
3616 .unwrap();
3617 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
3618 .unwrap();
3619
3620 assert_statement_writes_collections_in_one_new_wal_batch(
3621 &rt,
3622 &wal_path,
3623 "UPDATE users SET email = 'b@example.test' WHERE id = 1",
3624 "users",
3625 "user_updates",
3626 );
3627 }
3628
3629 #[test]
3630 fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
3631 let dir = tempfile::tempdir().unwrap();
3632 let db_path = dir.path().join("events_delete_atomic.rdb");
3633 let wal_path = db_path.with_extension("rdb-uwal");
3634 let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
3635
3636 rt.execute_query(
3637 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
3638 )
3639 .unwrap();
3640 rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
3641 .unwrap();
3642
3643 assert_statement_writes_collections_in_one_new_wal_batch(
3644 &rt,
3645 &wal_path,
3646 "DELETE FROM users WHERE id = 1",
3647 "users",
3648 "user_deletes",
3649 );
3650 }
3651
3652 #[test]
3653 fn update_where_id_in_with_hash_index_updates_expected_rows() {
3654 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3655 rt.execute_query("CREATE TABLE users (id INT, score INT)")
3656 .unwrap();
3657 for id in 0..5 {
3658 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
3659 .unwrap();
3660 }
3661 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
3662 .unwrap();
3663
3664 let updated = rt
3665 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
3666 .unwrap();
3667 assert_eq!(updated.affected_rows, 3);
3668
3669 let selected = rt
3670 .execute_query("SELECT id, score FROM users ORDER BY id")
3671 .unwrap();
3672 let scores: Vec<(i64, i64)> = selected
3673 .result
3674 .records
3675 .iter()
3676 .map(|record| {
3677 let id = match record.get("id").unwrap() {
3678 Value::Integer(value) => *value,
3679 other => panic!("expected integer id, got {other:?}"),
3680 };
3681 let score = match record.get("score").unwrap() {
3682 Value::Integer(value) => *value,
3683 other => panic!("expected integer score, got {other:?}"),
3684 };
3685 (id, score)
3686 })
3687 .collect();
3688 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
3689 }
3690
3691 #[test]
3698 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
3699 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3700 rt.execute_query("CREATE TABLE items (id INT, score INT)")
3701 .unwrap();
3702 for id in 0..5 {
3703 rt.execute_query(&format!(
3704 "INSERT INTO items (id, score) VALUES ({id}, {})",
3705 id * 10
3706 ))
3707 .unwrap();
3708 }
3709 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
3710 .unwrap();
3711
3712 let updated_one = rt
3716 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
3717 .unwrap();
3718 assert_eq!(updated_one.affected_rows, 1);
3719
3720 let updated_many = rt
3724 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
3725 .unwrap();
3726 assert_eq!(updated_many.affected_rows, 2);
3727
3728 let snapshot = rt
3729 .execute_query("SELECT id, score FROM items ORDER BY id")
3730 .unwrap();
3731 let pairs: Vec<(i64, i64)> = snapshot
3732 .result
3733 .records
3734 .iter()
3735 .map(|record| {
3736 let id = match record.get("id").unwrap() {
3737 Value::Integer(value) => *value,
3738 other => panic!("expected integer id, got {other:?}"),
3739 };
3740 let score = match record.get("score").unwrap() {
3741 Value::Integer(value) => *value,
3742 other => panic!("expected integer score, got {other:?}"),
3743 };
3744 (id, score)
3745 })
3746 .collect();
3747 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
3748
3749 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
3751 assert_eq!(updated_all.affected_rows, 5);
3752 let after = rt
3753 .execute_query("SELECT score FROM items ORDER BY id")
3754 .unwrap();
3755 let scores: Vec<i64> = after
3756 .result
3757 .records
3758 .iter()
3759 .map(|record| match record.get("score").unwrap() {
3760 Value::Integer(value) => *value,
3761 other => panic!("expected integer score, got {other:?}"),
3762 })
3763 .collect();
3764 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
3765 }
3766
3767 #[test]
3773 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
3774 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3775 rt.execute_query("CREATE TABLE items (id INT, score INT)")
3776 .unwrap();
3777 for id in 0..5 {
3778 rt.execute_query(&format!(
3779 "INSERT INTO items (id, score) VALUES ({id}, {})",
3780 id * 10
3781 ))
3782 .unwrap();
3783 }
3784 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
3785 .unwrap();
3786
3787 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3790 assert_eq!(deleted_one.affected_rows, 1);
3791
3792 let deleted_many = rt
3796 .execute_query("DELETE FROM items WHERE score > 25")
3797 .unwrap();
3798 assert_eq!(deleted_many.affected_rows, 2);
3799
3800 let surviving = rt
3801 .execute_query("SELECT id FROM items ORDER BY id")
3802 .unwrap();
3803 let ids: Vec<i64> = surviving
3804 .result
3805 .records
3806 .iter()
3807 .map(|record| match record.get("id").unwrap() {
3808 Value::Integer(value) => *value,
3809 other => panic!("expected integer id, got {other:?}"),
3810 })
3811 .collect();
3812 assert_eq!(ids, vec![0, 1]);
3813
3814 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
3816 assert_eq!(deleted_rest.affected_rows, 2);
3817 let empty = rt.execute_query("SELECT id FROM items").unwrap();
3818 assert!(empty.result.records.is_empty());
3819 }
3820
3821 #[test]
3826 fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
3827 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3828 rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
3829 .unwrap();
3830
3831 let inserted = rt
3834 .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
3835 .unwrap();
3836 assert_eq!(inserted.affected_rows, 1);
3837
3838 let update_err = rt
3840 .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
3841 .unwrap_err();
3842 let msg = format!("{update_err}");
3843 assert!(
3844 msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
3845 "expected UPDATE rejection message, got: {msg}"
3846 );
3847
3848 let delete_err = rt
3850 .execute_query("DELETE FROM events WHERE id = 1")
3851 .unwrap_err();
3852 let msg = format!("{delete_err}");
3853 assert!(
3854 msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
3855 "expected DELETE rejection message, got: {msg}"
3856 );
3857
3858 let surviving = rt.execute_query("SELECT id FROM events").unwrap();
3861 assert_eq!(surviving.result.records.len(), 1);
3862 }
3863
3864 #[test]
3868 fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
3869 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3870 rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
3871 .unwrap();
3872
3873 rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
3874 .unwrap();
3875 let updated = rt
3876 .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
3877 .unwrap();
3878 assert_eq!(updated.affected_rows, 1);
3879 let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
3880 assert_eq!(deleted.affected_rows, 1);
3881 }
3882
3883 #[test]
3884 fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
3885 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3886 rt.execute_query(
3887 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
3888 )
3889 .unwrap();
3890
3891 let inserted = rt
3892 .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
3893 .unwrap();
3894 assert_eq!(inserted.affected_rows, 1);
3895
3896 let events = queue_payloads(&rt, "audit_log");
3897 assert_eq!(events.len(), 1);
3898 let event = events[0].as_object().expect("event payload object");
3899 assert!(event
3900 .get("event_id")
3901 .and_then(crate::json::Value::as_str)
3902 .is_some_and(|value| !value.is_empty()));
3903 assert_eq!(
3904 event.get("op").and_then(crate::json::Value::as_str),
3905 Some("insert")
3906 );
3907 assert_eq!(
3908 event.get("collection").and_then(crate::json::Value::as_str),
3909 Some("users")
3910 );
3911 assert_eq!(
3912 event.get("id").and_then(crate::json::Value::as_u64),
3913 Some(7)
3914 );
3915 assert!(event
3916 .get("ts")
3917 .and_then(crate::json::Value::as_u64)
3918 .is_some());
3919 assert!(event
3920 .get("lsn")
3921 .and_then(crate::json::Value::as_u64)
3922 .is_some());
3923 assert!(matches!(
3924 event.get("tenant"),
3925 Some(crate::json::Value::Null)
3926 ));
3927 assert!(matches!(
3928 event.get("before"),
3929 Some(crate::json::Value::Null)
3930 ));
3931 let after = event
3932 .get("after")
3933 .and_then(crate::json::Value::as_object)
3934 .expect("after object");
3935 assert_eq!(
3936 after.get("id").and_then(crate::json::Value::as_u64),
3937 Some(7)
3938 );
3939 assert_eq!(
3940 after.get("email").and_then(crate::json::Value::as_str),
3941 Some("a@example.com")
3942 );
3943 }
3944
3945 #[test]
3946 fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
3947 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3948 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
3949 .unwrap();
3950
3951 rt.execute_query(
3952 "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
3953 )
3954 .unwrap();
3955
3956 let events = queue_payloads(&rt, "users_events");
3957 assert_eq!(events.len(), 2);
3958 let mut previous_lsn = 0;
3959 for (event, expected_id) in events.iter().zip([1_u64, 2]) {
3960 let object = event.as_object().expect("event payload object");
3961 assert_eq!(
3962 object.get("op").and_then(crate::json::Value::as_str),
3963 Some("insert")
3964 );
3965 assert_eq!(
3966 object.get("id").and_then(crate::json::Value::as_u64),
3967 Some(expected_id)
3968 );
3969 let lsn = object
3970 .get("lsn")
3971 .and_then(crate::json::Value::as_u64)
3972 .expect("event lsn");
3973 assert!(
3974 lsn > previous_lsn,
3975 "event LSNs should increase in row order"
3976 );
3977 previous_lsn = lsn;
3978 let after = object
3979 .get("after")
3980 .and_then(crate::json::Value::as_object)
3981 .expect("after object");
3982 assert_eq!(
3983 after.get("id").and_then(crate::json::Value::as_u64),
3984 Some(expected_id)
3985 );
3986 }
3987 }
3988
3989 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
3990 let result = rt
3991 .execute_query(&format!("QUEUE PEEK {queue} 10"))
3992 .expect("peek queue");
3993 result
3994 .result
3995 .records
3996 .iter()
3997 .map(
3998 |record| match record.get("payload").expect("payload column") {
3999 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4000 other => panic!("expected JSON queue payload, got {other:?}"),
4001 },
4002 )
4003 .collect()
4004 }
4005
4006 #[test]
4018 fn auto_index_id_fires_on_first_insert() {
4019 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4020 rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4021 .unwrap();
4022
4023 assert!(
4025 rt.index_store_ref()
4026 .find_index_for_column("bench_users", "id")
4027 .is_none(),
4028 "freshly created collection should not have an `id` index"
4029 );
4030
4031 rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4033 .unwrap();
4034
4035 let registered = rt
4037 .index_store_ref()
4038 .find_index_for_column("bench_users", "id")
4039 .expect("auto-index hook should have registered idx_id on first insert");
4040 assert_eq!(registered.name, "idx_id");
4041 assert_eq!(registered.collection, "bench_users");
4042 assert_eq!(registered.columns, vec!["id".to_string()]);
4043 assert!(matches!(
4044 registered.method,
4045 super::super::index_store::IndexMethodKind::Hash
4046 ));
4047
4048 for id in 2..=5 {
4051 rt.execute_query(&format!(
4052 "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4053 id * 10
4054 ))
4055 .unwrap();
4056 }
4057 for id in 1..=5 {
4058 let result = rt
4059 .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4060 .unwrap();
4061 assert_eq!(
4062 result.result.records.len(),
4063 1,
4064 "id={id} should match one row"
4065 );
4066 }
4067
4068 let deleted = rt
4073 .execute_query("DELETE FROM bench_users WHERE id = 3")
4074 .unwrap();
4075 assert_eq!(deleted.affected_rows, 1);
4076 }
4077
4078 #[test]
4083 fn auto_index_id_fires_on_first_bulk_insert() {
4084 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4085 rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4086 .unwrap();
4087
4088 rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4089 .unwrap();
4090
4091 let registered = rt
4092 .index_store_ref()
4093 .find_index_for_column("bench_bulk", "id")
4094 .expect("auto-index hook should fire on first bulk insert");
4095 assert_eq!(registered.name, "idx_id");
4096
4097 for id in 1..=3 {
4099 let result = rt
4100 .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4101 .unwrap();
4102 assert_eq!(result.result.records.len(), 1);
4103 }
4104 }
4105
4106 #[test]
4110 fn auto_index_id_skips_when_no_id_column() {
4111 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4112 rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4113 .unwrap();
4114 rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4115 .unwrap();
4116
4117 assert!(rt
4118 .index_store_ref()
4119 .find_index_for_column("plain", "id")
4120 .is_none());
4121 assert!(rt
4122 .index_store_ref()
4123 .find_index_for_column("plain", "uid")
4124 .is_none());
4125 }
4126
4127 #[test]
4132 fn auto_index_id_skips_when_index_already_exists() {
4133 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4134 rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4135 .unwrap();
4136 rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4138 .unwrap();
4139 rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4140 .unwrap();
4141
4142 let registered = rt
4143 .index_store_ref()
4144 .find_index_for_column("pre", "id")
4145 .expect("user index should still be there");
4146 assert_eq!(
4147 registered.name, "user_idx",
4148 "auto-index hook must not overwrite an existing index"
4149 );
4150 }
4151
4152 #[test]
4156 fn auto_index_id_dropped_with_collection() {
4157 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4158 rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4159 .unwrap();
4160 rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4161 .unwrap();
4162 assert!(rt
4163 .index_store_ref()
4164 .find_index_for_column("ephemeral", "id")
4165 .is_some());
4166
4167 rt.execute_query("DROP TABLE ephemeral").unwrap();
4168
4169 assert!(
4170 rt.index_store_ref()
4171 .find_index_for_column("ephemeral", "id")
4172 .is_none(),
4173 "implicit `idx_id` must be reaped when its collection drops"
4174 );
4175 }
4176
4177 #[test]
4182 fn auto_index_id_disabled_by_config() {
4183 let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4184 let rt = RedDBRuntime::with_options(opts).unwrap();
4185
4186 rt.execute_query("CREATE TABLE off (id INT, score INT)")
4187 .unwrap();
4188 rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4189 .unwrap();
4190
4191 assert!(
4192 rt.index_store_ref()
4193 .find_index_for_column("off", "id")
4194 .is_none(),
4195 "with auto_index_id=false, no implicit index should be created"
4196 );
4197 }
4198
4199 #[test]
4202 fn update_single_row_emits_update_event() {
4203 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4204 rt.execute_query(
4205 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4206 )
4207 .unwrap();
4208 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4209 .unwrap();
4210
4211 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4212 .unwrap();
4213
4214 let events = queue_payloads(&rt, "audit_log");
4215 assert_eq!(events.len(), 1, "expected exactly 1 update event");
4216 let event = events[0].as_object().expect("event payload object");
4217 assert_eq!(
4218 event.get("op").and_then(crate::json::Value::as_str),
4219 Some("update")
4220 );
4221 assert_eq!(
4222 event.get("collection").and_then(crate::json::Value::as_str),
4223 Some("users")
4224 );
4225 assert!(event
4226 .get("event_id")
4227 .and_then(crate::json::Value::as_str)
4228 .is_some_and(|v| !v.is_empty()));
4229 let before = event
4230 .get("before")
4231 .and_then(crate::json::Value::as_object)
4232 .expect("before must be an object");
4233 let after = event
4234 .get("after")
4235 .and_then(crate::json::Value::as_object)
4236 .expect("after must be an object");
4237 assert_eq!(
4238 before.get("name").and_then(crate::json::Value::as_str),
4239 Some("Alice"),
4240 "before.name should be the old value"
4241 );
4242 assert_eq!(
4243 after.get("name").and_then(crate::json::Value::as_str),
4244 Some("Bob"),
4245 "after.name should be the new value"
4246 );
4247 }
4248
4249 #[test]
4250 fn update_event_only_includes_changed_fields() {
4251 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4252 rt.execute_query(
4253 "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4254 )
4255 .unwrap();
4256 rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4257 .unwrap();
4258
4259 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4260 .unwrap();
4261
4262 let events = queue_payloads(&rt, "evts");
4263 assert_eq!(events.len(), 1);
4264 let event = events[0].as_object().unwrap();
4265 let before = event
4266 .get("before")
4267 .and_then(crate::json::Value::as_object)
4268 .unwrap();
4269 let after = event
4270 .get("after")
4271 .and_then(crate::json::Value::as_object)
4272 .unwrap();
4273 assert!(
4275 before.contains_key("name"),
4276 "before must include changed field"
4277 );
4278 assert!(
4279 after.contains_key("name"),
4280 "after must include changed field"
4281 );
4282 assert!(
4284 !before.contains_key("email"),
4285 "before must not include unchanged email"
4286 );
4287 assert!(
4288 !after.contains_key("email"),
4289 "after must not include unchanged email"
4290 );
4291 }
4292
4293 #[test]
4294 fn multi_row_update_emits_one_event_per_row() {
4295 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4296 rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4297 .unwrap();
4298 rt.execute_query(
4299 "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4300 )
4301 .unwrap();
4302
4303 rt.execute_query("UPDATE items SET status = 'done'")
4304 .unwrap();
4305
4306 let events = queue_payloads(&rt, "evts");
4307 assert_eq!(events.len(), 3, "expected one update event per row");
4308 for event in &events {
4309 let obj = event.as_object().unwrap();
4310 assert_eq!(
4311 obj.get("op").and_then(crate::json::Value::as_str),
4312 Some("update")
4313 );
4314 }
4315 }
4316
4317 #[test]
4318 fn delete_single_row_emits_delete_event() {
4319 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4320 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4321 .unwrap();
4322 rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4323 .unwrap();
4324
4325 rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4326
4327 let events = queue_payloads(&rt, "del_log");
4328 assert_eq!(events.len(), 1);
4329 let event = events[0].as_object().expect("event payload object");
4330 assert_eq!(
4331 event.get("op").and_then(crate::json::Value::as_str),
4332 Some("delete")
4333 );
4334 assert_eq!(
4335 event.get("collection").and_then(crate::json::Value::as_str),
4336 Some("users")
4337 );
4338 assert!(event
4339 .get("event_id")
4340 .and_then(crate::json::Value::as_str)
4341 .is_some_and(|v| !v.is_empty()));
4342 let before = event
4343 .get("before")
4344 .and_then(crate::json::Value::as_object)
4345 .expect("before must be an object for delete");
4346 assert_eq!(
4347 before.get("id").and_then(crate::json::Value::as_u64),
4348 Some(42)
4349 );
4350 assert_eq!(
4351 before.get("name").and_then(crate::json::Value::as_str),
4352 Some("Alice")
4353 );
4354 assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
4355 }
4356
4357 #[test]
4358 fn multi_row_delete_emits_one_event_per_row() {
4359 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4360 rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
4361 .unwrap();
4362 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
4363 .unwrap();
4364
4365 rt.execute_query("DELETE FROM items").unwrap();
4366
4367 let events = queue_payloads(&rt, "del_log");
4368 assert_eq!(events.len(), 3, "expected one delete event per deleted row");
4369 for event in &events {
4370 let obj = event.as_object().unwrap();
4371 assert_eq!(
4372 obj.get("op").and_then(crate::json::Value::as_str),
4373 Some("delete")
4374 );
4375 assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
4376 }
4377 }
4378
4379 #[test]
4380 fn ops_filter_update_does_not_emit_on_insert_or_delete() {
4381 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4382 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
4383 .unwrap();
4384
4385 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4386 .unwrap();
4387 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
4388
4389 let events = queue_payloads(&rt, "evts");
4390 assert!(
4391 events.is_empty(),
4392 "UPDATE-only filter must not emit INSERT or DELETE events"
4393 );
4394 }
4395
4396 #[test]
4399 fn suppress_events_on_insert_emits_no_events() {
4400 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4401 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4402 .unwrap();
4403
4404 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4405 .unwrap();
4406
4407 let events = queue_payloads(&rt, "evts");
4408 assert!(
4409 events.is_empty(),
4410 "SUPPRESS EVENTS must prevent INSERT events"
4411 );
4412 }
4413
4414 #[test]
4415 fn suppress_events_on_update_emits_no_events() {
4416 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4417 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4418 .unwrap();
4419 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4420 .unwrap();
4421 let _ = queue_payloads(&rt, "evts");
4423 rt.execute_query("QUEUE PURGE evts").unwrap();
4425
4426 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
4427 .unwrap();
4428
4429 let events = queue_payloads(&rt, "evts");
4430 assert!(
4431 events.is_empty(),
4432 "SUPPRESS EVENTS must prevent UPDATE events"
4433 );
4434 }
4435
4436 #[test]
4437 fn suppress_events_on_delete_emits_no_events() {
4438 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4439 rt.execute_query(
4440 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
4441 )
4442 .unwrap();
4443 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4444 .unwrap();
4445
4446 rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
4447 .unwrap();
4448
4449 let events = queue_payloads(&rt, "evts");
4450 assert!(
4451 events.is_empty(),
4452 "SUPPRESS EVENTS must prevent DELETE events"
4453 );
4454 }
4455
4456 #[test]
4457 fn normal_insert_after_suppress_still_emits() {
4458 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4459 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4460 .unwrap();
4461
4462 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4463 .unwrap();
4464 rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
4465 .unwrap();
4466
4467 let events = queue_payloads(&rt, "evts");
4468 assert_eq!(
4469 events.len(),
4470 1,
4471 "only the non-suppressed INSERT should emit"
4472 );
4473 assert_eq!(
4474 events[0].get("id").and_then(crate::json::Value::as_u64),
4475 Some(2)
4476 );
4477 }
4478}