1use crate::application::entity::{
10 AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput, CreateKvInput, CreateNodeInput,
11 CreateRowInput, CreateRowsBatchInput, CreateVectorInput, DeleteEntityInput,
12 PatchEntityOperation, PatchEntityOperationType, RowUpdateColumnRule, RowUpdateContractPlan,
13};
14use crate::application::ports::{
15 build_row_update_contract_plan, normalize_row_update_assignment_with_plan,
16 normalize_row_update_value_for_rule, RuntimeEntityPort,
17};
18use crate::application::ttl_payload::has_internal_ttl_metadata;
19use crate::presentation::entity_json::storage_value_to_json;
20use crate::storage::query::ast::{Expr, ReturningItem};
21use crate::storage::query::sql_lowering::{
22 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
23};
24use crate::storage::query::unified::{sys_key_red_entity_id, UnifiedRecord, UnifiedResult};
25use crate::storage::unified::MetadataValue;
26use crate::storage::Metadata;
27use std::collections::HashMap;
28use std::sync::Arc;
29
30use super::*;
31
32const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
33const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
34const TREE_METADATA_PREFIX: &str = "red.tree.";
35
36#[derive(Clone)]
37struct CompiledUpdateAssignment {
38 column: String,
39 expr: Expr,
40 metadata_key: Option<&'static str>,
41 row_rule: Option<RowUpdateColumnRule>,
42}
43
44struct CompiledUpdatePlan {
45 static_field_assignments: Vec<(String, Value)>,
46 static_metadata_assignments: Vec<(String, MetadataValue)>,
47 dynamic_assignments: Vec<CompiledUpdateAssignment>,
48 row_contract_plan: Option<RowUpdateContractPlan>,
49 row_modified_columns: Vec<String>,
50 row_touches_unique_columns: bool,
51}
52
53#[derive(Default)]
54struct MaterializedUpdateAssignments {
55 dynamic_field_assignments: Vec<(String, Value)>,
56 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
57}
58
59impl RedDBRuntime {
60 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
74 let Some(tenant_col) = self.tenant_column(&query.table) else {
75 return Ok(None);
76 };
77 if query
79 .columns
80 .iter()
81 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
82 {
83 return Ok(None);
84 }
85
86 if let Some(dot_pos) = tenant_col.find('.') {
92 let (root, tail) = tenant_col.split_at(dot_pos);
93 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
95 }
96
97 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
98 return Err(RedDBError::Query(format!(
99 "INSERT into tenant-scoped table '{}' requires an active tenant — \
100 run SET TENANT '<id>' first or name column '{}' explicitly",
101 query.table, tenant_col
102 )));
103 };
104
105 let mut augmented = query.clone();
106 augmented.columns.push(tenant_col);
107 let lit = Value::text(tenant_id.clone());
108 for row in augmented.values.iter_mut() {
109 row.push(lit.clone());
110 }
111 for row in augmented.value_exprs.iter_mut() {
112 row.push(crate::storage::query::ast::Expr::Literal {
113 value: lit.clone(),
114 span: crate::storage::query::ast::Span::synthetic(),
115 });
116 }
117 Ok(Some(augmented))
118 }
119
120 fn inject_dotted_tenant(
130 &self,
131 query: &InsertQuery,
132 root: &str,
133 tail: &str,
134 ) -> RedDBResult<Option<InsertQuery>> {
135 let active_tenant = crate::runtime::impl_core::current_tenant();
136 let mut augmented = query.clone();
137 let root_idx = augmented
138 .columns
139 .iter()
140 .position(|c| c.eq_ignore_ascii_case(root));
141
142 if let Some(idx) = root_idx {
143 for row in augmented.values.iter_mut() {
149 let Some(slot) = row.get_mut(idx) else {
150 continue;
151 };
152 if dotted_tail_already_set(slot, tail) {
153 continue;
154 }
155 let Some(tenant_id) = &active_tenant else {
156 return Err(RedDBError::Query(format!(
157 "INSERT into tenant-scoped table '{}' requires an active tenant — \
158 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
159 query.table, root, tail
160 )));
161 };
162 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
163 }
164 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
168 if let Some(slot) = row.get_mut(idx) {
169 let new_value = augmented
170 .values
171 .get(row_idx)
172 .and_then(|v| v.get(idx))
173 .cloned()
174 .unwrap_or(Value::Null);
175 *slot = crate::storage::query::ast::Expr::Literal {
176 value: new_value,
177 span: crate::storage::query::ast::Span::synthetic(),
178 };
179 }
180 }
181 } else {
182 let Some(tenant_id) = &active_tenant else {
186 return Err(RedDBError::Query(format!(
187 "INSERT into tenant-scoped table '{}' requires an active tenant — \
188 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
189 query.table, root, tail
190 )));
191 };
192 augmented.columns.push(root.to_string());
194 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
195 for row in augmented.values.iter_mut() {
196 row.push(fresh.clone());
197 }
198 for row in augmented.value_exprs.iter_mut() {
199 row.push(crate::storage::query::ast::Expr::Literal {
200 value: fresh.clone(),
201 span: crate::storage::query::ast::Span::synthetic(),
202 });
203 }
204 }
205
206 Ok(Some(augmented))
207 }
208
209 fn delete_entities_batch(
212 &self,
213 collection: &str,
214 ids: &[EntityId],
215 ) -> RedDBResult<(u64, Vec<u64>)> {
216 if ids.is_empty() {
217 return Ok((0, vec![]));
218 }
219
220 if let Some(xid) = self.current_xid() {
226 let store = self.db().store();
227 let Some(manager) = store.get_collection(collection) else {
228 return Ok((0, vec![]));
229 };
230 let conn_id = crate::runtime::impl_core::current_connection_id();
231 let mut marked: u64 = 0;
232 for &id in ids {
233 let Some(mut entity) = manager.get(id) else {
234 continue;
235 };
236 if entity.xmax != 0 {
239 continue;
240 }
241 entity.set_xmax(xid);
242 if manager.update(entity).is_ok() {
243 self.record_pending_tombstone(conn_id, collection, id, xid);
244 marked += 1;
245 }
246 }
247 return Ok((marked, vec![]));
248 }
249
250 let store = self.db().store();
251 let deleted_ids = store
252 .delete_batch(collection, ids)
253 .map_err(|err| RedDBError::Internal(err.to_string()))?;
254 if deleted_ids.is_empty() {
255 return Ok((0, vec![]));
256 }
257
258 let mut lsns = Vec::with_capacity(deleted_ids.len());
259 for id in &deleted_ids {
260 store.context_index().remove_entity(*id);
261 let lsn = self.cdc_emit(
262 crate::replication::cdc::ChangeOperation::Delete,
263 collection,
264 id.raw(),
265 "entity",
266 );
267 lsns.push(lsn);
268 }
269
270 Ok((deleted_ids.len() as u64, lsns))
271 }
272
273 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> Vec<u64> {
276 if applied.is_empty() {
277 return Vec::new();
278 }
279
280 let store = self.db().store();
281 if applied.iter().any(|item| item.context_index_dirty) {
282 store.context_index().index_entities(
283 &applied[0].collection,
284 applied
285 .iter()
286 .filter(|item| item.context_index_dirty)
287 .map(|item| &item.entity),
288 );
289 }
290
291 let mut lsns = Vec::with_capacity(applied.len());
292 for item in applied {
293 let lsn = self.cdc_emit_prebuilt(
294 crate::replication::cdc::ChangeOperation::Update,
295 &item.collection,
296 &item.entity,
297 "entity",
298 item.metadata.as_ref(),
299 false,
300 );
301 lsns.push(lsn);
302 }
303 lsns
304 }
305
306 fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
307 self.persist_applied_entity_mutations(applied)
308 }
309
310 pub fn execute_insert(
315 &self,
316 raw_query: &str,
317 query: &InsertQuery,
318 ) -> RedDBResult<RuntimeQueryResult> {
319 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
320 crate::runtime::collection_contract::CollectionContractGate::check(
326 self,
327 &query.table,
328 crate::runtime::collection_contract::MutationKind::Insert,
329 )?;
330 let augmented_owned;
339 let query = match self.maybe_inject_tenant_column(query)? {
340 Some(new_q) => {
341 augmented_owned = new_q;
342 &augmented_owned
343 }
344 None => query,
345 };
346 self.check_insert_column_policy(query)?;
347
348 let mut inserted_count: u64 = 0;
349 let effective_rows =
350 effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
351
352 let store = self.inner.db.store();
354 let _ = store.get_or_create_collection(&query.table);
355 let declared_model = self
356 .db()
357 .collection_contract_arc(&query.table)
358 .map(|contract| contract.declared_model);
359
360 let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
361 if query.returning.is_some() {
362 Some(Vec::with_capacity(effective_rows.len()))
363 } else {
364 None
365 };
366 let mut returning_result: Option<UnifiedResult> = None;
367
368 if matches!(query.entity_type, InsertEntityType::Row)
369 && !matches!(
370 declared_model,
371 Some(crate::catalog::CollectionModel::TimeSeries)
372 )
373 {
374 let mut rows = Vec::with_capacity(effective_rows.len());
375 for row_values in &effective_rows {
376 if row_values.len() != query.columns.len() {
377 return Err(RedDBError::Query(format!(
378 "INSERT column count ({}) does not match value count ({})",
379 query.columns.len(),
380 row_values.len()
381 )));
382 }
383 let (fields, mut metadata) =
384 split_insert_metadata(self, &query.columns, row_values)?;
385 merge_with_clauses(
386 &mut metadata,
387 query.ttl_ms,
388 query.expires_at_ms,
389 &query.with_metadata,
390 );
391 if let Some(snaps) = returning_snapshots.as_mut() {
392 snaps.push(fields.clone());
393 }
394 rows.push(CreateRowInput {
395 collection: query.table.clone(),
396 fields,
397 metadata,
398 node_links: Vec::new(),
399 vector_links: Vec::new(),
400 });
401 }
402 let outputs = self.create_rows_batch(CreateRowsBatchInput {
403 collection: query.table.clone(),
404 rows,
405 suppress_events: query.suppress_events,
406 })?;
407 inserted_count = outputs.len() as u64;
408
409 if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
417 let time_col = &spec.time_column;
418 if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
420 for row in &effective_rows {
421 if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
422 if *n >= 0 {
423 let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
424 }
425 } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
426 let _ = self.inner.db.hypertables().route(&query.table, *n);
427 }
428 }
429 }
430 }
431
432 if let (Some(items), Some(snaps)) =
433 (query.returning.as_ref(), returning_snapshots.take())
434 {
435 returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
436 }
437 } else {
438 if query.returning.is_some() {
439 return Err(RedDBError::Query(
440 "RETURNING is not yet supported for this INSERT path (only plain table rows)"
441 .to_string(),
442 ));
443 }
444 for row_values in &effective_rows {
445 if row_values.len() != query.columns.len() {
446 return Err(RedDBError::Query(format!(
447 "INSERT column count ({}) does not match value count ({})",
448 query.columns.len(),
449 row_values.len()
450 )));
451 }
452
453 match query.entity_type {
454 InsertEntityType::Row => {
455 let (fields, mut metadata) =
456 split_insert_metadata(self, &query.columns, row_values)?;
457 merge_with_clauses(
458 &mut metadata,
459 query.ttl_ms,
460 query.expires_at_ms,
461 &query.with_metadata,
462 );
463 self.insert_timeseries_point(&query.table, fields, metadata)?;
464 }
465 InsertEntityType::Node => {
466 let (node_values, mut metadata) =
467 split_insert_metadata(self, &query.columns, row_values)?;
468 merge_with_clauses(
469 &mut metadata,
470 query.ttl_ms,
471 query.expires_at_ms,
472 &query.with_metadata,
473 );
474 ensure_non_tree_reserved_metadata_entries(&metadata)?;
475 let (columns, values) = pairwise_columns_values(&node_values);
476 let label = find_column_value_string(&columns, &values, "label")?;
477 let node_type =
478 find_column_value_opt_string(&columns, &values, "node_type");
479 let properties = extract_remaining_properties(
480 &columns,
481 &values,
482 &["label", "node_type"],
483 );
484 let input = CreateNodeInput {
485 collection: query.table.clone(),
486 label,
487 node_type,
488 properties,
489 metadata,
490 embeddings: Vec::new(),
491 table_links: Vec::new(),
492 node_links: Vec::new(),
493 };
494 self.create_node(input)?;
495 }
496 InsertEntityType::Edge => {
497 let (edge_values, mut metadata) =
498 split_insert_metadata(self, &query.columns, row_values)?;
499 merge_with_clauses(
500 &mut metadata,
501 query.ttl_ms,
502 query.expires_at_ms,
503 &query.with_metadata,
504 );
505 ensure_non_tree_reserved_metadata_entries(&metadata)?;
506 let (columns, values) = pairwise_columns_values(&edge_values);
507 let label = find_column_value_string(&columns, &values, "label")?;
508 ensure_non_tree_structural_edge_label(&label)?;
509 let from_id = find_column_value_u64(&columns, &values, "from")?;
510 let to_id = find_column_value_u64(&columns, &values, "to")?;
511 let weight = find_column_value_f32_opt(&columns, &values, "weight");
512 let properties = extract_remaining_properties(
513 &columns,
514 &values,
515 &["label", "from", "to", "weight"],
516 );
517 let input = CreateEdgeInput {
518 collection: query.table.clone(),
519 label,
520 from: EntityId::new(from_id),
521 to: EntityId::new(to_id),
522 weight,
523 properties,
524 metadata,
525 };
526 self.create_edge(input)?;
527 }
528 InsertEntityType::Vector => {
529 let (vector_values, mut metadata) =
530 split_insert_metadata(self, &query.columns, row_values)?;
531 merge_with_clauses(
532 &mut metadata,
533 query.ttl_ms,
534 query.expires_at_ms,
535 &query.with_metadata,
536 );
537 let (columns, values) = pairwise_columns_values(&vector_values);
538 let dense = find_column_value_vec_f32(&columns, &values, "dense")?;
539 let content = find_column_value_opt_string(&columns, &values, "content");
540 let input = CreateVectorInput {
541 collection: query.table.clone(),
542 dense,
543 content,
544 metadata,
545 link_row: None,
546 link_node: None,
547 };
548 self.create_vector(input)?;
549 }
550 InsertEntityType::Document => {
551 let (document_values, mut metadata) =
552 split_insert_metadata(self, &query.columns, row_values)?;
553 merge_with_clauses(
554 &mut metadata,
555 query.ttl_ms,
556 query.expires_at_ms,
557 &query.with_metadata,
558 );
559 let (columns, values) = pairwise_columns_values(&document_values);
560 let body_str = find_column_value_string(&columns, &values, "body")?;
561 let body: crate::json::Value = crate::json::from_str(&body_str)
562 .map_err(|e| RedDBError::Query(format!("invalid JSON body: {e}")))?;
563 let input = CreateDocumentInput {
564 collection: query.table.clone(),
565 body,
566 metadata,
567 node_links: Vec::new(),
568 vector_links: Vec::new(),
569 };
570 self.create_document(input)?;
571 }
572 InsertEntityType::Kv => {
573 let (kv_values, mut metadata) =
574 split_insert_metadata(self, &query.columns, row_values)?;
575 merge_with_clauses(
576 &mut metadata,
577 query.ttl_ms,
578 query.expires_at_ms,
579 &query.with_metadata,
580 );
581 let (columns, values) = pairwise_columns_values(&kv_values);
582 let key = find_column_value_string(&columns, &values, "key")?;
583 let value = find_column_value(&columns, &values, "value")?;
584 let input = CreateKvInput {
585 collection: query.table.clone(),
586 key,
587 value,
588 metadata,
589 };
590 self.create_kv(input)?;
591 }
592 }
593
594 inserted_count += 1;
595 }
596 }
597
598 if let Some(ref embed_config) = query.auto_embed {
600 let store = self.inner.db.store();
601 let provider = crate::ai::parse_provider(&embed_config.provider)?;
602 let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
603 let model = embed_config.model.clone().unwrap_or_else(|| {
604 std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
605 .ok()
606 .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
607 });
608
609 let manager = store
611 .get_collection(&query.table)
612 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
613 let entities = manager.query_all(|_| true);
614 let recent: Vec<_> = entities
615 .into_iter()
616 .rev()
617 .take(effective_rows.len())
618 .collect();
619
620 let entity_combos: Vec<(usize, String)> = recent
622 .iter()
623 .enumerate()
624 .filter_map(|(i, entity)| {
625 if let EntityData::Row(ref row) = entity.data {
626 if let Some(ref named) = row.named {
627 let texts: Vec<String> = embed_config
628 .fields
629 .iter()
630 .filter_map(|field| match named.get(field) {
631 Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
632 _ => None,
633 })
634 .collect();
635 if !texts.is_empty() {
636 return Some((i, texts.join(" ")));
637 }
638 }
639 }
640 None
641 })
642 .collect();
643
644 if !entity_combos.is_empty() {
645 let batch_texts: Vec<String> =
647 entity_combos.iter().map(|(_, t)| t.clone()).collect();
648
649 let batch_client =
650 crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
651
652 let embeddings = match tokio::runtime::Handle::try_current() {
653 Ok(handle) => tokio::task::block_in_place(|| {
654 handle.block_on(batch_client.embed_batch(
655 &provider,
656 &model,
657 &api_key,
658 batch_texts,
659 ))
660 }),
661 Err(_) => {
662 return Err(RedDBError::Query(
663 "AUTO EMBED requires a Tokio runtime context".to_string(),
664 ));
665 }
666 }
667 .map_err(|e| RedDBError::Query(e.to_string()))?;
668
669 for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
671 if dense.is_empty() {
672 continue;
673 }
674 self.create_vector(CreateVectorInput {
675 collection: query.table.clone(),
676 dense,
677 content: Some(combined.clone()),
678 metadata: Vec::new(),
679 link_row: None,
680 link_node: None,
681 })?;
682 }
683 }
684 }
685
686 if inserted_count > 0 {
687 self.note_table_write(&query.table);
688 }
689
690 let mut result = RuntimeQueryResult::dml_result(
691 raw_query.to_string(),
692 inserted_count,
693 "insert",
694 "runtime-dml",
695 );
696 if let Some(returning) = returning_result {
697 result.result = returning;
698 }
699 Ok(result)
700 }
701
702 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
703 let Some(auth_store) = self.inner.auth_store.read().clone() else {
704 return Ok(());
705 };
706 if !auth_store.iam_authorization_enabled() {
707 return Ok(());
708 }
709 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
710 return Ok(());
711 };
712
713 let tenant = crate::runtime::impl_core::current_tenant();
714 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
715 let request = crate::auth::ColumnAccessRequest {
716 action: "insert".to_string(),
717 schema: None,
718 table: query.table.clone(),
719 columns: query.columns.clone(),
720 };
721 let ctx = crate::auth::policies::EvalContext {
722 principal_tenant: tenant.clone(),
723 current_tenant: tenant,
724 peer_ip: None,
725 mfa_present: false,
726 now_ms: crate::auth::now_ms(),
727 principal_is_admin_role: role == crate::auth::Role::Admin,
728 };
729
730 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
731 let table_allowed = matches!(
732 outcome.table_decision,
733 crate::auth::policies::Decision::Allow { .. }
734 | crate::auth::policies::Decision::AdminBypass
735 );
736 if !table_allowed {
737 return Err(RedDBError::Query(format!(
738 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
739 outcome.table_resource.kind, outcome.table_resource.name
740 )));
741 }
742 if let Some(denied) = outcome.first_denied_column() {
743 return Err(RedDBError::Query(format!(
744 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
745 denied.resource.kind, denied.resource.name
746 )));
747 }
748
749 Ok(())
750 }
751
752 pub(crate) fn insert_timeseries_point(
753 &self,
754 collection: &str,
755 fields: Vec<(String, Value)>,
756 mut metadata: Vec<(String, MetadataValue)>,
757 ) -> RedDBResult<EntityId> {
758 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
759
760 let (columns, values) = pairwise_columns_values(&fields);
761 validate_timeseries_insert_columns(&columns)?;
762
763 let metric = find_column_value_string(&columns, &values, "metric")?;
764 let value = find_column_value_f64(&columns, &values, "value")?;
765 let timestamp_ns =
766 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
767 let tags = find_timeseries_tags(&columns, &values)?;
768
769 let mut entity = UnifiedEntity::new(
770 EntityId::new(0),
771 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
772 series: collection.to_string(),
773 metric: metric.clone(),
774 })),
775 EntityData::TimeSeries(crate::storage::TimeSeriesData {
776 metric,
777 timestamp_ns,
778 value,
779 tags,
780 }),
781 );
782 let writer_xid = match self.current_xid() {
786 Some(xid) => xid,
787 None => {
788 let mgr = self.snapshot_manager();
789 let xid = mgr.begin();
790 mgr.commit(xid);
791 xid
792 }
793 };
794 entity.set_xmin(writer_xid);
795
796 let store = self.inner.db.store();
797 let id = store
798 .insert_auto(collection, entity)
799 .map_err(|err| RedDBError::Internal(err.to_string()))?;
800
801 if !metadata.is_empty() {
802 let _ = store.set_metadata(
803 collection,
804 id,
805 Metadata::with_fields(metadata.into_iter().collect()),
806 );
807 }
808
809 self.cdc_emit(
810 crate::replication::cdc::ChangeOperation::Insert,
811 collection,
812 id.raw(),
813 "timeseries",
814 );
815
816 Ok(id)
817 }
818
819 pub fn execute_update(
824 &self,
825 raw_query: &str,
826 query: &UpdateQuery,
827 ) -> RedDBResult<RuntimeQueryResult> {
828 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
829 crate::runtime::collection_contract::CollectionContractGate::check(
835 self,
836 &query.table,
837 crate::runtime::collection_contract::MutationKind::Update,
838 )?;
839
840 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
846 let augmented_query: UpdateQuery;
847 let effective_query: &UpdateQuery = if rls_gated {
848 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
849 self,
850 &query.table,
851 crate::storage::query::ast::PolicyAction::Update,
852 );
853 let Some(policy) = rls_filter else {
854 let mut response = RuntimeQueryResult::dml_result(
857 raw_query.to_string(),
858 0,
859 "update",
860 "runtime-dml-rls",
861 );
862 if let Some(items) = query.returning.clone() {
863 response.result = build_returning_result(&items, &[], None);
864 }
865 return Ok(response);
866 };
867 let mut augmented = query.clone();
868 augmented.filter = Some(match augmented.filter.take() {
869 Some(existing) => {
870 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
871 }
872 None => policy,
873 });
874 augmented_query = augmented;
875 &augmented_query
876 } else {
877 query
878 };
879
880 if let Some(items) = effective_query.returning.clone() {
885 let mut inner_query = effective_query.clone();
886 inner_query.returning = None;
887 let (mut response, touched_ids) =
888 self.execute_update_inner_tracked(raw_query, &inner_query)?;
889
890 let snapshots = super::dml_target_scan::DmlTargetScan::new(
891 self,
892 &effective_query.table,
893 None,
894 None,
895 )
896 .row_snapshots(&touched_ids);
897
898 response.result = build_returning_result(&items, &snapshots, None);
899 response.engine = "runtime-dml-returning";
900 return Ok(response);
901 }
902
903 self.execute_update_inner(raw_query, effective_query)
904 }
905
906 fn execute_update_inner(
908 &self,
909 raw_query: &str,
910 query: &UpdateQuery,
911 ) -> RedDBResult<RuntimeQueryResult> {
912 self.execute_update_inner_tracked(raw_query, query)
913 .map(|(res, _)| res)
914 }
915
916 fn execute_update_inner_tracked(
917 &self,
918 raw_query: &str,
919 query: &UpdateQuery,
920 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
921 let store = self.inner.db.store();
922 let effective_filter = effective_update_filter(query);
923 let compiled_plan = self.compile_update_plan(query)?;
924 let mut touched_ids: Vec<EntityId> = Vec::new();
925 let limit_cap = query.limit.map(|l| l as usize);
926 let manager = store
927 .get_collection(&query.table)
928 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
929 let ids_to_update = super::dml_target_scan::DmlTargetScan::new(
930 self,
931 &query.table,
932 effective_filter.as_ref(),
933 limit_cap,
934 )
935 .find_target_ids()?;
936
937 let mut affected: u64 = 0;
938 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
939 let mut applied_chunk = Vec::with_capacity(chunk.len());
940 for entity in manager.get_many(chunk).into_iter().flatten() {
941 let assignments =
942 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
943 let applied = self.apply_materialized_update_for_entity(
944 query.table.clone(),
945 entity,
946 &compiled_plan,
947 assignments,
948 )?;
949 touched_ids.push(applied.id);
950 applied_chunk.push(applied);
951 }
952 self.persist_update_chunk(&applied_chunk)?;
953 affected += applied_chunk.len() as u64;
954 let lsns = self.flush_update_chunk(&applied_chunk);
955 if !query.suppress_events {
956 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
957 }
958 }
959
960 if affected > 0 {
961 self.note_table_write(&query.table);
962 }
963
964 Ok((
965 RuntimeQueryResult::dml_result(
966 raw_query.to_string(),
967 affected,
968 "update",
969 "runtime-dml",
970 ),
971 touched_ids,
972 ))
973 }
974
975 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
976 let mut static_field_assignments = Vec::new();
977 let mut static_metadata_assignments = Vec::new();
978 let mut dynamic_assignments = Vec::new();
979 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
980 let mut row_modified_columns = Vec::new();
981
982 for (column, expr) in &query.assignment_exprs {
983 let metadata_key = resolve_sql_ttl_metadata_key(column);
984 if let Ok(value) = fold_expr_to_value(expr.clone()) {
985 if let Some(metadata_key) = metadata_key {
986 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
987 let (canonical_key, canonical_value) =
988 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
989 static_metadata_assignments.push((canonical_key.to_string(), canonical_value));
990 } else {
991 let value = self.resolve_crypto_sentinel(value)?;
992 static_field_assignments.push((
993 column.clone(),
994 normalize_row_update_assignment_with_plan(
995 &query.table,
996 column,
997 value,
998 row_contract_plan.as_ref(),
999 )?,
1000 ));
1001 row_modified_columns.push(column.clone());
1002 }
1003 continue;
1004 }
1005
1006 dynamic_assignments.push(CompiledUpdateAssignment {
1007 column: column.clone(),
1008 expr: expr.clone(),
1009 metadata_key,
1010 row_rule: if metadata_key.is_none() {
1011 if let Some(plan) = row_contract_plan.as_ref() {
1012 if plan.timestamps_enabled
1013 && (column == "created_at" || column == "updated_at")
1014 {
1015 return Err(RedDBError::Query(format!(
1016 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1017 query.table, column
1018 )));
1019 }
1020 if let Some(rule) = plan.declared_rules.get(column) {
1021 Some(rule.clone())
1022 } else if plan.strict_schema {
1023 return Err(RedDBError::Query(format!(
1024 "collection '{}' is strict and does not allow undeclared fields: {}",
1025 query.table, column
1026 )));
1027 } else {
1028 None
1029 }
1030 } else {
1031 None
1032 }
1033 } else {
1034 None
1035 },
1036 });
1037 if metadata_key.is_none() {
1038 row_modified_columns.push(column.clone());
1039 }
1040 }
1041
1042 let row_modified_columns = dedupe_update_columns(row_modified_columns);
1043 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1044 row_modified_columns.iter().any(|column| {
1045 plan.unique_columns
1046 .keys()
1047 .any(|unique| unique.eq_ignore_ascii_case(column))
1048 })
1049 });
1050
1051 if let Some(ttl_ms) = query.ttl_ms {
1052 static_metadata_assignments
1053 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1054 }
1055 if let Some(expires_at_ms) = query.expires_at_ms {
1056 static_metadata_assignments.push((
1057 "_expires_at".to_string(),
1058 metadata_u64_to_value(expires_at_ms),
1059 ));
1060 }
1061 for (key, val) in &query.with_metadata {
1062 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1063 }
1064
1065 Ok(CompiledUpdatePlan {
1066 static_field_assignments,
1067 static_metadata_assignments,
1068 dynamic_assignments,
1069 row_contract_plan,
1070 row_modified_columns,
1071 row_touches_unique_columns,
1072 })
1073 }
1074
1075 fn materialize_update_assignments_for_entity(
1076 &self,
1077 query: &UpdateQuery,
1078 entity: &UnifiedEntity,
1079 compiled_plan: &CompiledUpdatePlan,
1080 ) -> RedDBResult<MaterializedUpdateAssignments> {
1081 let mut assignments = MaterializedUpdateAssignments::default();
1082 let mut record: Option<UnifiedRecord> = None;
1083
1084 for assignment in &compiled_plan.dynamic_assignments {
1085 if record.is_none() {
1086 record = runtime_any_record_from_entity_ref(entity);
1087 }
1088 let Some(record) = record.as_ref() else {
1089 return Err(RedDBError::Query(format!(
1090 "UPDATE could not materialize runtime record for entity {} in '{}'",
1091 entity.id.raw(),
1092 query.table
1093 )));
1094 };
1095 let value = super::expr_eval::evaluate_runtime_expr_with_db(
1096 Some(self.inner.db.as_ref()),
1097 &assignment.expr,
1098 record,
1099 Some(query.table.as_str()),
1100 Some(query.table.as_str()),
1101 )
1102 .ok_or_else(|| {
1103 RedDBError::Query(format!(
1104 "failed to evaluate UPDATE expression for column '{}'",
1105 assignment.column
1106 ))
1107 })?;
1108
1109 if let Some(metadata_key) = assignment.metadata_key {
1110 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1111 let (canonical_key, canonical_value) =
1112 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1113 assignments
1114 .dynamic_metadata_assignments
1115 .push((canonical_key.to_string(), canonical_value));
1116 } else {
1117 assignments.dynamic_field_assignments.push((
1118 assignment.column.clone(),
1119 normalize_row_update_value_for_rule(
1120 &query.table,
1121 self.resolve_crypto_sentinel(value)?,
1122 assignment.row_rule.as_ref(),
1123 )?,
1124 ));
1125 }
1126 }
1127
1128 Ok(assignments)
1129 }
1130
1131 fn apply_materialized_update_for_entity(
1132 &self,
1133 collection: String,
1134 entity: UnifiedEntity,
1135 compiled_plan: &CompiledUpdatePlan,
1136 assignments: MaterializedUpdateAssignments,
1137 ) -> RedDBResult<AppliedEntityMutation> {
1138 if matches!(entity.data, EntityData::Row(_)) {
1139 return self.apply_loaded_sql_update_row_core(
1140 collection,
1141 entity,
1142 &compiled_plan.static_field_assignments,
1143 assignments.dynamic_field_assignments,
1144 &compiled_plan.static_metadata_assignments,
1145 assignments.dynamic_metadata_assignments,
1146 compiled_plan.row_contract_plan.as_ref(),
1147 &compiled_plan.row_modified_columns,
1148 compiled_plan.row_touches_unique_columns,
1149 );
1150 }
1151
1152 self.apply_loaded_patch_entity_core(
1153 collection,
1154 entity,
1155 crate::json::Value::Null,
1156 build_patch_operations_from_materialized_assignments(compiled_plan, assignments),
1157 )
1158 }
1159
1160 pub fn execute_delete(
1162 &self,
1163 raw_query: &str,
1164 query: &DeleteQuery,
1165 ) -> RedDBResult<RuntimeQueryResult> {
1166 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1167 crate::runtime::collection_contract::CollectionContractGate::check(
1171 self,
1172 &query.table,
1173 crate::runtime::collection_contract::MutationKind::Delete,
1174 )?;
1175
1176 if let Some(items) = query.returning.clone() {
1183 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
1184 RedDBError::Query(
1185 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
1186 )
1187 })?;
1188 let captured = self.execute_query(&select_sql)?;
1189
1190 let mut inner_query = query.clone();
1191 inner_query.returning = None;
1192 let _ = self.execute_delete(raw_query, &inner_query)?;
1193
1194 let snapshots: Vec<Vec<(String, Value)>> = captured
1195 .result
1196 .records
1197 .iter()
1198 .map(|rec| {
1199 rec.iter_fields()
1200 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
1201 .collect()
1202 })
1203 .collect();
1204 let affected = snapshots.len() as u64;
1205 let result = build_returning_result(&items, &snapshots, None);
1206
1207 let mut response = RuntimeQueryResult::dml_result(
1208 raw_query.to_string(),
1209 affected,
1210 "delete",
1211 "runtime-dml-returning",
1212 );
1213 response.result = result;
1214 return Ok(response);
1215 }
1216 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
1223 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1224 self,
1225 &query.table,
1226 crate::storage::query::ast::PolicyAction::Delete,
1227 );
1228 let Some(policy) = rls_filter else {
1229 return Ok(RuntimeQueryResult::dml_result(
1230 raw_query.to_string(),
1231 0,
1232 "delete",
1233 "runtime-dml-rls",
1234 ));
1235 };
1236 let mut augmented = query.clone();
1241 augmented.filter = Some(match augmented.filter.take() {
1242 Some(existing) => {
1243 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1244 }
1245 None => policy,
1246 });
1247 return self.execute_delete_inner(raw_query, &augmented);
1248 }
1249 self.execute_delete_inner(raw_query, query)
1250 }
1251
1252 fn execute_delete_inner(
1253 &self,
1254 raw_query: &str,
1255 query: &DeleteQuery,
1256 ) -> RedDBResult<RuntimeQueryResult> {
1257 let effective_filter = effective_delete_filter(query);
1258
1259 let scan = super::dml_target_scan::DmlTargetScan::new(
1263 self,
1264 &query.table,
1265 effective_filter.as_ref(),
1266 None,
1267 );
1268 let ids_to_delete = scan.find_target_ids()?;
1269
1270 let needs_delete_events =
1273 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
1274 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
1275 scan.row_json_pre_images(&ids_to_delete)
1276 } else {
1277 HashMap::new()
1278 };
1279
1280 let mut affected: u64 = 0;
1281 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1282 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
1283 affected += count;
1284 if needs_delete_events && !lsns.is_empty() {
1285 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
1289 self.emit_delete_events_for_collection(
1290 &query.table,
1291 deleted_chunk,
1292 &lsns,
1293 &pre_images,
1294 )?;
1295 }
1296 }
1297 pre_images.clear();
1298
1299 if affected > 0 {
1300 self.note_table_write(&query.table);
1301 }
1302
1303 Ok(RuntimeQueryResult::dml_result(
1304 raw_query.to_string(),
1305 affected,
1306 "delete",
1307 "runtime-dml",
1308 ))
1309 }
1310}
1311
1312fn build_patch_operations_from_materialized_assignments(
1313 compiled_plan: &CompiledUpdatePlan,
1314 assignments: MaterializedUpdateAssignments,
1315) -> Vec<PatchEntityOperation> {
1316 let mut operations = Vec::with_capacity(
1317 compiled_plan.static_field_assignments.len()
1318 + compiled_plan.static_metadata_assignments.len()
1319 + assignments.dynamic_field_assignments.len()
1320 + assignments.dynamic_metadata_assignments.len(),
1321 );
1322
1323 for (column, value) in &compiled_plan.static_field_assignments {
1324 operations.push(PatchEntityOperation {
1325 op: PatchEntityOperationType::Set,
1326 path: vec!["fields".to_string(), column.clone()],
1327 value: Some(storage_value_to_json(value)),
1328 });
1329 }
1330
1331 for (column, value) in assignments.dynamic_field_assignments {
1332 operations.push(PatchEntityOperation {
1333 op: PatchEntityOperationType::Set,
1334 path: vec!["fields".to_string(), column],
1335 value: Some(storage_value_to_json(&value)),
1336 });
1337 }
1338
1339 for (key, value) in &compiled_plan.static_metadata_assignments {
1340 operations.push(PatchEntityOperation {
1341 op: PatchEntityOperationType::Set,
1342 path: vec!["metadata".to_string(), key.clone()],
1343 value: Some(metadata_value_to_json(value)),
1344 });
1345 }
1346
1347 for (key, value) in assignments.dynamic_metadata_assignments {
1348 operations.push(PatchEntityOperation {
1349 op: PatchEntityOperationType::Set,
1350 path: vec!["metadata".to_string(), key],
1351 value: Some(metadata_value_to_json(&value)),
1352 });
1353 }
1354
1355 operations
1356}
1357
1358fn delete_to_select_sql(sql: &str) -> Option<String> {
1368 let trimmed = sql.trim_start();
1369 let lowered = trimmed.to_ascii_lowercase();
1370 if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
1371 return None;
1372 }
1373 let from_idx = lowered.find(" from ")?;
1375 let after_from = &trimmed[from_idx + " from ".len()..];
1376 let after_from_lc = &lowered[from_idx + " from ".len()..];
1377
1378 let mut body = after_from.to_string();
1383 if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
1384 body.truncate(pos);
1385 }
1386 Some(format!("SELECT * FROM {}", body.trim_end()))
1387}
1388
1389fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
1394 let bytes = haystack.as_bytes();
1395 let nlen = needle.len();
1396 let mut i = 0usize;
1397 let mut in_string = false;
1398 while i < bytes.len() {
1399 let c = bytes[i];
1400 if c == b'\'' {
1401 in_string = !in_string;
1402 i += 1;
1403 continue;
1404 }
1405 if !in_string
1406 && i + nlen <= bytes.len()
1407 && &bytes[i..i + nlen] == needle.as_bytes()
1408 && (i == 0 || bytes[i - 1].is_ascii_whitespace())
1409 && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
1410 {
1411 return Some(i);
1412 }
1413 i += 1;
1414 }
1415 None
1416}
1417
1418fn build_returning_result(
1425 items: &[ReturningItem],
1426 snapshots: &[Vec<(String, Value)>],
1427 outputs: Option<&[crate::application::entity::CreateEntityOutput]>,
1428) -> UnifiedResult {
1429 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
1430
1431 let mut columns: Vec<String> = if project_all {
1432 let mut cols: Vec<String> = Vec::new();
1433 if outputs.is_some() {
1434 cols.push("red_entity_id".to_string());
1435 }
1436 if let Some(first) = snapshots.first() {
1437 for (name, _) in first {
1438 cols.push(name.clone());
1439 }
1440 }
1441 cols
1442 } else {
1443 items
1444 .iter()
1445 .filter_map(|it| match it {
1446 ReturningItem::Column(c) => Some(c.clone()),
1447 ReturningItem::All => None,
1448 })
1449 .collect()
1450 };
1451 {
1453 let mut seen = std::collections::HashSet::new();
1454 columns.retain(|c| seen.insert(c.clone()));
1455 }
1456
1457 let id_key = sys_key_red_entity_id();
1458 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
1459 for (idx, snap) in snapshots.iter().enumerate() {
1460 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
1461 if let Some(outs) = outputs {
1462 if let Some(out) = outs.get(idx) {
1463 values.insert(Arc::clone(&id_key), Value::Integer(out.id.raw() as i64));
1464 }
1465 }
1466 for (name, val) in snap {
1467 values.insert(Arc::from(name.as_str()), val.clone());
1468 }
1469 let mut rec = UnifiedRecord::default();
1470 for col in &columns {
1472 if let Some(v) = values.get(col.as_str()) {
1473 rec.set_arc(Arc::from(col.as_str()), v.clone());
1474 }
1475 }
1476 records.push(rec);
1477 }
1478
1479 UnifiedResult {
1480 columns,
1481 records,
1482 stats: Default::default(),
1483 pre_serialized_json: None,
1484 }
1485}
1486
1487fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
1488 if columns.is_empty() {
1489 return columns;
1490 }
1491
1492 let mut unique = Vec::with_capacity(columns.len());
1493 for column in columns.drain(..) {
1494 if !unique
1495 .iter()
1496 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1497 {
1498 unique.push(column);
1499 }
1500 }
1501 unique
1502}
1503
1504const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];
1509
1510fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
1511 if column.eq_ignore_ascii_case("_ttl") {
1512 Some(SQL_TTL_METADATA_COLUMNS[0])
1513 } else if column.eq_ignore_ascii_case("_ttl_ms") {
1514 Some(SQL_TTL_METADATA_COLUMNS[1])
1515 } else if column.eq_ignore_ascii_case("_expires_at") {
1516 Some(SQL_TTL_METADATA_COLUMNS[2])
1517 } else {
1518 None
1519 }
1520}
1521
1522fn canonicalize_sql_ttl_metadata(
1527 key: &'static str,
1528 value: MetadataValue,
1529) -> (&'static str, MetadataValue) {
1530 if key != "_ttl" {
1531 return (key, value);
1532 }
1533 let scaled = match value {
1534 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
1535 MetadataValue::Timestamp(ms_or_s) => {
1536 MetadataValue::Timestamp(ms_or_s)
1539 }
1540 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
1541 other => other,
1542 };
1543 ("_ttl_ms", scaled)
1544}
1545
1546pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
1550
1551impl RedDBRuntime {
1552 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
1558 match value {
1559 Value::Password(marked) => {
1560 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
1561 Ok(Value::Password(crate::auth::store::hash_password(plain)))
1562 } else {
1563 Ok(Value::Password(marked))
1564 }
1565 }
1566 Value::Secret(bytes) => {
1567 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
1568 if !self.secret_auto_encrypt() {
1569 return Err(RedDBError::Query(
1570 "SECRET() literal rejected: red.config.secret.auto_encrypt \
1571 is false. Insert pre-encrypted bytes directly instead."
1572 .to_string(),
1573 ));
1574 }
1575 let key = self.secret_aes_key().ok_or_else(|| {
1576 RedDBError::Query(
1577 "SECRET() column encryption requires a bootstrapped \
1578 vault (red.secret.aes_key is missing). Start the server \
1579 with --vault to enable."
1580 .to_string(),
1581 )
1582 })?;
1583 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
1584 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
1585 } else {
1586 Ok(Value::Secret(bytes))
1587 }
1588 }
1589 other => Ok(other),
1590 }
1591 }
1592}
1593
1594fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
1597 let nonce_bytes = crate::auth::store::random_bytes(12);
1598 let mut nonce = [0u8; 12];
1599 nonce.copy_from_slice(&nonce_bytes[..12]);
1600 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
1601 let mut out = Vec::with_capacity(12 + ct.len());
1602 out.extend_from_slice(&nonce);
1603 out.extend_from_slice(&ct);
1604 out
1605}
1606
1607pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
1611 if payload.len() < 12 {
1612 return None;
1613 }
1614 let mut nonce = [0u8; 12];
1615 nonce.copy_from_slice(&payload[..12]);
1616 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
1617}
1618
1619fn split_insert_metadata(
1620 runtime: &RedDBRuntime,
1621 columns: &[String],
1622 values: &[Value],
1623) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
1624 let mut fields = Vec::new();
1625 let mut metadata = Vec::new();
1626
1627 for (column, value) in columns.iter().zip(values.iter()) {
1628 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
1630 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
1631 let (canonical_key, canonical_value) =
1632 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1633 metadata.push((canonical_key.to_string(), canonical_value));
1634 continue;
1635 }
1636 fields.push((
1637 column.clone(),
1638 runtime.resolve_crypto_sentinel(value.clone())?,
1639 ));
1640 }
1641
1642 Ok((fields, metadata))
1643}
1644
1645fn merge_with_clauses(
1647 metadata: &mut Vec<(String, MetadataValue)>,
1648 ttl_ms: Option<u64>,
1649 expires_at_ms: Option<u64>,
1650 with_metadata: &[(String, Value)],
1651) {
1652 if let Some(ms) = ttl_ms {
1653 metadata.push((
1654 "_ttl_ms".to_string(),
1655 if ms <= i64::MAX as u64 {
1656 MetadataValue::Int(ms as i64)
1657 } else {
1658 MetadataValue::Timestamp(ms)
1659 },
1660 ));
1661 }
1662 if let Some(ms) = expires_at_ms {
1663 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
1664 }
1665 for (key, value) in with_metadata {
1666 let meta_value = match value {
1667 Value::Text(s) => MetadataValue::String(s.to_string()),
1668 Value::Integer(n) => MetadataValue::Int(*n),
1669 Value::Float(n) => MetadataValue::Float(*n),
1670 Value::Boolean(b) => MetadataValue::Bool(*b),
1671 _ => MetadataValue::String(value.to_string()),
1672 };
1673 metadata.push((key.clone(), meta_value));
1674 }
1675}
1676
1677fn apply_collection_default_ttl_metadata(
1678 runtime: &RedDBRuntime,
1679 collection: &str,
1680 metadata: &mut Vec<(String, MetadataValue)>,
1681) {
1682 if has_internal_ttl_metadata(metadata) {
1683 return;
1684 }
1685
1686 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
1687 return;
1688 };
1689
1690 metadata.push((
1691 "_ttl_ms".to_string(),
1692 if default_ttl_ms <= i64::MAX as u64 {
1693 MetadataValue::Int(default_ttl_ms as i64)
1694 } else {
1695 MetadataValue::Timestamp(default_ttl_ms)
1696 },
1697 ));
1698}
1699
1700fn ensure_non_tree_reserved_metadata_entries(
1701 metadata: &[(String, MetadataValue)],
1702) -> RedDBResult<()> {
1703 for (key, _) in metadata {
1704 ensure_non_tree_reserved_metadata_key(key)?;
1705 }
1706 Ok(())
1707}
1708
1709fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
1710 if key.starts_with(TREE_METADATA_PREFIX) {
1711 return Err(RedDBError::Query(format!(
1712 "metadata key '{}' is reserved for managed trees",
1713 key
1714 )));
1715 }
1716 Ok(())
1717}
1718
1719fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
1720 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
1721 return Err(RedDBError::Query(format!(
1722 "edge label '{}' is reserved for managed trees",
1723 TREE_CHILD_EDGE_LABEL
1724 )));
1725 }
1726 Ok(())
1727}
1728
1729fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
1730 let mut columns = Vec::with_capacity(pairs.len());
1731 let mut values = Vec::with_capacity(pairs.len());
1732
1733 for (column, value) in pairs {
1734 columns.push(column.clone());
1735 values.push(value.clone());
1736 }
1737
1738 (columns, values)
1739}
1740
1741fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
1743 for (i, col) in columns.iter().enumerate() {
1744 if col.eq_ignore_ascii_case(name) {
1745 return Ok(values[i].clone());
1746 }
1747 }
1748 Err(RedDBError::Query(format!(
1749 "required column '{name}' not found in INSERT"
1750 )))
1751}
1752
1753fn find_column_value_string(
1755 columns: &[String],
1756 values: &[Value],
1757 name: &str,
1758) -> RedDBResult<String> {
1759 let val = find_column_value(columns, values, name)?;
1760 match val {
1761 Value::Text(s) => Ok(s.to_string()),
1762 Value::Integer(n) => Ok(n.to_string()),
1763 Value::Float(n) => Ok(n.to_string()),
1764 other => Err(RedDBError::Query(format!(
1765 "column '{name}' expected text, got {other:?}"
1766 ))),
1767 }
1768}
1769
1770fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
1771 let val = find_column_value(columns, values, name)?;
1772 match val {
1773 Value::Float(n) => Ok(n),
1774 Value::Integer(n) => Ok(n as f64),
1775 Value::UnsignedInteger(n) => Ok(n as f64),
1776 Value::Text(s) => s
1777 .parse::<f64>()
1778 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
1779 other => Err(RedDBError::Query(format!(
1780 "column '{name}' expected number, got {other:?}"
1781 ))),
1782 }
1783}
1784
1785fn find_column_value_opt_string(
1787 columns: &[String],
1788 values: &[Value],
1789 name: &str,
1790) -> Option<String> {
1791 for (i, col) in columns.iter().enumerate() {
1792 if col.eq_ignore_ascii_case(name) {
1793 return match &values[i] {
1794 Value::Null => None,
1795 Value::Text(s) => Some(s.to_string()),
1796 Value::Integer(n) => Some(n.to_string()),
1797 Value::Float(n) => Some(n.to_string()),
1798 _ => None,
1799 };
1800 }
1801 }
1802 None
1803}
1804
1805fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
1807 let val = find_column_value(columns, values, name)?;
1808 match val {
1809 Value::Integer(n) => Ok(n as u64),
1810 Value::UnsignedInteger(n) => Ok(n),
1811 Value::Text(s) => s
1812 .parse::<u64>()
1813 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
1814 other => Err(RedDBError::Query(format!(
1815 "column '{name}' expected integer, got {other:?}"
1816 ))),
1817 }
1818}
1819
1820fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
1822 for (i, col) in columns.iter().enumerate() {
1823 if col.eq_ignore_ascii_case(name) {
1824 return match &values[i] {
1825 Value::Float(n) => Some(*n as f32),
1826 Value::Integer(n) => Some(*n as f32),
1827 Value::Null => None,
1828 _ => None,
1829 };
1830 }
1831 }
1832 None
1833}
1834
1835fn find_column_value_vec_f32(
1837 columns: &[String],
1838 values: &[Value],
1839 name: &str,
1840) -> RedDBResult<Vec<f32>> {
1841 let val = find_column_value(columns, values, name)?;
1842 match val {
1843 Value::Vector(v) => Ok(v),
1844 Value::Json(bytes) => {
1845 let s = std::str::from_utf8(&bytes).map_err(|_| {
1847 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
1848 })?;
1849 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
1850 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
1851 })?;
1852 Ok(arr)
1853 }
1854 other => Err(RedDBError::Query(format!(
1855 "column '{name}' expected vector, got {other:?}"
1856 ))),
1857 }
1858}
1859
1860fn extract_remaining_properties(
1862 columns: &[String],
1863 values: &[Value],
1864 exclude: &[&str],
1865) -> Vec<(String, Value)> {
1866 columns
1867 .iter()
1868 .zip(values.iter())
1869 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
1870 .map(|(col, val)| (col.clone(), val.clone()))
1871 .collect()
1872}
1873
1874fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
1875 let mut invalid = Vec::new();
1876 for column in columns {
1877 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
1878 invalid.push(column.clone());
1879 }
1880 }
1881
1882 if invalid.is_empty() {
1883 Ok(())
1884 } else {
1885 Err(RedDBError::Query(format!(
1886 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
1887 invalid.join(", ")
1888 )))
1889 }
1890}
1891
1892fn is_timeseries_insert_column(column: &str) -> bool {
1893 matches!(
1894 column.to_ascii_lowercase().as_str(),
1895 "metric" | "value" | "tags" | "timestamp" | "timestamp_ns" | "time"
1896 )
1897}
1898
1899fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
1900 let mut found = None;
1901
1902 for alias in ["timestamp_ns", "timestamp", "time"] {
1903 for (index, column) in columns.iter().enumerate() {
1904 if !column.eq_ignore_ascii_case(alias) {
1905 continue;
1906 }
1907
1908 if found.is_some() {
1909 return Err(RedDBError::Query(
1910 "timeseries INSERT accepts only one timestamp column".to_string(),
1911 ));
1912 }
1913
1914 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
1915 }
1916 }
1917
1918 Ok(found)
1919}
1920
1921fn find_timeseries_tags(
1922 columns: &[String],
1923 values: &[Value],
1924) -> RedDBResult<std::collections::HashMap<String, String>> {
1925 for (index, column) in columns.iter().enumerate() {
1926 if column.eq_ignore_ascii_case("tags") {
1927 return parse_timeseries_tags(&values[index]);
1928 }
1929 }
1930 Ok(std::collections::HashMap::new())
1931}
1932
1933fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
1934 match value {
1935 Value::Null => Ok(std::collections::HashMap::new()),
1936 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
1937 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
1938 other => Err(RedDBError::Query(format!(
1939 "timeseries tags must be a JSON object or JSON text, got {other:?}"
1940 ))),
1941 }
1942}
1943
1944fn parse_timeseries_tags_json(
1945 bytes: &[u8],
1946) -> RedDBResult<std::collections::HashMap<String, String>> {
1947 let json: crate::json::Value = crate::json::from_slice(bytes)
1948 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
1949
1950 let object = match json {
1951 crate::json::Value::Object(object) => object,
1952 other => {
1953 return Err(RedDBError::Query(format!(
1954 "timeseries tags must be a JSON object, got {other:?}"
1955 )))
1956 }
1957 };
1958
1959 let mut tags = std::collections::HashMap::with_capacity(object.len());
1960 for (key, value) in object {
1961 tags.insert(key, json_tag_value_to_string(&value));
1962 }
1963 Ok(tags)
1964}
1965
1966fn json_tag_value_to_string(value: &crate::json::Value) -> String {
1967 match value {
1968 crate::json::Value::Null => "null".to_string(),
1969 crate::json::Value::Bool(value) => value.to_string(),
1970 crate::json::Value::Number(value) => value.to_string(),
1971 crate::json::Value::String(value) => value.clone(),
1972 other => other.to_string(),
1973 }
1974}
1975
1976fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
1977 match value {
1978 Value::UnsignedInteger(value) => Ok(*value),
1979 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
1980 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
1981 Value::Text(value) => value.parse::<u64>().map_err(|_| {
1982 RedDBError::Query(format!(
1983 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
1984 ))
1985 }),
1986 other => Err(RedDBError::Query(format!(
1987 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
1988 ))),
1989 }
1990}
1991
1992fn current_unix_ns() -> u64 {
1993 std::time::SystemTime::now()
1994 .duration_since(std::time::UNIX_EPOCH)
1995 .unwrap_or_default()
1996 .as_nanos()
1997 .min(u128::from(u64::MAX)) as u64
1998}
1999
2000fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
2001 use crate::json::{Map, Value as JV};
2002 match value {
2003 MetadataValue::Null => JV::Null,
2004 MetadataValue::Bool(value) => JV::Bool(*value),
2005 MetadataValue::Int(value) => JV::Number(*value as f64),
2006 MetadataValue::Float(value) => JV::Number(*value),
2007 MetadataValue::String(value) => JV::String(value.clone()),
2008 MetadataValue::Bytes(value) => JV::Array(
2009 value
2010 .iter()
2011 .map(|value| JV::Number(*value as f64))
2012 .collect(),
2013 ),
2014 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
2015 MetadataValue::Array(values) => {
2016 JV::Array(values.iter().map(metadata_value_to_json).collect())
2017 }
2018 MetadataValue::Object(object) => {
2019 let entries = object
2020 .iter()
2021 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
2022 .collect();
2023 JV::Object(entries)
2024 }
2025 MetadataValue::Geo { lat, lon } => {
2026 let mut object = Map::new();
2027 object.insert("lat".to_string(), JV::Number(*lat));
2028 object.insert("lon".to_string(), JV::Number(*lon));
2029 JV::Object(object)
2030 }
2031 MetadataValue::Reference(target) => {
2032 let mut object = Map::new();
2033 object.insert(
2034 "collection".to_string(),
2035 JV::String(target.collection().to_string()),
2036 );
2037 object.insert(
2038 "entity_id".to_string(),
2039 JV::Number(target.entity_id().raw() as f64),
2040 );
2041 JV::Object(object)
2042 }
2043 MetadataValue::References(values) => {
2044 let refs = values
2045 .iter()
2046 .map(|target| {
2047 let mut object = Map::new();
2048 object.insert(
2049 "collection".to_string(),
2050 JV::String(target.collection().to_string()),
2051 );
2052 object.insert(
2053 "entity_id".to_string(),
2054 JV::Number(target.entity_id().raw() as f64),
2055 );
2056 JV::Object(object)
2057 })
2058 .collect();
2059 JV::Array(refs)
2060 }
2061 }
2062}
2063
2064fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
2065 match value {
2066 Value::Null => MetadataValue::Null,
2067 Value::Boolean(value) => MetadataValue::Bool(*value),
2068 Value::Integer(value) => MetadataValue::Int(*value),
2069 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
2070 Value::Float(value) => MetadataValue::Float(*value),
2071 Value::Text(value) => MetadataValue::String(value.to_string()),
2072 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
2073 Value::Timestamp(value) => {
2074 if *value >= 0 {
2075 metadata_u64_to_value(*value as u64)
2076 } else {
2077 MetadataValue::Int(*value)
2078 }
2079 }
2080 Value::TimestampMs(value) => {
2081 if *value >= 0 {
2082 metadata_u64_to_value(*value as u64)
2083 } else {
2084 MetadataValue::Int(*value)
2085 }
2086 }
2087 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
2088 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
2089 Value::Date(value) => MetadataValue::String(value.to_string()),
2090 Value::Time(value) => MetadataValue::String(value.to_string()),
2091 Value::Decimal(value) => MetadataValue::String(value.to_string()),
2092 Value::Ipv4(value) => MetadataValue::String(format!(
2093 "{}.{}.{}.{}",
2094 (value >> 24) & 0xFF,
2095 (value >> 16) & 0xFF,
2096 (value >> 8) & 0xFF,
2097 value & 0xFF
2098 )),
2099 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
2100 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
2101 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
2102 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
2103 lat: *lat as f64 / 1_000_000.0,
2104 lon: *lon as f64 / 1_000_000.0,
2105 },
2106 Value::BigInt(value) => MetadataValue::String(value.to_string()),
2107 Value::TableRef(value) => MetadataValue::String(value.clone()),
2108 Value::PageRef(value) => MetadataValue::Int(*value as i64),
2109 Value::Password(value) => MetadataValue::String(value.clone()),
2110 Value::Array(values) => {
2111 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
2112 }
2113 _ => MetadataValue::String(value.to_string()),
2114 }
2115}
2116
2117fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
2118 match value {
2119 Value::Null => Ok(MetadataValue::Null),
2120 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
2121 Value::Integer(_) => Err(RedDBError::Query(format!(
2122 "column '{field}' must be non-negative for TTL metadata"
2123 ))),
2124 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
2125 Value::Float(value) if value.is_finite() => {
2126 if value.fract().abs() >= f64::EPSILON {
2127 return Err(RedDBError::Query(format!(
2128 "column '{field}' must be an integer (TTL metadata must be an integer)"
2129 )));
2130 }
2131 if *value < 0.0 {
2132 return Err(RedDBError::Query(format!(
2133 "column '{field}' must be non-negative for TTL metadata"
2134 )));
2135 }
2136 if *value > u64::MAX as f64 {
2137 return Err(RedDBError::Query(format!(
2138 "column '{field}' value is too large"
2139 )));
2140 }
2141 Ok(metadata_u64_to_value(*value as u64))
2142 }
2143 Value::Float(_) => Err(RedDBError::Query(format!(
2144 "column '{field}' must be a finite number"
2145 ))),
2146 Value::Text(value) => {
2147 let value = value.trim();
2148 if let Ok(value) = value.parse::<u64>() {
2149 Ok(metadata_u64_to_value(value))
2150 } else if let Ok(value) = value.parse::<i64>() {
2151 if value < 0 {
2152 return Err(RedDBError::Query(format!(
2153 "column '{field}' must be non-negative for TTL metadata"
2154 )));
2155 }
2156 Ok(metadata_u64_to_value(value as u64))
2157 } else if let Ok(value) = value.parse::<f64>() {
2158 if !value.is_finite() {
2159 return Err(RedDBError::Query(format!(
2160 "column '{field}' must be a finite number"
2161 )));
2162 }
2163 if value.fract().abs() >= f64::EPSILON {
2164 return Err(RedDBError::Query(format!(
2165 "column '{field}' must be an integer (TTL metadata must be an integer)"
2166 )));
2167 }
2168 if value < 0.0 {
2169 return Err(RedDBError::Query(format!(
2170 "column '{field}' must be non-negative for TTL metadata"
2171 )));
2172 }
2173 if value > u64::MAX as f64 {
2174 return Err(RedDBError::Query(format!(
2175 "column '{field}' value is too large"
2176 )));
2177 }
2178 Ok(metadata_u64_to_value(value as u64))
2179 } else {
2180 Err(RedDBError::Query(format!(
2181 "column '{field}' expects a numeric value for TTL metadata"
2182 )))
2183 }
2184 }
2185 _ => Err(RedDBError::Query(format!(
2186 "column '{field}' expects a numeric value for TTL metadata"
2187 ))),
2188 }
2189}
2190
2191fn metadata_u64_to_value(value: u64) -> MetadataValue {
2192 if value <= i64::MAX as u64 {
2193 MetadataValue::Int(value as i64)
2194 } else {
2195 MetadataValue::Timestamp(value)
2196 }
2197}
2198
2199fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
2205 let json = match value {
2206 Value::Null => return false,
2207 Value::Json(bytes) | Value::Blob(bytes) => {
2208 match crate::json::from_slice::<crate::json::Value>(bytes) {
2209 Ok(v) => v,
2210 Err(_) => return false,
2211 }
2212 }
2213 Value::Text(s) => {
2214 let trimmed = s.trim_start();
2215 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
2216 return false;
2217 }
2218 match crate::json::from_str::<crate::json::Value>(s) {
2219 Ok(v) => v,
2220 Err(_) => return false,
2221 }
2222 }
2223 _ => return false,
2224 };
2225 let mut cursor = &json;
2226 for seg in tail.split('.') {
2227 match cursor {
2228 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
2229 Some((_, v)) => cursor = v,
2230 None => return false,
2231 },
2232 _ => return false,
2233 }
2234 }
2235 !matches!(cursor, crate::json::Value::Null)
2236}
2237
2238fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
2249 let mut root = match current {
2250 Value::Null => crate::json::Value::Object(Default::default()),
2251 Value::Json(bytes) | Value::Blob(bytes) => {
2252 crate::json::from_slice(&bytes).map_err(|err| {
2253 RedDBError::Query(format!(
2254 "tenant auto-fill: root column is not valid JSON ({err})"
2255 ))
2256 })?
2257 }
2258 Value::Text(s) => {
2259 if s.trim().is_empty() {
2260 crate::json::Value::Object(Default::default())
2261 } else {
2262 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
2263 RedDBError::Query(format!(
2264 "tenant auto-fill: text root is not valid JSON ({err})"
2265 ))
2266 })?
2267 }
2268 }
2269 other => {
2270 return Err(RedDBError::Query(format!(
2271 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
2272 )));
2273 }
2274 };
2275
2276 let segments: Vec<&str> = tail.split('.').collect();
2278 let mut cursor: &mut crate::json::Value = &mut root;
2279 for (i, seg) in segments.iter().enumerate() {
2280 let is_last = i + 1 == segments.len();
2281 let map = match cursor {
2282 crate::json::Value::Object(m) => m,
2283 _ => {
2284 return Err(RedDBError::Query(format!(
2285 "tenant auto-fill: segment '{seg}' is not inside an object"
2286 )));
2287 }
2288 };
2289 if is_last {
2290 map.insert(
2291 seg.to_string(),
2292 crate::json::Value::String(tenant_id.to_string()),
2293 );
2294 break;
2295 }
2296 cursor = map
2297 .entry(seg.to_string())
2298 .or_insert_with(|| crate::json::Value::Object(Default::default()));
2299 }
2300
2301 let bytes = crate::json::to_vec(&root).map_err(|err| {
2302 RedDBError::Query(format!(
2303 "tenant auto-fill: failed to re-serialize JSON ({err})"
2304 ))
2305 })?;
2306 Ok(Value::Json(bytes))
2307}
2308
2309#[cfg(test)]
2310mod tests {
2311 use crate::storage::schema::Value;
2312 use crate::{RedDBOptions, RedDBRuntime};
2313
2314 #[test]
2315 fn update_where_id_in_with_hash_index_updates_expected_rows() {
2316 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2317 rt.execute_query("CREATE TABLE users (id INT, score INT)")
2318 .unwrap();
2319 for id in 0..5 {
2320 rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
2321 .unwrap();
2322 }
2323 rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
2324 .unwrap();
2325
2326 let updated = rt
2327 .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
2328 .unwrap();
2329 assert_eq!(updated.affected_rows, 3);
2330
2331 let selected = rt
2332 .execute_query("SELECT id, score FROM users ORDER BY id")
2333 .unwrap();
2334 let scores: Vec<(i64, i64)> = selected
2335 .result
2336 .records
2337 .iter()
2338 .map(|record| {
2339 let id = match record.get("id").unwrap() {
2340 Value::Integer(value) => *value,
2341 other => panic!("expected integer id, got {other:?}"),
2342 };
2343 let score = match record.get("score").unwrap() {
2344 Value::Integer(value) => *value,
2345 other => panic!("expected integer score, got {other:?}"),
2346 };
2347 (id, score)
2348 })
2349 .collect();
2350 assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
2351 }
2352
2353 #[test]
2360 fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
2361 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2362 rt.execute_query("CREATE TABLE items (id INT, score INT)")
2363 .unwrap();
2364 for id in 0..5 {
2365 rt.execute_query(&format!(
2366 "INSERT INTO items (id, score) VALUES ({id}, {})",
2367 id * 10
2368 ))
2369 .unwrap();
2370 }
2371 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
2372 .unwrap();
2373
2374 let updated_one = rt
2378 .execute_query("UPDATE items SET score = 5 WHERE id = 2")
2379 .unwrap();
2380 assert_eq!(updated_one.affected_rows, 1);
2381
2382 let updated_many = rt
2386 .execute_query("UPDATE items SET score = 7 WHERE score > 25")
2387 .unwrap();
2388 assert_eq!(updated_many.affected_rows, 2);
2389
2390 let snapshot = rt
2391 .execute_query("SELECT id, score FROM items ORDER BY id")
2392 .unwrap();
2393 let pairs: Vec<(i64, i64)> = snapshot
2394 .result
2395 .records
2396 .iter()
2397 .map(|record| {
2398 let id = match record.get("id").unwrap() {
2399 Value::Integer(value) => *value,
2400 other => panic!("expected integer id, got {other:?}"),
2401 };
2402 let score = match record.get("score").unwrap() {
2403 Value::Integer(value) => *value,
2404 other => panic!("expected integer score, got {other:?}"),
2405 };
2406 (id, score)
2407 })
2408 .collect();
2409 assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
2410
2411 let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
2413 assert_eq!(updated_all.affected_rows, 5);
2414 let after = rt
2415 .execute_query("SELECT score FROM items ORDER BY id")
2416 .unwrap();
2417 let scores: Vec<i64> = after
2418 .result
2419 .records
2420 .iter()
2421 .map(|record| match record.get("score").unwrap() {
2422 Value::Integer(value) => *value,
2423 other => panic!("expected integer score, got {other:?}"),
2424 })
2425 .collect();
2426 assert_eq!(scores, vec![1, 1, 1, 1, 1]);
2427 }
2428
2429 #[test]
2435 fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
2436 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2437 rt.execute_query("CREATE TABLE items (id INT, score INT)")
2438 .unwrap();
2439 for id in 0..5 {
2440 rt.execute_query(&format!(
2441 "INSERT INTO items (id, score) VALUES ({id}, {})",
2442 id * 10
2443 ))
2444 .unwrap();
2445 }
2446 rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
2447 .unwrap();
2448
2449 let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2452 assert_eq!(deleted_one.affected_rows, 1);
2453
2454 let deleted_many = rt
2458 .execute_query("DELETE FROM items WHERE score > 25")
2459 .unwrap();
2460 assert_eq!(deleted_many.affected_rows, 2);
2461
2462 let surviving = rt
2463 .execute_query("SELECT id FROM items ORDER BY id")
2464 .unwrap();
2465 let ids: Vec<i64> = surviving
2466 .result
2467 .records
2468 .iter()
2469 .map(|record| match record.get("id").unwrap() {
2470 Value::Integer(value) => *value,
2471 other => panic!("expected integer id, got {other:?}"),
2472 })
2473 .collect();
2474 assert_eq!(ids, vec![0, 1]);
2475
2476 let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
2478 assert_eq!(deleted_rest.affected_rows, 2);
2479 let empty = rt.execute_query("SELECT id FROM items").unwrap();
2480 assert!(empty.result.records.is_empty());
2481 }
2482
2483 #[test]
2488 fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
2489 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2490 rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
2491 .unwrap();
2492
2493 let inserted = rt
2496 .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
2497 .unwrap();
2498 assert_eq!(inserted.affected_rows, 1);
2499
2500 let update_err = rt
2502 .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
2503 .unwrap_err();
2504 let msg = format!("{update_err}");
2505 assert!(
2506 msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
2507 "expected UPDATE rejection message, got: {msg}"
2508 );
2509
2510 let delete_err = rt
2512 .execute_query("DELETE FROM events WHERE id = 1")
2513 .unwrap_err();
2514 let msg = format!("{delete_err}");
2515 assert!(
2516 msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
2517 "expected DELETE rejection message, got: {msg}"
2518 );
2519
2520 let surviving = rt.execute_query("SELECT id FROM events").unwrap();
2523 assert_eq!(surviving.result.records.len(), 1);
2524 }
2525
2526 #[test]
2530 fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
2531 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2532 rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
2533 .unwrap();
2534
2535 rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
2536 .unwrap();
2537 let updated = rt
2538 .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
2539 .unwrap();
2540 assert_eq!(updated.affected_rows, 1);
2541 let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
2542 assert_eq!(deleted.affected_rows, 1);
2543 }
2544
2545 #[test]
2546 fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
2547 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2548 rt.execute_query(
2549 "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
2550 )
2551 .unwrap();
2552
2553 let inserted = rt
2554 .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
2555 .unwrap();
2556 assert_eq!(inserted.affected_rows, 1);
2557
2558 let events = queue_payloads(&rt, "audit_log");
2559 assert_eq!(events.len(), 1);
2560 let event = events[0].as_object().expect("event payload object");
2561 assert!(event
2562 .get("event_id")
2563 .and_then(crate::json::Value::as_str)
2564 .is_some_and(|value| !value.is_empty()));
2565 assert_eq!(
2566 event.get("op").and_then(crate::json::Value::as_str),
2567 Some("insert")
2568 );
2569 assert_eq!(
2570 event.get("collection").and_then(crate::json::Value::as_str),
2571 Some("users")
2572 );
2573 assert_eq!(
2574 event.get("id").and_then(crate::json::Value::as_u64),
2575 Some(7)
2576 );
2577 assert!(event
2578 .get("ts")
2579 .and_then(crate::json::Value::as_u64)
2580 .is_some());
2581 assert!(event
2582 .get("lsn")
2583 .and_then(crate::json::Value::as_u64)
2584 .is_some());
2585 assert!(matches!(
2586 event.get("tenant"),
2587 Some(crate::json::Value::Null)
2588 ));
2589 assert!(matches!(
2590 event.get("before"),
2591 Some(crate::json::Value::Null)
2592 ));
2593 let after = event
2594 .get("after")
2595 .and_then(crate::json::Value::as_object)
2596 .expect("after object");
2597 assert_eq!(
2598 after.get("id").and_then(crate::json::Value::as_u64),
2599 Some(7)
2600 );
2601 assert_eq!(
2602 after.get("email").and_then(crate::json::Value::as_str),
2603 Some("a@example.com")
2604 );
2605 }
2606
2607 #[test]
2608 fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
2609 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2610 rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
2611 .unwrap();
2612
2613 rt.execute_query(
2614 "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
2615 )
2616 .unwrap();
2617
2618 let events = queue_payloads(&rt, "users_events");
2619 assert_eq!(events.len(), 2);
2620 let mut previous_lsn = 0;
2621 for (event, expected_id) in events.iter().zip([1_u64, 2]) {
2622 let object = event.as_object().expect("event payload object");
2623 assert_eq!(
2624 object.get("op").and_then(crate::json::Value::as_str),
2625 Some("insert")
2626 );
2627 assert_eq!(
2628 object.get("id").and_then(crate::json::Value::as_u64),
2629 Some(expected_id)
2630 );
2631 let lsn = object
2632 .get("lsn")
2633 .and_then(crate::json::Value::as_u64)
2634 .expect("event lsn");
2635 assert!(
2636 lsn > previous_lsn,
2637 "event LSNs should increase in row order"
2638 );
2639 previous_lsn = lsn;
2640 let after = object
2641 .get("after")
2642 .and_then(crate::json::Value::as_object)
2643 .expect("after object");
2644 assert_eq!(
2645 after.get("id").and_then(crate::json::Value::as_u64),
2646 Some(expected_id)
2647 );
2648 }
2649 }
2650
2651 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2652 let result = rt
2653 .execute_query(&format!("QUEUE PEEK {queue} 10"))
2654 .expect("peek queue");
2655 result
2656 .result
2657 .records
2658 .iter()
2659 .map(
2660 |record| match record.get("payload").expect("payload column") {
2661 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2662 other => panic!("expected JSON queue payload, got {other:?}"),
2663 },
2664 )
2665 .collect()
2666 }
2667
2668 #[test]
2680 fn auto_index_id_fires_on_first_insert() {
2681 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2682 rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
2683 .unwrap();
2684
2685 assert!(
2687 rt.index_store_ref()
2688 .find_index_for_column("bench_users", "id")
2689 .is_none(),
2690 "freshly created collection should not have an `id` index"
2691 );
2692
2693 rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
2695 .unwrap();
2696
2697 let registered = rt
2699 .index_store_ref()
2700 .find_index_for_column("bench_users", "id")
2701 .expect("auto-index hook should have registered idx_id on first insert");
2702 assert_eq!(registered.name, "idx_id");
2703 assert_eq!(registered.collection, "bench_users");
2704 assert_eq!(registered.columns, vec!["id".to_string()]);
2705 assert!(matches!(
2706 registered.method,
2707 super::super::index_store::IndexMethodKind::Hash
2708 ));
2709
2710 for id in 2..=5 {
2713 rt.execute_query(&format!(
2714 "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
2715 id * 10
2716 ))
2717 .unwrap();
2718 }
2719 for id in 1..=5 {
2720 let result = rt
2721 .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
2722 .unwrap();
2723 assert_eq!(
2724 result.result.records.len(),
2725 1,
2726 "id={id} should match one row"
2727 );
2728 }
2729
2730 let deleted = rt
2735 .execute_query("DELETE FROM bench_users WHERE id = 3")
2736 .unwrap();
2737 assert_eq!(deleted.affected_rows, 1);
2738 }
2739
2740 #[test]
2745 fn auto_index_id_fires_on_first_bulk_insert() {
2746 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2747 rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
2748 .unwrap();
2749
2750 rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
2751 .unwrap();
2752
2753 let registered = rt
2754 .index_store_ref()
2755 .find_index_for_column("bench_bulk", "id")
2756 .expect("auto-index hook should fire on first bulk insert");
2757 assert_eq!(registered.name, "idx_id");
2758
2759 for id in 1..=3 {
2761 let result = rt
2762 .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
2763 .unwrap();
2764 assert_eq!(result.result.records.len(), 1);
2765 }
2766 }
2767
2768 #[test]
2772 fn auto_index_id_skips_when_no_id_column() {
2773 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2774 rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
2775 .unwrap();
2776 rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
2777 .unwrap();
2778
2779 assert!(rt
2780 .index_store_ref()
2781 .find_index_for_column("plain", "id")
2782 .is_none());
2783 assert!(rt
2784 .index_store_ref()
2785 .find_index_for_column("plain", "uid")
2786 .is_none());
2787 }
2788
2789 #[test]
2794 fn auto_index_id_skips_when_index_already_exists() {
2795 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2796 rt.execute_query("CREATE TABLE pre (id INT, score INT)")
2797 .unwrap();
2798 rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
2800 .unwrap();
2801 rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
2802 .unwrap();
2803
2804 let registered = rt
2805 .index_store_ref()
2806 .find_index_for_column("pre", "id")
2807 .expect("user index should still be there");
2808 assert_eq!(
2809 registered.name, "user_idx",
2810 "auto-index hook must not overwrite an existing index"
2811 );
2812 }
2813
2814 #[test]
2818 fn auto_index_id_dropped_with_collection() {
2819 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2820 rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
2821 .unwrap();
2822 rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
2823 .unwrap();
2824 assert!(rt
2825 .index_store_ref()
2826 .find_index_for_column("ephemeral", "id")
2827 .is_some());
2828
2829 rt.execute_query("DROP TABLE ephemeral").unwrap();
2830
2831 assert!(
2832 rt.index_store_ref()
2833 .find_index_for_column("ephemeral", "id")
2834 .is_none(),
2835 "implicit `idx_id` must be reaped when its collection drops"
2836 );
2837 }
2838
2839 #[test]
2844 fn auto_index_id_disabled_by_config() {
2845 let opts = RedDBOptions::in_memory().with_auto_index_id(false);
2846 let rt = RedDBRuntime::with_options(opts).unwrap();
2847
2848 rt.execute_query("CREATE TABLE off (id INT, score INT)")
2849 .unwrap();
2850 rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
2851 .unwrap();
2852
2853 assert!(
2854 rt.index_store_ref()
2855 .find_index_for_column("off", "id")
2856 .is_none(),
2857 "with auto_index_id=false, no implicit index should be created"
2858 );
2859 }
2860
2861 #[test]
2864 fn update_single_row_emits_update_event() {
2865 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2866 rt.execute_query(
2867 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
2868 )
2869 .unwrap();
2870 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
2871 .unwrap();
2872
2873 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
2874 .unwrap();
2875
2876 let events = queue_payloads(&rt, "audit_log");
2877 assert_eq!(events.len(), 1, "expected exactly 1 update event");
2878 let event = events[0].as_object().expect("event payload object");
2879 assert_eq!(
2880 event.get("op").and_then(crate::json::Value::as_str),
2881 Some("update")
2882 );
2883 assert_eq!(
2884 event.get("collection").and_then(crate::json::Value::as_str),
2885 Some("users")
2886 );
2887 assert!(event
2888 .get("event_id")
2889 .and_then(crate::json::Value::as_str)
2890 .is_some_and(|v| !v.is_empty()));
2891 let before = event
2892 .get("before")
2893 .and_then(crate::json::Value::as_object)
2894 .expect("before must be an object");
2895 let after = event
2896 .get("after")
2897 .and_then(crate::json::Value::as_object)
2898 .expect("after must be an object");
2899 assert_eq!(
2900 before.get("name").and_then(crate::json::Value::as_str),
2901 Some("Alice"),
2902 "before.name should be the old value"
2903 );
2904 assert_eq!(
2905 after.get("name").and_then(crate::json::Value::as_str),
2906 Some("Bob"),
2907 "after.name should be the new value"
2908 );
2909 }
2910
2911 #[test]
2912 fn update_event_only_includes_changed_fields() {
2913 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2914 rt.execute_query(
2915 "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
2916 )
2917 .unwrap();
2918 rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
2919 .unwrap();
2920
2921 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
2922 .unwrap();
2923
2924 let events = queue_payloads(&rt, "evts");
2925 assert_eq!(events.len(), 1);
2926 let event = events[0].as_object().unwrap();
2927 let before = event
2928 .get("before")
2929 .and_then(crate::json::Value::as_object)
2930 .unwrap();
2931 let after = event
2932 .get("after")
2933 .and_then(crate::json::Value::as_object)
2934 .unwrap();
2935 assert!(
2937 before.contains_key("name"),
2938 "before must include changed field"
2939 );
2940 assert!(
2941 after.contains_key("name"),
2942 "after must include changed field"
2943 );
2944 assert!(
2946 !before.contains_key("email"),
2947 "before must not include unchanged email"
2948 );
2949 assert!(
2950 !after.contains_key("email"),
2951 "after must not include unchanged email"
2952 );
2953 }
2954
2955 #[test]
2956 fn multi_row_update_emits_one_event_per_row() {
2957 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2958 rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
2959 .unwrap();
2960 rt.execute_query(
2961 "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
2962 )
2963 .unwrap();
2964
2965 rt.execute_query("UPDATE items SET status = 'done'")
2966 .unwrap();
2967
2968 let events = queue_payloads(&rt, "evts");
2969 assert_eq!(events.len(), 3, "expected one update event per row");
2970 for event in &events {
2971 let obj = event.as_object().unwrap();
2972 assert_eq!(
2973 obj.get("op").and_then(crate::json::Value::as_str),
2974 Some("update")
2975 );
2976 }
2977 }
2978
2979 #[test]
2980 fn delete_single_row_emits_delete_event() {
2981 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2982 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
2983 .unwrap();
2984 rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
2985 .unwrap();
2986
2987 rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
2988
2989 let events = queue_payloads(&rt, "del_log");
2990 assert_eq!(events.len(), 1);
2991 let event = events[0].as_object().expect("event payload object");
2992 assert_eq!(
2993 event.get("op").and_then(crate::json::Value::as_str),
2994 Some("delete")
2995 );
2996 assert_eq!(
2997 event.get("collection").and_then(crate::json::Value::as_str),
2998 Some("users")
2999 );
3000 assert!(event
3001 .get("event_id")
3002 .and_then(crate::json::Value::as_str)
3003 .is_some_and(|v| !v.is_empty()));
3004 let before = event
3005 .get("before")
3006 .and_then(crate::json::Value::as_object)
3007 .expect("before must be an object for delete");
3008 assert_eq!(
3009 before.get("id").and_then(crate::json::Value::as_u64),
3010 Some(42)
3011 );
3012 assert_eq!(
3013 before.get("name").and_then(crate::json::Value::as_str),
3014 Some("Alice")
3015 );
3016 assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
3017 }
3018
3019 #[test]
3020 fn multi_row_delete_emits_one_event_per_row() {
3021 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3022 rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
3023 .unwrap();
3024 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
3025 .unwrap();
3026
3027 rt.execute_query("DELETE FROM items").unwrap();
3028
3029 let events = queue_payloads(&rt, "del_log");
3030 assert_eq!(events.len(), 3, "expected one delete event per deleted row");
3031 for event in &events {
3032 let obj = event.as_object().unwrap();
3033 assert_eq!(
3034 obj.get("op").and_then(crate::json::Value::as_str),
3035 Some("delete")
3036 );
3037 assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
3038 }
3039 }
3040
3041 #[test]
3042 fn ops_filter_update_does_not_emit_on_insert_or_delete() {
3043 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3044 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
3045 .unwrap();
3046
3047 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
3048 .unwrap();
3049 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3050
3051 let events = queue_payloads(&rt, "evts");
3052 assert!(
3053 events.is_empty(),
3054 "UPDATE-only filter must not emit INSERT or DELETE events"
3055 );
3056 }
3057
3058 #[test]
3061 fn suppress_events_on_insert_emits_no_events() {
3062 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3063 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3064 .unwrap();
3065
3066 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3067 .unwrap();
3068
3069 let events = queue_payloads(&rt, "evts");
3070 assert!(
3071 events.is_empty(),
3072 "SUPPRESS EVENTS must prevent INSERT events"
3073 );
3074 }
3075
3076 #[test]
3077 fn suppress_events_on_update_emits_no_events() {
3078 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3079 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3080 .unwrap();
3081 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
3082 .unwrap();
3083 let _ = queue_payloads(&rt, "evts");
3085 rt.execute_query("QUEUE PURGE evts").unwrap();
3087
3088 rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
3089 .unwrap();
3090
3091 let events = queue_payloads(&rt, "evts");
3092 assert!(
3093 events.is_empty(),
3094 "SUPPRESS EVENTS must prevent UPDATE events"
3095 );
3096 }
3097
3098 #[test]
3099 fn suppress_events_on_delete_emits_no_events() {
3100 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3101 rt.execute_query(
3102 "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
3103 )
3104 .unwrap();
3105 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3106 .unwrap();
3107
3108 rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
3109 .unwrap();
3110
3111 let events = queue_payloads(&rt, "evts");
3112 assert!(
3113 events.is_empty(),
3114 "SUPPRESS EVENTS must prevent DELETE events"
3115 );
3116 }
3117
3118 #[test]
3119 fn normal_insert_after_suppress_still_emits() {
3120 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3121 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3122 .unwrap();
3123
3124 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3125 .unwrap();
3126 rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
3127 .unwrap();
3128
3129 let events = queue_payloads(&rt, "evts");
3130 assert_eq!(
3131 events.len(),
3132 1,
3133 "only the non-suppressed INSERT should emit"
3134 );
3135 assert_eq!(
3136 events[0].get("id").and_then(crate::json::Value::as_u64),
3137 Some(2)
3138 );
3139 }
3140}