1use crate::application::entity::{
10 metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput, CreateKvInput,
11 CreateNodeInput, CreateRowInput, CreateRowsBatchInput, CreateVectorInput, DeleteEntityInput,
12 PatchEntityOperation, PatchEntityOperationType, RowUpdateColumnRule, RowUpdateContractPlan,
13};
14use crate::application::ports::{
15 build_row_update_contract_plan, entity_row_fields_snapshot,
16 normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
17 RuntimeEntityPort,
18};
19use crate::application::ttl_payload::has_internal_ttl_metadata;
20use crate::presentation::entity_json::storage_value_to_json;
21use crate::storage::query::ast::{Expr, ReturningItem};
22use crate::storage::query::sql_lowering::{
23 effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
24};
25use crate::storage::query::unified::{sys_key_red_entity_id, UnifiedRecord, UnifiedResult};
26use crate::storage::unified::MetadataValue;
27use crate::storage::Metadata;
28use std::collections::HashMap;
29use std::sync::Arc;
30
31use super::*;
32
/// Number of target entity ids that an UPDATE applies, persists and flushes
/// together as one chunk.
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
/// Edge label literal `TREE_CHILD`.
/// NOTE(review): presumably the label reserved for structural tree edges that
/// user INSERTs must not use — confirm against the tree helpers.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Metadata key prefix literal `red.tree.`.
/// NOTE(review): presumably marks metadata keys reserved for tree bookkeeping —
/// confirm against the tree helpers.
const TREE_METADATA_PREFIX: &str = "red.tree.";
36
/// A single `column = expr` UPDATE assignment kept in expression form.
///
/// NOTE(review): presumably holds the assignments that could not be folded to
/// a constant while the plan was compiled and therefore need per-entity
/// evaluation — confirm against `compile_update_plan`.
#[derive(Clone)]
struct CompiledUpdateAssignment {
    /// Name of the assigned column.
    column: String,
    /// Right-hand-side expression of the assignment.
    expr: Expr,
    /// Metadata key this assignment targets, when it targets metadata rather
    /// than a field. NOTE(review): presumably a TTL metadata key — confirm.
    metadata_key: Option<&'static str>,
    /// Row contract rule for `column`, when one applies.
    /// NOTE(review): presumably used to normalize the evaluated value — confirm.
    row_rule: Option<RowUpdateColumnRule>,
}
44
/// Result of compiling an UPDATE's assignment list once, before any entity is
/// touched.
struct CompiledUpdatePlan {
    /// Field assignments whose expression folded to a constant value at
    /// compile time (already normalized against the row contract plan).
    static_field_assignments: Vec<(String, Value)>,
    /// Metadata assignments whose expression folded to a constant value at
    /// compile time, stored under their canonical metadata key.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    /// Assignments kept in expression form.
    /// NOTE(review): presumably evaluated per entity — confirm.
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    /// Row update contract plan built for the target table, if any.
    row_contract_plan: Option<RowUpdateContractPlan>,
    /// NOTE(review): presumably the row columns written by this UPDATE — confirm.
    row_modified_columns: Vec<String>,
    /// NOTE(review): presumably true when any modified column participates in a
    /// uniqueness constraint — confirm.
    row_touches_unique_columns: bool,
}
53
/// Concrete values produced for one entity from the plan's dynamic
/// assignments.
/// NOTE(review): inferred from the field names and from
/// `materialize_update_assignments_for_entity` returning the value passed to
/// `apply_materialized_update_for_entity` — confirm.
#[derive(Default)]
struct MaterializedUpdateAssignments {
    /// Evaluated `(column, value)` pairs targeting entity fields.
    dynamic_field_assignments: Vec<(String, Value)>,
    /// Evaluated `(key, value)` pairs targeting entity metadata.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
59
60impl RedDBRuntime {
61 fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
75 let Some(tenant_col) = self.tenant_column(&query.table) else {
76 return Ok(None);
77 };
78 if query
80 .columns
81 .iter()
82 .any(|c| c.eq_ignore_ascii_case(&tenant_col))
83 {
84 return Ok(None);
85 }
86
87 if let Some(dot_pos) = tenant_col.find('.') {
93 let (root, tail) = tenant_col.split_at(dot_pos);
94 let tail = &tail[1..]; return self.inject_dotted_tenant(query, root, tail);
96 }
97
98 let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
99 return Err(RedDBError::Query(format!(
100 "INSERT into tenant-scoped table '{}' requires an active tenant — \
101 run SET TENANT '<id>' first or name column '{}' explicitly",
102 query.table, tenant_col
103 )));
104 };
105
106 let mut augmented = query.clone();
107 augmented.columns.push(tenant_col);
108 let lit = Value::text(tenant_id.clone());
109 for row in augmented.values.iter_mut() {
110 row.push(lit.clone());
111 }
112 for row in augmented.value_exprs.iter_mut() {
113 row.push(crate::storage::query::ast::Expr::Literal {
114 value: lit.clone(),
115 span: crate::storage::query::ast::Span::synthetic(),
116 });
117 }
118 Ok(Some(augmented))
119 }
120
121 fn inject_dotted_tenant(
131 &self,
132 query: &InsertQuery,
133 root: &str,
134 tail: &str,
135 ) -> RedDBResult<Option<InsertQuery>> {
136 let active_tenant = crate::runtime::impl_core::current_tenant();
137 let mut augmented = query.clone();
138 let root_idx = augmented
139 .columns
140 .iter()
141 .position(|c| c.eq_ignore_ascii_case(root));
142
143 if let Some(idx) = root_idx {
144 for row in augmented.values.iter_mut() {
150 let Some(slot) = row.get_mut(idx) else {
151 continue;
152 };
153 if dotted_tail_already_set(slot, tail) {
154 continue;
155 }
156 let Some(tenant_id) = &active_tenant else {
157 return Err(RedDBError::Query(format!(
158 "INSERT into tenant-scoped table '{}' requires an active tenant — \
159 run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
160 query.table, root, tail
161 )));
162 };
163 *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
164 }
165 for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
169 if let Some(slot) = row.get_mut(idx) {
170 let new_value = augmented
171 .values
172 .get(row_idx)
173 .and_then(|v| v.get(idx))
174 .cloned()
175 .unwrap_or(Value::Null);
176 *slot = crate::storage::query::ast::Expr::Literal {
177 value: new_value,
178 span: crate::storage::query::ast::Span::synthetic(),
179 };
180 }
181 }
182 } else {
183 let Some(tenant_id) = &active_tenant else {
187 return Err(RedDBError::Query(format!(
188 "INSERT into tenant-scoped table '{}' requires an active tenant — \
189 run SET TENANT '<id>' first or name path '{}.{}' explicitly",
190 query.table, root, tail
191 )));
192 };
193 augmented.columns.push(root.to_string());
195 let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
196 for row in augmented.values.iter_mut() {
197 row.push(fresh.clone());
198 }
199 for row in augmented.value_exprs.iter_mut() {
200 row.push(crate::storage::query::ast::Expr::Literal {
201 value: fresh.clone(),
202 span: crate::storage::query::ast::Span::synthetic(),
203 });
204 }
205 }
206
207 Ok(Some(augmented))
208 }
209
210 fn delete_entities_batch(
213 &self,
214 collection: &str,
215 ids: &[EntityId],
216 ) -> RedDBResult<(u64, Vec<u64>)> {
217 if ids.is_empty() {
218 return Ok((0, vec![]));
219 }
220
221 let store = self.db().store();
222 let Some(manager) = store.get_collection(collection) else {
223 return Ok((0, vec![]));
224 };
225
226 let active_xid = self.current_xid();
227 let conn_id = crate::runtime::impl_core::current_connection_id();
228 let mut autocommit_xid = None;
229 let mut tombstoned_ids = Vec::new();
230 let mut tombstoned_entities = Vec::new();
231 let mut physical_delete_ids = Vec::new();
232
233 for &id in ids {
234 let Some(mut entity) = manager.get(id) else {
235 continue;
236 };
237 if matches!(entity.data, EntityData::Row(_)) {
238 let previous_xmax = entity.xmax;
239 if entity.xmax != 0 {
242 continue;
243 }
244
245 let xid = match active_xid {
246 Some(xid) => xid,
247 None => match autocommit_xid {
248 Some(xid) => xid,
249 None => {
250 let mgr = self.snapshot_manager();
251 let xid = mgr.begin();
252 autocommit_xid = Some(xid);
253 xid
254 }
255 },
256 };
257 entity.set_xmax(xid);
258 if manager.update(entity.clone()).is_ok() {
259 if active_xid.is_some() {
260 self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
261 }
262 tombstoned_entities.push(entity);
263 tombstoned_ids.push(id);
264 }
265 } else {
266 physical_delete_ids.push(id);
267 }
268 }
269
270 if let Some(xid) = autocommit_xid {
271 self.snapshot_manager().commit(xid);
272 }
273
274 let mut affected = tombstoned_ids.len() as u64;
275 let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
276 if active_xid.is_some() {
277 store
278 .persist_entities_to_pager(collection, &tombstoned_entities)
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 } else {
281 store
282 .persist_entities_to_pager(collection, &tombstoned_entities)
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 for id in &tombstoned_ids {
285 store.context_index().remove_entity(*id);
286 let lsn = self.cdc_emit(
287 crate::replication::cdc::ChangeOperation::Delete,
288 collection,
289 id.raw(),
290 "entity",
291 );
292 lsns.push(lsn);
293 }
294 }
295
296 let deleted_ids = store
297 .delete_batch(collection, &physical_delete_ids)
298 .map_err(|err| RedDBError::Internal(err.to_string()))?;
299 affected += deleted_ids.len() as u64;
300 for id in &deleted_ids {
301 store.context_index().remove_entity(*id);
302 let lsn = self.cdc_emit(
303 crate::replication::cdc::ChangeOperation::Delete,
304 collection,
305 id.raw(),
306 "entity",
307 );
308 lsns.push(lsn);
309 }
310
311 Ok((affected, lsns))
312 }
313
314 fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
317 if applied.is_empty() {
318 return Ok(Vec::new());
319 }
320
321 let store = self.db().store();
322 if applied.iter().any(|item| item.context_index_dirty) {
323 store.context_index().index_entities(
324 &applied[0].collection,
325 applied
326 .iter()
327 .filter(|item| item.context_index_dirty)
328 .map(|item| &item.entity),
329 );
330 }
331
332 for item in applied {
333 self.refresh_update_secondary_indexes(item)?;
334 }
335
336 let mut lsns = Vec::with_capacity(applied.len());
337 for item in applied {
338 let lsn = self.cdc_emit_prebuilt(
339 crate::replication::cdc::ChangeOperation::Update,
340 &item.collection,
341 &item.entity,
342 "entity",
343 item.metadata.as_ref(),
344 false,
345 );
346 lsns.push(lsn);
347 }
348 Ok(lsns)
349 }
350
/// Persists one chunk of applied update mutations by delegating to
/// `persist_applied_entity_mutations`; its result is returned unchanged.
fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
    self.persist_applied_entity_mutations(applied)
}
354
355 fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
356 if applied.pre_mutation_fields.is_empty() {
357 return Ok(());
358 }
359 let post = entity_row_fields_snapshot(&applied.entity);
360 if post.is_empty() {
361 return Ok(());
362 }
363
364 let indexed_cols = self
365 .index_store_ref()
366 .indexed_columns_set(&applied.collection);
367 if indexed_cols.is_empty() {
368 return Ok(());
369 }
370
371 if let Some(old_version) = applied.replaced_entity.as_ref() {
372 let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
373 .pre_mutation_fields
374 .iter()
375 .filter(|(col, _)| indexed_cols.contains(col))
376 .cloned()
377 .collect();
378 let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
379 .iter()
380 .filter(|(col, _)| indexed_cols.contains(col))
381 .cloned()
382 .collect();
383 if !old_index_fields.is_empty() {
384 self.index_store_ref()
385 .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
386 .map_err(crate::RedDBError::Internal)?;
387 }
388 if !new_index_fields.is_empty() {
389 self.index_store_ref()
390 .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
391 .map_err(crate::RedDBError::Internal)?;
392 }
393 return Ok(());
394 }
395
396 let damage =
397 crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
398 if damage
399 .touched_columns()
400 .into_iter()
401 .any(|col| indexed_cols.contains(col))
402 {
403 self.index_store_ref()
404 .index_entity_update(
405 &applied.collection,
406 applied.id,
407 &applied.pre_mutation_fields,
408 &post,
409 )
410 .map_err(crate::RedDBError::Internal)?;
411 }
412 Ok(())
413 }
414
/// Executes an `INSERT` statement and returns a DML result carrying the
/// number of inserted entities.
///
/// Steps, in order:
/// 1. Write gate and collection contract checks.
/// 2. Tenant column injection (`maybe_inject_tenant_column`); the augmented
///    query, when produced, replaces `query` for the rest of the function.
/// 3. IAM column policy check.
/// 4. Dispatch on `query.entity_type`:
///    * `Row` into a collection not declared as `TimeSeries`: all rows go
///      through one `create_rows_batch` call.
///    * `Node` / `Edge`: every row is validated and prepared first, then
///      written through one storage batch.
///    * everything else (time-series rows, vectors, documents, KV): inserted
///      row by row.
/// 5. Optional `AUTO EMBED` pass.
/// 6. `note_table_write` when at least one entity was inserted.
///
/// When the query has a `RETURNING` clause and a returning result was built,
/// it replaces `result.result` of the DML result.
///
/// # Errors
/// Returns `RedDBError::Query` for column/value count mismatches, for
/// `RETURNING` on the time-series path, for an invalid document JSON body and
/// for `AUTO EMBED` without a Tokio runtime; other errors are propagated from
/// the helpers called here.
pub fn execute_insert(
    &self,
    raw_query: &str,
    query: &InsertQuery,
) -> RedDBResult<RuntimeQueryResult> {
    self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
    crate::runtime::collection_contract::CollectionContractGate::check(
        self,
        &query.table,
        crate::runtime::collection_contract::MutationKind::Insert,
    )?;
    // Owner for the tenant-augmented query so `query` can keep being a
    // reference in both cases.
    let augmented_owned;
    let query = match self.maybe_inject_tenant_column(query)? {
        Some(new_q) => {
            augmented_owned = new_q;
            &augmented_owned
        }
        None => query,
    };
    self.check_insert_column_policy(query)?;

    let mut inserted_count: u64 = 0;
    let effective_rows =
        effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;

    let store = self.inner.db.store();
    // Make sure the target collection exists; the returned handle is not used.
    let _ = store.get_or_create_collection(&query.table);
    let declared_model = self
        .db()
        .collection_contract_arc(&query.table)
        .map(|contract| contract.declared_model);

    // Field snapshots for RETURNING are only collected when the clause is present.
    let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
        if query.returning.is_some() {
            Some(Vec::with_capacity(effective_rows.len()))
        } else {
            None
        };
    let mut returning_result: Option<UnifiedResult> = None;

    // Batched row path: plain rows into anything but a declared TimeSeries
    // collection.
    if matches!(query.entity_type, InsertEntityType::Row)
        && !matches!(
            declared_model,
            Some(crate::catalog::CollectionModel::TimeSeries)
        )
    {
        let mut rows = Vec::with_capacity(effective_rows.len());
        for row_values in &effective_rows {
            if row_values.len() != query.columns.len() {
                return Err(RedDBError::Query(format!(
                    "INSERT column count ({}) does not match value count ({})",
                    query.columns.len(),
                    row_values.len()
                )));
            }
            let (fields, mut metadata) =
                split_insert_metadata(self, &query.columns, row_values)?;
            merge_with_clauses(
                &mut metadata,
                query.ttl_ms,
                query.expires_at_ms,
                &query.with_metadata,
            );
            if let Some(snaps) = returning_snapshots.as_mut() {
                snaps.push(fields.clone());
            }
            rows.push(CreateRowInput {
                collection: query.table.clone(),
                fields,
                metadata,
                node_links: Vec::new(),
                vector_links: Vec::new(),
            });
        }
        let outputs = self.create_rows_batch(CreateRowsBatchInput {
            collection: query.table.clone(),
            rows,
            suppress_events: query.suppress_events,
        })?;
        inserted_count = outputs.len() as u64;

        // Hypertable: route every non-negative integer timestamp found in the
        // time column. The routing result is discarded.
        if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
            let time_col = &spec.time_column;
            if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
                for row in &effective_rows {
                    if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
                        if *n >= 0 {
                            let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
                        }
                    } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
                        let _ = self.inner.db.hypertables().route(&query.table, *n);
                    }
                }
            }
        }

        if let (Some(items), Some(snaps)) =
            (query.returning.as_ref(), returning_snapshots.take())
        {
            returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
        }
    } else {
        let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
            Vec::with_capacity(effective_rows.len());
        let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
        {
            Vec::with_capacity(effective_rows.len())
        } else {
            Vec::new()
        };
        if matches!(
            query.entity_type,
            InsertEntityType::Node | InsertEntityType::Edge
        ) {
            // Graph path: validate and prepare every row before anything is
            // written, then write them through a single batch.
            enum PreparedGraphInsert {
                Node {
                    fields: Vec<(String, Value)>,
                    input: CreateNodeInput,
                },
                Edge {
                    fields: Vec<(String, Value)>,
                    input: CreateEdgeInput,
                },
            }

            let mut prepared = Vec::with_capacity(effective_rows.len());
            for row_values in &effective_rows {
                if row_values.len() != query.columns.len() {
                    return Err(RedDBError::Query(format!(
                        "INSERT column count ({}) does not match value count ({})",
                        query.columns.len(),
                        row_values.len()
                    )));
                }

                match query.entity_type {
                    InsertEntityType::Node => {
                        let (node_values, mut metadata) =
                            split_insert_metadata(self, &query.columns, row_values)?;
                        merge_with_clauses(
                            &mut metadata,
                            query.ttl_ms,
                            query.expires_at_ms,
                            &query.with_metadata,
                        );
                        ensure_non_tree_reserved_metadata_entries(&metadata)?;
                        apply_collection_default_ttl_metadata(
                            self,
                            &query.table,
                            &mut metadata,
                        );
                        let (columns, values) = pairwise_columns_values(&node_values);
                        let label = find_column_value_string(&columns, &values, "label")?;
                        let node_type =
                            find_column_value_opt_string(&columns, &values, "node_type");
                        // Every column other than the structural ones becomes
                        // a node property.
                        let properties = extract_remaining_properties(
                            &columns,
                            &values,
                            &["label", "node_type"],
                        );
                        prepared.push(PreparedGraphInsert::Node {
                            fields: node_values,
                            input: CreateNodeInput {
                                collection: query.table.clone(),
                                label,
                                node_type,
                                properties,
                                metadata,
                                embeddings: Vec::new(),
                                table_links: Vec::new(),
                                node_links: Vec::new(),
                            },
                        });
                    }
                    InsertEntityType::Edge => {
                        let (edge_values, mut metadata) =
                            split_insert_metadata(self, &query.columns, row_values)?;
                        merge_with_clauses(
                            &mut metadata,
                            query.ttl_ms,
                            query.expires_at_ms,
                            &query.with_metadata,
                        );
                        ensure_non_tree_reserved_metadata_entries(&metadata)?;
                        apply_collection_default_ttl_metadata(
                            self,
                            &query.table,
                            &mut metadata,
                        );
                        let (columns, values) = pairwise_columns_values(&edge_values);
                        let label = find_column_value_string(&columns, &values, "label")?;
                        ensure_non_tree_structural_edge_label(&label)?;
                        let from_id = resolve_edge_endpoint(
                            self.inner.db.store().as_ref(),
                            &query.table,
                            &columns,
                            &values,
                            "from",
                        )?;
                        let to_id = resolve_edge_endpoint(
                            self.inner.db.store().as_ref(),
                            &query.table,
                            &columns,
                            &values,
                            "to",
                        )?;
                        let weight = find_column_value_f32_opt(&columns, &values, "weight");
                        // Every column other than the structural ones becomes
                        // an edge property.
                        let properties = extract_remaining_properties(
                            &columns,
                            &values,
                            &["label", "from", "to", "weight"],
                        );
                        prepared.push(PreparedGraphInsert::Edge {
                            fields: edge_values,
                            input: CreateEdgeInput {
                                collection: query.table.clone(),
                                label,
                                from: EntityId::new(from_id),
                                to: EntityId::new(to_id),
                                weight,
                                properties,
                                metadata,
                            },
                        });
                    }
                    // Guarded by the enclosing `matches!` on Node | Edge.
                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
                }
            }

            ensure_graph_insert_contract(self, &query.table)?;
            let mut batch = self.inner.db.batch();
            for item in prepared {
                match item {
                    PreparedGraphInsert::Node { fields, input } => {
                        if query.returning.is_some() {
                            returning_field_snaps.push(fields);
                        }
                        // A node without an explicit type uses its label as type.
                        let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
                        batch = batch.add_node_with_type(
                            input.collection,
                            input.label,
                            node_type,
                            input.properties.into_iter().collect(),
                            input.metadata.into_iter().collect(),
                        );
                    }
                    PreparedGraphInsert::Edge { fields, input } => {
                        if query.returning.is_some() {
                            returning_field_snaps.push(fields);
                        }
                        batch = batch.add_edge(
                            input.collection,
                            input.label,
                            input.from,
                            input.to,
                            // Edges without an explicit weight get 1.0.
                            input.weight.unwrap_or(1.0),
                            input.properties.into_iter().collect(),
                            input.metadata.into_iter().collect(),
                        );
                    }
                }
            }
            let batch_result = batch
                .execute()
                .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
            let (ids, entity_kind) = match query.entity_type {
                InsertEntityType::Node => (batch_result.nodes, "graph_node"),
                InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
                _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
            };
            for id in &ids {
                self.stamp_xmin_if_in_txn(&query.table, *id);
            }
            self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
            // Graph outputs carry only the id; no entity payload is attached.
            entity_outputs.extend(ids.iter().map(|id| {
                crate::application::entity::CreateEntityOutput {
                    id: *id,
                    entity: None,
                }
            }));
            inserted_count = ids.len() as u64;
        } else {
            // Row-by-row path: each row is inserted as soon as it is
            // prepared, so an error on a later row returns after earlier rows
            // were already written.
            for row_values in &effective_rows {
                if row_values.len() != query.columns.len() {
                    return Err(RedDBError::Query(format!(
                        "INSERT column count ({}) does not match value count ({})",
                        query.columns.len(),
                        row_values.len()
                    )));
                }

                match query.entity_type {
                    // Only reachable for collections declared as TimeSeries
                    // (other rows took the batched path above).
                    InsertEntityType::Row => {
                        if query.returning.is_some() {
                            return Err(RedDBError::Query(
                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
                                    .to_string(),
                            ));
                        }
                        let (fields, mut metadata) =
                            split_insert_metadata(self, &query.columns, row_values)?;
                        merge_with_clauses(
                            &mut metadata,
                            query.ttl_ms,
                            query.expires_at_ms,
                            &query.with_metadata,
                        );
                        self.insert_timeseries_point(&query.table, fields, metadata)?;
                    }
                    InsertEntityType::Node | InsertEntityType::Edge => {
                        unreachable!("NODE and EDGE are handled by the prepared graph path")
                    }
                    InsertEntityType::Vector => {
                        let (vector_values, mut metadata) =
                            split_insert_metadata(self, &query.columns, row_values)?;
                        merge_with_clauses(
                            &mut metadata,
                            query.ttl_ms,
                            query.expires_at_ms,
                            &query.with_metadata,
                        );
                        let (columns, values) = pairwise_columns_values(&vector_values);
                        // The dense vector may be supplied under either name.
                        let dense = find_column_value_vec_f32_any(
                            &columns,
                            &values,
                            &["dense", "embedding"],
                        )?;
                        merge_vector_metadata_column(&mut metadata, &columns, &values)?;
                        let content =
                            find_column_value_opt_string(&columns, &values, "content");
                        if query.returning.is_some() {
                            returning_field_snaps.push(vector_values.clone());
                        }
                        let input = CreateVectorInput {
                            collection: query.table.clone(),
                            dense,
                            content,
                            metadata,
                            link_row: None,
                            link_node: None,
                        };
                        entity_outputs.push(self.create_vector(input)?);
                    }
                    InsertEntityType::Document => {
                        let (document_values, mut metadata) =
                            split_insert_metadata(self, &query.columns, row_values)?;
                        merge_with_clauses(
                            &mut metadata,
                            query.ttl_ms,
                            query.expires_at_ms,
                            &query.with_metadata,
                        );
                        let (columns, values) = pairwise_columns_values(&document_values);
                        // The `body` column holds the document as a JSON string.
                        let body_str = find_column_value_string(&columns, &values, "body")?;
                        let body: crate::json::Value = crate::json::from_str(&body_str)
                            .map_err(|e| {
                                RedDBError::Query(format!("invalid JSON body: {e}"))
                            })?;
                        if query.returning.is_some() {
                            returning_field_snaps.push(document_values.clone());
                        }
                        let input = CreateDocumentInput {
                            collection: query.table.clone(),
                            body,
                            metadata,
                            node_links: Vec::new(),
                            vector_links: Vec::new(),
                        };
                        entity_outputs.push(self.create_document(input)?);
                    }
                    InsertEntityType::Kv => {
                        let (kv_values, mut metadata) =
                            split_insert_metadata(self, &query.columns, row_values)?;
                        merge_with_clauses(
                            &mut metadata,
                            query.ttl_ms,
                            query.expires_at_ms,
                            &query.with_metadata,
                        );
                        let (columns, values) = pairwise_columns_values(&kv_values);
                        let key = find_column_value_string(&columns, &values, "key")?;
                        let value = find_column_value(&columns, &values, "value")?;
                        if query.returning.is_some() {
                            returning_field_snaps.push(kv_values.clone());
                        }
                        let input = CreateKvInput {
                            collection: query.table.clone(),
                            key,
                            value,
                            metadata,
                        };
                        entity_outputs.push(self.create_kv(input)?);
                    }
                }

                inserted_count += 1;
            }
        }

        // RETURNING is only materialized when at least one output exists.
        if let Some(items) = query.returning.as_ref() {
            if !entity_outputs.is_empty() {
                returning_result = Some(build_returning_result(
                    items,
                    &returning_field_snaps,
                    Some(&entity_outputs),
                ));
            }
        }
    }

    // AUTO EMBED: build one embedding per recently inserted row from the
    // configured text fields and store it as a vector in the same collection.
    if let Some(ref embed_config) = query.auto_embed {
        let store = self.inner.db.store();
        let provider = crate::ai::parse_provider(&embed_config.provider)?;
        let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
        // Model precedence: query config, then the REDDB_OPENAI_EMBEDDING_MODEL
        // environment variable, then the built-in default.
        let model = embed_config.model.clone().unwrap_or_else(|| {
            std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
                .ok()
                .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
        });

        let manager = store
            .get_collection(&query.table)
            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
        // Takes the last `effective_rows.len()` entities of the whole
        // collection, newest first.
        // NOTE(review): this assumes `query_all` returns entities in insertion
        // order and that no other writer interleaved — confirm.
        let entities = manager.query_all(|_| true);
        let recent: Vec<_> = entities
            .into_iter()
            .rev()
            .take(effective_rows.len())
            .collect();

        // For each row entity, join the non-empty text values of the
        // configured fields with a single space; rows without any such text
        // are skipped.
        let entity_combos: Vec<(usize, String)> = recent
            .iter()
            .enumerate()
            .filter_map(|(i, entity)| {
                if let EntityData::Row(ref row) = entity.data {
                    if let Some(ref named) = row.named {
                        let texts: Vec<String> = embed_config
                            .fields
                            .iter()
                            .filter_map(|field| match named.get(field) {
                                Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
                                _ => None,
                            })
                            .collect();
                        if !texts.is_empty() {
                            return Some((i, texts.join(" ")));
                        }
                    }
                }
                None
            })
            .collect();

        if !entity_combos.is_empty() {
            let batch_texts: Vec<String> =
                entity_combos.iter().map(|(_, t)| t.clone()).collect();

            let batch_client =
                crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);

            // The embedding call is async; it is driven synchronously on the
            // current Tokio runtime.
            let embeddings = match tokio::runtime::Handle::try_current() {
                Ok(handle) => tokio::task::block_in_place(|| {
                    handle.block_on(batch_client.embed_batch(
                        &provider,
                        &model,
                        &api_key,
                        batch_texts,
                    ))
                }),
                Err(_) => {
                    return Err(RedDBError::Query(
                        "AUTO EMBED requires a Tokio runtime context".to_string(),
                    ));
                }
            }
            .map_err(|e| RedDBError::Query(e.to_string()))?;

            // Texts and embeddings are paired positionally; empty embeddings
            // are skipped.
            for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
                if dense.is_empty() {
                    continue;
                }
                self.create_vector(CreateVectorInput {
                    collection: query.table.clone(),
                    dense,
                    content: Some(combined.clone()),
                    metadata: Vec::new(),
                    link_row: None,
                    link_node: None,
                })?;
            }
        }
    }

    if inserted_count > 0 {
        self.note_table_write(&query.table);
    }

    let mut result = RuntimeQueryResult::dml_result(
        raw_query.to_string(),
        inserted_count,
        "insert",
        "runtime-dml",
    );
    if let Some(returning) = returning_result {
        result.result = returning;
    }
    Ok(result)
}
960
961 fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
962 let Some(auth_store) = self.inner.auth_store.read().clone() else {
963 return Ok(());
964 };
965 if !auth_store.iam_authorization_enabled() {
966 return Ok(());
967 }
968 let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
969 return Ok(());
970 };
971
972 let tenant = crate::runtime::impl_core::current_tenant();
973 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
974 let request = crate::auth::ColumnAccessRequest {
975 action: "insert".to_string(),
976 schema: None,
977 table: query.table.clone(),
978 columns: query.columns.clone(),
979 };
980 let ctx = crate::auth::policies::EvalContext {
981 principal_tenant: tenant.clone(),
982 current_tenant: tenant,
983 peer_ip: None,
984 mfa_present: false,
985 now_ms: crate::auth::now_ms(),
986 principal_is_admin_role: role == crate::auth::Role::Admin,
987 };
988
989 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
990 let table_allowed = matches!(
991 outcome.table_decision,
992 crate::auth::policies::Decision::Allow { .. }
993 | crate::auth::policies::Decision::AdminBypass
994 );
995 if !table_allowed {
996 return Err(RedDBError::Query(format!(
997 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
998 outcome.table_resource.kind, outcome.table_resource.name
999 )));
1000 }
1001 if let Some(denied) = outcome.first_denied_column() {
1002 return Err(RedDBError::Query(format!(
1003 "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1004 denied.resource.kind, denied.resource.name
1005 )));
1006 }
1007
1008 Ok(())
1009 }
1010
1011 pub(crate) fn insert_timeseries_point(
1012 &self,
1013 collection: &str,
1014 fields: Vec<(String, Value)>,
1015 mut metadata: Vec<(String, MetadataValue)>,
1016 ) -> RedDBResult<EntityId> {
1017 apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1018
1019 let (columns, values) = pairwise_columns_values(&fields);
1020 validate_timeseries_insert_columns(&columns)?;
1021
1022 let metric = find_column_value_string(&columns, &values, "metric")?;
1023 let value = find_column_value_f64(&columns, &values, "value")?;
1024 let timestamp_ns =
1025 find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1026 let tags = find_timeseries_tags(&columns, &values)?;
1027
1028 let mut entity = UnifiedEntity::new(
1029 EntityId::new(0),
1030 EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1031 series: collection.to_string(),
1032 metric: metric.clone(),
1033 })),
1034 EntityData::TimeSeries(crate::storage::TimeSeriesData {
1035 metric,
1036 timestamp_ns,
1037 value,
1038 tags,
1039 }),
1040 );
1041 let writer_xid = match self.current_xid() {
1045 Some(xid) => xid,
1046 None => {
1047 let mgr = self.snapshot_manager();
1048 let xid = mgr.begin();
1049 mgr.commit(xid);
1050 xid
1051 }
1052 };
1053 entity.set_xmin(writer_xid);
1054
1055 let store = self.inner.db.store();
1056 let id = store
1057 .insert_auto(collection, entity)
1058 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1059
1060 if !metadata.is_empty() {
1061 let _ = store.set_metadata(
1062 collection,
1063 id,
1064 Metadata::with_fields(metadata.into_iter().collect()),
1065 );
1066 }
1067
1068 self.cdc_emit(
1069 crate::replication::cdc::ChangeOperation::Insert,
1070 collection,
1071 id.raw(),
1072 "timeseries",
1073 );
1074
1075 Ok(id)
1076 }
1077
1078 pub fn execute_update(
1083 &self,
1084 raw_query: &str,
1085 query: &UpdateQuery,
1086 ) -> RedDBResult<RuntimeQueryResult> {
1087 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1088 crate::runtime::collection_contract::CollectionContractGate::check(
1094 self,
1095 &query.table,
1096 crate::runtime::collection_contract::MutationKind::Update,
1097 )?;
1098
1099 let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1105 let augmented_query: UpdateQuery;
1106 let effective_query: &UpdateQuery = if rls_gated {
1107 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1108 self,
1109 &query.table,
1110 crate::storage::query::ast::PolicyAction::Update,
1111 );
1112 let Some(policy) = rls_filter else {
1113 let mut response = RuntimeQueryResult::dml_result(
1116 raw_query.to_string(),
1117 0,
1118 "update",
1119 "runtime-dml-rls",
1120 );
1121 if let Some(items) = query.returning.clone() {
1122 response.result = build_returning_result(&items, &[], None);
1123 }
1124 return Ok(response);
1125 };
1126 let mut augmented = query.clone();
1127 augmented.filter = Some(match augmented.filter.take() {
1128 Some(existing) => {
1129 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1130 }
1131 None => policy,
1132 });
1133 augmented_query = augmented;
1134 &augmented_query
1135 } else {
1136 query
1137 };
1138
1139 if let Some(items) = effective_query.returning.clone() {
1144 let mut inner_query = effective_query.clone();
1145 inner_query.returning = None;
1146 let (mut response, touched_ids) =
1147 self.execute_update_inner_tracked(raw_query, &inner_query)?;
1148
1149 let snapshots = super::dml_target_scan::DmlTargetScan::new(
1150 self,
1151 &effective_query.table,
1152 None,
1153 None,
1154 )
1155 .row_snapshots(&touched_ids);
1156
1157 response.result = build_returning_result(&items, &snapshots, None);
1158 response.engine = "runtime-dml-returning";
1159 return Ok(response);
1160 }
1161
1162 self.execute_update_inner(raw_query, effective_query)
1163 }
1164
1165 fn execute_update_inner(
1167 &self,
1168 raw_query: &str,
1169 query: &UpdateQuery,
1170 ) -> RedDBResult<RuntimeQueryResult> {
1171 self.execute_update_inner_tracked(raw_query, query)
1172 .map(|(res, _)| res)
1173 }
1174
1175 fn execute_update_inner_tracked(
1176 &self,
1177 raw_query: &str,
1178 query: &UpdateQuery,
1179 ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1180 let store = self.inner.db.store();
1181 let effective_filter = effective_update_filter(query);
1182 let compiled_plan = self.compile_update_plan(query)?;
1183 let mut touched_ids: Vec<EntityId> = Vec::new();
1184 let limit_cap = query.limit.map(|l| l as usize);
1185 let manager = store
1186 .get_collection(&query.table)
1187 .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1188 let ids_to_update = super::dml_target_scan::DmlTargetScan::new(
1189 self,
1190 &query.table,
1191 effective_filter.as_ref(),
1192 limit_cap,
1193 )
1194 .find_target_ids()?;
1195
1196 let mut affected: u64 = 0;
1197 for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1198 let mut applied_chunk = Vec::with_capacity(chunk.len());
1199 for entity in manager.get_many(chunk).into_iter().flatten() {
1200 let assignments =
1201 self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1202 let applied = self.apply_materialized_update_for_entity(
1203 query.table.clone(),
1204 entity,
1205 &compiled_plan,
1206 assignments,
1207 )?;
1208 touched_ids.push(applied.id);
1209 applied_chunk.push(applied);
1210 }
1211 self.persist_update_chunk(&applied_chunk)?;
1212 affected += applied_chunk.len() as u64;
1213 let lsns = self.flush_update_chunk(&applied_chunk)?;
1214 if !query.suppress_events {
1215 self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1216 }
1217 }
1218
1219 if affected > 0 {
1220 self.note_table_write(&query.table);
1221 }
1222
1223 Ok((
1224 RuntimeQueryResult::dml_result(
1225 raw_query.to_string(),
1226 affected,
1227 "update",
1228 "runtime-dml",
1229 ),
1230 touched_ids,
1231 ))
1232 }
1233
1234 fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1235 let mut static_field_assignments = Vec::new();
1236 let mut static_metadata_assignments = Vec::new();
1237 let mut dynamic_assignments = Vec::new();
1238 let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1239 let mut row_modified_columns = Vec::new();
1240
1241 for (column, expr) in &query.assignment_exprs {
1242 let metadata_key = resolve_sql_ttl_metadata_key(column);
1243 if let Ok(value) = fold_expr_to_value(expr.clone()) {
1244 if let Some(metadata_key) = metadata_key {
1245 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1246 let (canonical_key, canonical_value) =
1247 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1248 static_metadata_assignments.push((canonical_key.to_string(), canonical_value));
1249 } else {
1250 let value = self.resolve_crypto_sentinel(value)?;
1251 static_field_assignments.push((
1252 column.clone(),
1253 normalize_row_update_assignment_with_plan(
1254 &query.table,
1255 column,
1256 value,
1257 row_contract_plan.as_ref(),
1258 )?,
1259 ));
1260 row_modified_columns.push(column.clone());
1261 }
1262 continue;
1263 }
1264
1265 dynamic_assignments.push(CompiledUpdateAssignment {
1266 column: column.clone(),
1267 expr: expr.clone(),
1268 metadata_key,
1269 row_rule: if metadata_key.is_none() {
1270 if let Some(plan) = row_contract_plan.as_ref() {
1271 if plan.timestamps_enabled
1272 && (column == "created_at" || column == "updated_at")
1273 {
1274 return Err(RedDBError::Query(format!(
1275 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1276 query.table, column
1277 )));
1278 }
1279 if let Some(rule) = plan.declared_rules.get(column) {
1280 Some(rule.clone())
1281 } else if plan.strict_schema {
1282 return Err(RedDBError::Query(format!(
1283 "collection '{}' is strict and does not allow undeclared fields: {}",
1284 query.table, column
1285 )));
1286 } else {
1287 None
1288 }
1289 } else {
1290 None
1291 }
1292 } else {
1293 None
1294 },
1295 });
1296 if metadata_key.is_none() {
1297 row_modified_columns.push(column.clone());
1298 }
1299 }
1300
1301 let row_modified_columns = dedupe_update_columns(row_modified_columns);
1302 let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1303 row_modified_columns.iter().any(|column| {
1304 plan.unique_columns
1305 .keys()
1306 .any(|unique| unique.eq_ignore_ascii_case(column))
1307 })
1308 });
1309
1310 if let Some(ttl_ms) = query.ttl_ms {
1311 static_metadata_assignments
1312 .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1313 }
1314 if let Some(expires_at_ms) = query.expires_at_ms {
1315 static_metadata_assignments.push((
1316 "_expires_at".to_string(),
1317 metadata_u64_to_value(expires_at_ms),
1318 ));
1319 }
1320 for (key, val) in &query.with_metadata {
1321 static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1322 }
1323
1324 Ok(CompiledUpdatePlan {
1325 static_field_assignments,
1326 static_metadata_assignments,
1327 dynamic_assignments,
1328 row_contract_plan,
1329 row_modified_columns,
1330 row_touches_unique_columns,
1331 })
1332 }
1333
1334 fn materialize_update_assignments_for_entity(
1335 &self,
1336 query: &UpdateQuery,
1337 entity: &UnifiedEntity,
1338 compiled_plan: &CompiledUpdatePlan,
1339 ) -> RedDBResult<MaterializedUpdateAssignments> {
1340 let mut assignments = MaterializedUpdateAssignments::default();
1341 let mut record: Option<UnifiedRecord> = None;
1342
1343 for assignment in &compiled_plan.dynamic_assignments {
1344 if record.is_none() {
1345 record = runtime_any_record_from_entity_ref(entity);
1346 }
1347 let Some(record) = record.as_ref() else {
1348 return Err(RedDBError::Query(format!(
1349 "UPDATE could not materialize runtime record for entity {} in '{}'",
1350 entity.id.raw(),
1351 query.table
1352 )));
1353 };
1354 let value = super::expr_eval::evaluate_runtime_expr_with_db(
1355 Some(self.inner.db.as_ref()),
1356 &assignment.expr,
1357 record,
1358 Some(query.table.as_str()),
1359 Some(query.table.as_str()),
1360 )
1361 .ok_or_else(|| {
1362 RedDBError::Query(format!(
1363 "failed to evaluate UPDATE expression for column '{}'",
1364 assignment.column
1365 ))
1366 })?;
1367
1368 if let Some(metadata_key) = assignment.metadata_key {
1369 let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1370 let (canonical_key, canonical_value) =
1371 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1372 assignments
1373 .dynamic_metadata_assignments
1374 .push((canonical_key.to_string(), canonical_value));
1375 } else {
1376 assignments.dynamic_field_assignments.push((
1377 assignment.column.clone(),
1378 normalize_row_update_value_for_rule(
1379 &query.table,
1380 self.resolve_crypto_sentinel(value)?,
1381 assignment.row_rule.as_ref(),
1382 )?,
1383 ));
1384 }
1385 }
1386
1387 Ok(assignments)
1388 }
1389
1390 fn apply_materialized_update_for_entity(
1391 &self,
1392 collection: String,
1393 entity: UnifiedEntity,
1394 compiled_plan: &CompiledUpdatePlan,
1395 assignments: MaterializedUpdateAssignments,
1396 ) -> RedDBResult<AppliedEntityMutation> {
1397 if matches!(entity.data, EntityData::Row(_)) {
1398 return self.apply_loaded_sql_update_row_core(
1399 collection,
1400 entity,
1401 &compiled_plan.static_field_assignments,
1402 assignments.dynamic_field_assignments,
1403 &compiled_plan.static_metadata_assignments,
1404 assignments.dynamic_metadata_assignments,
1405 compiled_plan.row_contract_plan.as_ref(),
1406 &compiled_plan.row_modified_columns,
1407 compiled_plan.row_touches_unique_columns,
1408 );
1409 }
1410
1411 self.apply_loaded_patch_entity_core(
1412 collection,
1413 entity,
1414 crate::json::Value::Null,
1415 build_patch_operations_from_materialized_assignments(compiled_plan, assignments),
1416 )
1417 }
1418
1419 pub fn execute_delete(
1421 &self,
1422 raw_query: &str,
1423 query: &DeleteQuery,
1424 ) -> RedDBResult<RuntimeQueryResult> {
1425 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1426 crate::runtime::collection_contract::CollectionContractGate::check(
1430 self,
1431 &query.table,
1432 crate::runtime::collection_contract::MutationKind::Delete,
1433 )?;
1434
1435 if let Some(items) = query.returning.clone() {
1442 let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
1443 RedDBError::Query(
1444 "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
1445 )
1446 })?;
1447 let captured = self.execute_query(&select_sql)?;
1448
1449 let mut inner_query = query.clone();
1450 inner_query.returning = None;
1451 let _ = self.execute_delete(raw_query, &inner_query)?;
1452
1453 let snapshots: Vec<Vec<(String, Value)>> = captured
1454 .result
1455 .records
1456 .iter()
1457 .map(|rec| {
1458 rec.iter_fields()
1459 .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
1460 .collect()
1461 })
1462 .collect();
1463 let affected = snapshots.len() as u64;
1464 let result = build_returning_result(&items, &snapshots, None);
1465
1466 let mut response = RuntimeQueryResult::dml_result(
1467 raw_query.to_string(),
1468 affected,
1469 "delete",
1470 "runtime-dml-returning",
1471 );
1472 response.result = result;
1473 return Ok(response);
1474 }
1475 if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
1482 let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1483 self,
1484 &query.table,
1485 crate::storage::query::ast::PolicyAction::Delete,
1486 );
1487 let Some(policy) = rls_filter else {
1488 return Ok(RuntimeQueryResult::dml_result(
1489 raw_query.to_string(),
1490 0,
1491 "delete",
1492 "runtime-dml-rls",
1493 ));
1494 };
1495 let mut augmented = query.clone();
1500 augmented.filter = Some(match augmented.filter.take() {
1501 Some(existing) => {
1502 crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1503 }
1504 None => policy,
1505 });
1506 return self.execute_delete_inner(raw_query, &augmented);
1507 }
1508 self.execute_delete_inner(raw_query, query)
1509 }
1510
1511 fn execute_delete_inner(
1512 &self,
1513 raw_query: &str,
1514 query: &DeleteQuery,
1515 ) -> RedDBResult<RuntimeQueryResult> {
1516 let effective_filter = effective_delete_filter(query);
1517
1518 let scan = super::dml_target_scan::DmlTargetScan::new(
1522 self,
1523 &query.table,
1524 effective_filter.as_ref(),
1525 None,
1526 );
1527 let ids_to_delete = scan.find_target_ids()?;
1528
1529 let needs_delete_events =
1532 !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
1533 let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
1534 scan.row_json_pre_images(&ids_to_delete)
1535 } else {
1536 HashMap::new()
1537 };
1538
1539 let mut affected: u64 = 0;
1540 for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1541 let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
1542 affected += count;
1543 if needs_delete_events && !lsns.is_empty() {
1544 let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
1548 self.emit_delete_events_for_collection(
1549 &query.table,
1550 deleted_chunk,
1551 &lsns,
1552 &pre_images,
1553 )?;
1554 }
1555 }
1556 pre_images.clear();
1557
1558 if affected > 0 {
1559 self.note_table_write(&query.table);
1560 }
1561
1562 Ok(RuntimeQueryResult::dml_result(
1563 raw_query.to_string(),
1564 affected,
1565 "delete",
1566 "runtime-dml",
1567 ))
1568 }
1569}
1570
1571fn build_patch_operations_from_materialized_assignments(
1572 compiled_plan: &CompiledUpdatePlan,
1573 assignments: MaterializedUpdateAssignments,
1574) -> Vec<PatchEntityOperation> {
1575 let mut operations = Vec::with_capacity(
1576 compiled_plan.static_field_assignments.len()
1577 + compiled_plan.static_metadata_assignments.len()
1578 + assignments.dynamic_field_assignments.len()
1579 + assignments.dynamic_metadata_assignments.len(),
1580 );
1581
1582 for (column, value) in &compiled_plan.static_field_assignments {
1583 operations.push(PatchEntityOperation {
1584 op: PatchEntityOperationType::Set,
1585 path: vec!["fields".to_string(), column.clone()],
1586 value: Some(storage_value_to_json(value)),
1587 });
1588 }
1589
1590 for (column, value) in assignments.dynamic_field_assignments {
1591 operations.push(PatchEntityOperation {
1592 op: PatchEntityOperationType::Set,
1593 path: vec!["fields".to_string(), column],
1594 value: Some(storage_value_to_json(&value)),
1595 });
1596 }
1597
1598 for (key, value) in &compiled_plan.static_metadata_assignments {
1599 operations.push(PatchEntityOperation {
1600 op: PatchEntityOperationType::Set,
1601 path: vec!["metadata".to_string(), key.clone()],
1602 value: Some(metadata_value_to_json(value)),
1603 });
1604 }
1605
1606 for (key, value) in assignments.dynamic_metadata_assignments {
1607 operations.push(PatchEntityOperation {
1608 op: PatchEntityOperationType::Set,
1609 path: vec!["metadata".to_string(), key],
1610 value: Some(metadata_value_to_json(&value)),
1611 });
1612 }
1613
1614 operations
1615}
1616
/// Rewrites `DELETE FROM <body> [RETURNING ...]` into `SELECT * FROM <body>`.
///
/// The statement must start (after leading whitespace) with the keyword
/// `DELETE` followed by ASCII whitespace. The first top-level `FROM` keyword
/// (whitespace-delimited, outside single-quoted literals) marks the start of
/// the body, and a top-level `RETURNING` keyword, if present, marks its end.
/// Keywords are matched case-insensitively; the body keeps its original
/// casing and has trailing whitespace removed.
///
/// Any ASCII whitespace (space, tab, newline) is accepted around `DELETE`
/// and `FROM`, so multi-line statements are rewritten as well.
///
/// Returns `None` when the statement is not a `DELETE`, has no `FROM`, or
/// has nothing after `FROM`.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    let trimmed = sql.trim_start();
    // ASCII lowercasing never changes byte lengths, so every offset found in
    // `lowered` addresses the same position in `trimmed`.
    let lowered = trimmed.to_ascii_lowercase();

    let after_delete = lowered.strip_prefix("delete")?;
    if !after_delete.starts_with(|c: char| c.is_ascii_whitespace()) {
        return None;
    }

    let from_idx = find_top_level_keyword(&lowered, "from")?;
    // Skip the keyword plus the single ASCII whitespace byte that follows it.
    // When FROM is the last token there is no such byte and `get` yields None.
    let body_start = from_idx + "from".len() + 1;
    let after_from = trimmed.get(body_start..)?;
    let after_from_lc = lowered.get(body_start..)?;

    let body = match find_top_level_keyword(after_from_lc, "returning") {
        Some(pos) => &after_from[..pos],
        None => after_from,
    };
    Some(format!("SELECT * FROM {}", body.trim_end()))
}

/// Finds the byte offset of the first occurrence of `needle` in `haystack`
/// that is outside single-quoted sections and delimited by ASCII whitespace
/// (or the start / end of `haystack`) on both sides.
///
/// Matching is byte-exact, so callers wanting case-insensitive search pass a
/// lowercased haystack and needle. Every `'` byte toggles the "inside string"
/// state; a doubled `''` therefore toggles twice and stays inside the literal.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let needle = needle.as_bytes();
    let mut in_string = false;

    for (i, &byte) in bytes.iter().enumerate() {
        if byte == b'\'' {
            in_string = !in_string;
            continue;
        }
        if in_string || !bytes[i..].starts_with(needle) {
            continue;
        }

        let end = i + needle.len();
        let starts_token = i == 0 || bytes[i - 1].is_ascii_whitespace();
        let ends_token = end == bytes.len() || bytes[end].is_ascii_whitespace();
        if starts_token && ends_token {
            return Some(i);
        }
    }
    None
}
1676
1677fn build_returning_result(
1684 items: &[ReturningItem],
1685 snapshots: &[Vec<(String, Value)>],
1686 outputs: Option<&[crate::application::entity::CreateEntityOutput]>,
1687) -> UnifiedResult {
1688 let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
1689
1690 let mut columns: Vec<String> = if project_all {
1691 let mut cols: Vec<String> = Vec::new();
1692 if outputs.is_some() {
1693 cols.push("red_entity_id".to_string());
1694 }
1695 if let Some(first) = snapshots.first() {
1696 for (name, _) in first {
1697 cols.push(name.clone());
1698 }
1699 }
1700 cols
1701 } else {
1702 items
1703 .iter()
1704 .filter_map(|it| match it {
1705 ReturningItem::Column(c) => Some(c.clone()),
1706 ReturningItem::All => None,
1707 })
1708 .collect()
1709 };
1710 {
1712 let mut seen = std::collections::HashSet::new();
1713 columns.retain(|c| seen.insert(c.clone()));
1714 }
1715
1716 let id_key = sys_key_red_entity_id();
1717 let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
1718 for (idx, snap) in snapshots.iter().enumerate() {
1719 let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
1720 if let Some(outs) = outputs {
1721 if let Some(out) = outs.get(idx) {
1722 values.insert(Arc::clone(&id_key), Value::Integer(out.id.raw() as i64));
1723 }
1724 }
1725 for (name, val) in snap {
1726 values.insert(Arc::from(name.as_str()), val.clone());
1727 }
1728 let mut rec = UnifiedRecord::default();
1729 for col in &columns {
1731 if let Some(v) = values.get(col.as_str()) {
1732 rec.set_arc(Arc::from(col.as_str()), v.clone());
1733 }
1734 }
1735 records.push(rec);
1736 }
1737
1738 UnifiedResult {
1739 columns,
1740 records,
1741 stats: Default::default(),
1742 pre_serialized_json: None,
1743 }
1744}
1745
1746fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
1747 let db = runtime.db();
1748 if let Some(contract) = db.collection_contract(collection) {
1749 let advisory_implicit_dynamic = matches!(
1750 (&contract.origin, &contract.schema_mode),
1751 (
1752 crate::physical::ContractOrigin::Implicit,
1753 crate::catalog::SchemaMode::Dynamic,
1754 )
1755 );
1756 if advisory_implicit_dynamic
1757 || matches!(
1758 contract.declared_model,
1759 crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
1760 )
1761 {
1762 return Ok(());
1763 }
1764 return Err(RedDBError::InvalidOperation(format!(
1765 "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
1766 collection, contract.declared_model
1767 )));
1768 }
1769
1770 let now = std::time::SystemTime::now()
1771 .duration_since(std::time::UNIX_EPOCH)
1772 .unwrap_or_default()
1773 .as_millis();
1774 db.save_collection_contract(crate::physical::CollectionContract {
1775 name: collection.to_string(),
1776 declared_model: crate::catalog::CollectionModel::Graph,
1777 schema_mode: crate::catalog::SchemaMode::Dynamic,
1778 origin: crate::physical::ContractOrigin::Implicit,
1779 version: 1,
1780 created_at_unix_ms: now,
1781 updated_at_unix_ms: now,
1782 default_ttl_ms: db.collection_default_ttl_ms(collection),
1783 vector_dimension: None,
1784 vector_metric: None,
1785 context_index_fields: Vec::new(),
1786 declared_columns: Vec::new(),
1787 table_def: None,
1788 timestamps_enabled: false,
1789 context_index_enabled: false,
1790 append_only: false,
1791 subscriptions: Vec::new(),
1792 })
1793 .map(|_| ())
1794 .map_err(|err| RedDBError::Internal(err.to_string()))
1795}
1796
/// Removes duplicate column names, comparing ASCII-case-insensitively.
///
/// The first spelling of each name is kept and the relative order of the
/// surviving names is unchanged.
fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
    // Lowercased forms of the names kept so far; two names are equal under
    // `eq_ignore_ascii_case` exactly when their ASCII-lowercased forms match.
    let mut seen_folded: Vec<String> = Vec::with_capacity(columns.len());

    columns.retain(|column| {
        let folded = column.to_ascii_lowercase();
        if seen_folded.contains(&folded) {
            false
        } else {
            seen_folded.push(folded);
            true
        }
    });

    columns
}
1813
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Maps a SQL column name onto its TTL metadata key.
///
/// The comparison is ASCII-case-insensitive. Returns the canonical lowercase
/// spelling from `SQL_TTL_METADATA_COLUMNS`, or `None` when `column` is not
/// one of the TTL pseudo-columns.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|known| column.eq_ignore_ascii_case(known))
}
1831
1832fn canonicalize_sql_ttl_metadata(
1837 key: &'static str,
1838 value: MetadataValue,
1839) -> (&'static str, MetadataValue) {
1840 if key != "_ttl" {
1841 return (key, value);
1842 }
1843 let scaled = match value {
1844 MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
1845 MetadataValue::Timestamp(ms_or_s) => {
1846 MetadataValue::Timestamp(ms_or_s)
1849 }
1850 MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
1851 other => other,
1852 };
1853 ("_ttl_ms", scaled)
1854}
1855
1856pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
1860
1861impl RedDBRuntime {
1862 pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
1868 match value {
1869 Value::Password(marked) => {
1870 if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
1871 Ok(Value::Password(crate::auth::store::hash_password(plain)))
1872 } else {
1873 Ok(Value::Password(marked))
1874 }
1875 }
1876 Value::Secret(bytes) => {
1877 if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
1878 if !self.secret_auto_encrypt() {
1879 return Err(RedDBError::Query(
1880 "SECRET() literal rejected: red.config.secret.auto_encrypt \
1881 is false. Insert pre-encrypted bytes directly instead."
1882 .to_string(),
1883 ));
1884 }
1885 let key = self.secret_aes_key().ok_or_else(|| {
1886 RedDBError::Query(
1887 "SECRET() column encryption requires a bootstrapped \
1888 vault (red.secret.aes_key is missing). Start the server \
1889 with --vault to enable."
1890 .to_string(),
1891 )
1892 })?;
1893 let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
1894 Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
1895 } else {
1896 Ok(Value::Secret(bytes))
1897 }
1898 }
1899 other => Ok(other),
1900 }
1901 }
1902}
1903
1904fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
1907 let nonce_bytes = crate::auth::store::random_bytes(12);
1908 let mut nonce = [0u8; 12];
1909 nonce.copy_from_slice(&nonce_bytes[..12]);
1910 let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
1911 let mut out = Vec::with_capacity(12 + ct.len());
1912 out.extend_from_slice(&nonce);
1913 out.extend_from_slice(&ct);
1914 out
1915}
1916
1917pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
1921 if payload.len() < 12 {
1922 return None;
1923 }
1924 let mut nonce = [0u8; 12];
1925 nonce.copy_from_slice(&payload[..12]);
1926 crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
1927}
1928
1929fn split_insert_metadata(
1930 runtime: &RedDBRuntime,
1931 columns: &[String],
1932 values: &[Value],
1933) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
1934 let mut fields = Vec::new();
1935 let mut metadata = Vec::new();
1936
1937 for (column, value) in columns.iter().zip(values.iter()) {
1938 if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
1940 let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
1941 let (canonical_key, canonical_value) =
1942 canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1943 metadata.push((canonical_key.to_string(), canonical_value));
1944 continue;
1945 }
1946 fields.push((
1947 column.clone(),
1948 runtime.resolve_crypto_sentinel(value.clone())?,
1949 ));
1950 }
1951
1952 Ok((fields, metadata))
1953}
1954
1955fn merge_with_clauses(
1957 metadata: &mut Vec<(String, MetadataValue)>,
1958 ttl_ms: Option<u64>,
1959 expires_at_ms: Option<u64>,
1960 with_metadata: &[(String, Value)],
1961) {
1962 if let Some(ms) = ttl_ms {
1963 metadata.push((
1964 "_ttl_ms".to_string(),
1965 if ms <= i64::MAX as u64 {
1966 MetadataValue::Int(ms as i64)
1967 } else {
1968 MetadataValue::Timestamp(ms)
1969 },
1970 ));
1971 }
1972 if let Some(ms) = expires_at_ms {
1973 metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
1974 }
1975 for (key, value) in with_metadata {
1976 let meta_value = match value {
1977 Value::Text(s) => MetadataValue::String(s.to_string()),
1978 Value::Integer(n) => MetadataValue::Int(*n),
1979 Value::Float(n) => MetadataValue::Float(*n),
1980 Value::Boolean(b) => MetadataValue::Bool(*b),
1981 _ => MetadataValue::String(value.to_string()),
1982 };
1983 metadata.push((key.clone(), meta_value));
1984 }
1985}
1986
1987fn merge_vector_metadata_column(
1988 metadata: &mut Vec<(String, MetadataValue)>,
1989 columns: &[String],
1990 values: &[Value],
1991) -> RedDBResult<()> {
1992 let Some(value) = columns
1993 .iter()
1994 .position(|column| column.eq_ignore_ascii_case("metadata"))
1995 .map(|index| &values[index])
1996 else {
1997 return Ok(());
1998 };
1999 let json = match value {
2000 Value::Null => return Ok(()),
2001 Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
2002 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2003 })?,
2004 Value::Text(text) => crate::json::from_str(text).map_err(|err| {
2005 RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2006 })?,
2007 other => {
2008 return Err(RedDBError::Query(format!(
2009 "column 'metadata' expected JSON object, got {other:?}"
2010 )))
2011 }
2012 };
2013 let parsed = metadata_from_json(&json)?;
2014 for (key, value) in parsed.iter() {
2015 metadata.push((key.clone(), value.clone()));
2016 }
2017 Ok(())
2018}
2019
2020fn apply_collection_default_ttl_metadata(
2021 runtime: &RedDBRuntime,
2022 collection: &str,
2023 metadata: &mut Vec<(String, MetadataValue)>,
2024) {
2025 if has_internal_ttl_metadata(metadata) {
2026 return;
2027 }
2028
2029 let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
2030 return;
2031 };
2032
2033 metadata.push((
2034 "_ttl_ms".to_string(),
2035 if default_ttl_ms <= i64::MAX as u64 {
2036 MetadataValue::Int(default_ttl_ms as i64)
2037 } else {
2038 MetadataValue::Timestamp(default_ttl_ms)
2039 },
2040 ));
2041}
2042
2043fn ensure_non_tree_reserved_metadata_entries(
2044 metadata: &[(String, MetadataValue)],
2045) -> RedDBResult<()> {
2046 for (key, _) in metadata {
2047 ensure_non_tree_reserved_metadata_key(key)?;
2048 }
2049 Ok(())
2050}
2051
2052fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2053 if key.starts_with(TREE_METADATA_PREFIX) {
2054 return Err(RedDBError::Query(format!(
2055 "metadata key '{}' is reserved for managed trees",
2056 key
2057 )));
2058 }
2059 Ok(())
2060}
2061
2062fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2063 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2064 return Err(RedDBError::Query(format!(
2065 "edge label '{}' is reserved for managed trees",
2066 TREE_CHILD_EDGE_LABEL
2067 )));
2068 }
2069 Ok(())
2070}
2071
2072fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
2073 let mut columns = Vec::with_capacity(pairs.len());
2074 let mut values = Vec::with_capacity(pairs.len());
2075
2076 for (column, value) in pairs {
2077 columns.push(column.clone());
2078 values.push(value.clone());
2079 }
2080
2081 (columns, values)
2082}
2083
2084fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
2086 for (i, col) in columns.iter().enumerate() {
2087 if col.eq_ignore_ascii_case(name) {
2088 return Ok(values[i].clone());
2089 }
2090 }
2091 Err(RedDBError::Query(format!(
2092 "required column '{name}' not found in INSERT"
2093 )))
2094}
2095
2096fn find_column_value_string(
2098 columns: &[String],
2099 values: &[Value],
2100 name: &str,
2101) -> RedDBResult<String> {
2102 let val = find_column_value(columns, values, name)?;
2103 match val {
2104 Value::Text(s) => Ok(s.to_string()),
2105 Value::Integer(n) => Ok(n.to_string()),
2106 Value::Float(n) => Ok(n.to_string()),
2107 other => Err(RedDBError::Query(format!(
2108 "column '{name}' expected text, got {other:?}"
2109 ))),
2110 }
2111}
2112
2113fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
2114 let val = find_column_value(columns, values, name)?;
2115 match val {
2116 Value::Float(n) => Ok(n),
2117 Value::Integer(n) => Ok(n as f64),
2118 Value::UnsignedInteger(n) => Ok(n as f64),
2119 Value::Text(s) => s
2120 .parse::<f64>()
2121 .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
2122 other => Err(RedDBError::Query(format!(
2123 "column '{name}' expected number, got {other:?}"
2124 ))),
2125 }
2126}
2127
2128fn find_column_value_opt_string(
2130 columns: &[String],
2131 values: &[Value],
2132 name: &str,
2133) -> Option<String> {
2134 for (i, col) in columns.iter().enumerate() {
2135 if col.eq_ignore_ascii_case(name) {
2136 return match &values[i] {
2137 Value::Null => None,
2138 Value::Text(s) => Some(s.to_string()),
2139 Value::Integer(n) => Some(n.to_string()),
2140 Value::Float(n) => Some(n.to_string()),
2141 _ => None,
2142 };
2143 }
2144 }
2145 None
2146}
2147
2148fn resolve_edge_endpoint(
2155 store: &crate::storage::unified::UnifiedStore,
2156 collection: &str,
2157 columns: &[String],
2158 values: &[Value],
2159 name: &str,
2160) -> RedDBResult<u64> {
2161 let val = find_column_value(columns, values, name)?;
2162 match val {
2163 Value::Integer(n) => Ok(n as u64),
2164 Value::UnsignedInteger(n) => Ok(n),
2165 Value::Text(s) => {
2166 if let Ok(n) = s.parse::<u64>() {
2167 return Ok(n);
2168 }
2169 let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
2170 match matches.len() {
2171 0 => Err(RedDBError::Query(format!(
2172 "column '{name}': no graph node with label '{s}' in collection '{collection}'"
2173 ))),
2174 1 => Ok(matches[0].raw()),
2175 n => Err(RedDBError::Query(format!(
2176 "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
2177 ))),
2178 }
2179 }
2180 other => Err(RedDBError::Query(format!(
2181 "column '{name}' expected integer or node label, got {other:?}"
2182 ))),
2183 }
2184}
2185
2186fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
2188 let val = find_column_value(columns, values, name)?;
2189 match val {
2190 Value::Integer(n) => Ok(n as u64),
2191 Value::UnsignedInteger(n) => Ok(n),
2192 Value::Text(s) => s
2193 .parse::<u64>()
2194 .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
2195 other => Err(RedDBError::Query(format!(
2196 "column '{name}' expected integer, got {other:?}"
2197 ))),
2198 }
2199}
2200
2201fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
2203 for (i, col) in columns.iter().enumerate() {
2204 if col.eq_ignore_ascii_case(name) {
2205 return match &values[i] {
2206 Value::Float(n) => Some(*n as f32),
2207 Value::Integer(n) => Some(*n as f32),
2208 Value::Null => None,
2209 _ => None,
2210 };
2211 }
2212 }
2213 None
2214}
2215
2216fn find_column_value_vec_f32(
2218 columns: &[String],
2219 values: &[Value],
2220 name: &str,
2221) -> RedDBResult<Vec<f32>> {
2222 let val = find_column_value(columns, values, name)?;
2223 match val {
2224 Value::Vector(v) => Ok(v),
2225 Value::Json(bytes) => {
2226 let s = std::str::from_utf8(&bytes).map_err(|_| {
2228 RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
2229 })?;
2230 let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
2231 RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
2232 })?;
2233 Ok(arr)
2234 }
2235 other => Err(RedDBError::Query(format!(
2236 "column '{name}' expected vector, got {other:?}"
2237 ))),
2238 }
2239}
2240
2241fn find_column_value_vec_f32_any(
2242 columns: &[String],
2243 values: &[Value],
2244 names: &[&str],
2245) -> RedDBResult<Vec<f32>> {
2246 for name in names {
2247 if columns
2248 .iter()
2249 .any(|column| column.eq_ignore_ascii_case(name))
2250 {
2251 return find_column_value_vec_f32(columns, values, name);
2252 }
2253 }
2254 Err(RedDBError::Query(format!(
2255 "required vector column '{}' not found in INSERT",
2256 names.join("' or '")
2257 )))
2258}
2259
2260fn extract_remaining_properties(
2262 columns: &[String],
2263 values: &[Value],
2264 exclude: &[&str],
2265) -> Vec<(String, Value)> {
2266 columns
2267 .iter()
2268 .zip(values.iter())
2269 .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
2270 .map(|(col, val)| (col.clone(), val.clone()))
2271 .collect()
2272}
2273
2274fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
2275 let mut invalid = Vec::new();
2276 for column in columns {
2277 if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
2278 invalid.push(column.clone());
2279 }
2280 }
2281
2282 if invalid.is_empty() {
2283 Ok(())
2284 } else {
2285 Err(RedDBError::Query(format!(
2286 "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
2287 invalid.join(", ")
2288 )))
2289 }
2290}
2291
/// Reports whether `column` is one of the native timeseries INSERT columns,
/// compared ASCII-case-insensitively.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 6] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
    ];
    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
2298
2299fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
2300 let mut found = None;
2301
2302 for alias in ["timestamp_ns", "timestamp", "time"] {
2303 for (index, column) in columns.iter().enumerate() {
2304 if !column.eq_ignore_ascii_case(alias) {
2305 continue;
2306 }
2307
2308 if found.is_some() {
2309 return Err(RedDBError::Query(
2310 "timeseries INSERT accepts only one timestamp column".to_string(),
2311 ));
2312 }
2313
2314 found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
2315 }
2316 }
2317
2318 Ok(found)
2319}
2320
2321fn find_timeseries_tags(
2322 columns: &[String],
2323 values: &[Value],
2324) -> RedDBResult<std::collections::HashMap<String, String>> {
2325 for (index, column) in columns.iter().enumerate() {
2326 if column.eq_ignore_ascii_case("tags") {
2327 return parse_timeseries_tags(&values[index]);
2328 }
2329 }
2330 Ok(std::collections::HashMap::new())
2331}
2332
2333fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
2334 match value {
2335 Value::Null => Ok(std::collections::HashMap::new()),
2336 Value::Json(bytes) => parse_timeseries_tags_json(bytes),
2337 Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
2338 other => Err(RedDBError::Query(format!(
2339 "timeseries tags must be a JSON object or JSON text, got {other:?}"
2340 ))),
2341 }
2342}
2343
2344fn parse_timeseries_tags_json(
2345 bytes: &[u8],
2346) -> RedDBResult<std::collections::HashMap<String, String>> {
2347 let json: crate::json::Value = crate::json::from_slice(bytes)
2348 .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
2349
2350 let object = match json {
2351 crate::json::Value::Object(object) => object,
2352 other => {
2353 return Err(RedDBError::Query(format!(
2354 "timeseries tags must be a JSON object, got {other:?}"
2355 )))
2356 }
2357 };
2358
2359 let mut tags = std::collections::HashMap::with_capacity(object.len());
2360 for (key, value) in object {
2361 tags.insert(key, json_tag_value_to_string(&value));
2362 }
2363 Ok(tags)
2364}
2365
2366fn json_tag_value_to_string(value: &crate::json::Value) -> String {
2367 match value {
2368 crate::json::Value::Null => "null".to_string(),
2369 crate::json::Value::Bool(value) => value.to_string(),
2370 crate::json::Value::Number(value) => value.to_string(),
2371 crate::json::Value::String(value) => value.clone(),
2372 other => other.to_string(),
2373 }
2374}
2375
2376fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
2377 match value {
2378 Value::UnsignedInteger(value) => Ok(*value),
2379 Value::Integer(value) if *value >= 0 => Ok(*value as u64),
2380 Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
2381 Value::Text(value) => value.parse::<u64>().map_err(|_| {
2382 RedDBError::Query(format!(
2383 "column '{column}' expected a non-negative integer timestamp, got '{value}'"
2384 ))
2385 }),
2386 other => Err(RedDBError::Query(format!(
2387 "column '{column}' expected a non-negative integer timestamp, got {other:?}"
2388 ))),
2389 }
2390}
2391
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a nanosecond count that does not
/// fit in a `u64` is clamped to `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();

    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
2399
2400fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
2401 use crate::json::{Map, Value as JV};
2402 match value {
2403 MetadataValue::Null => JV::Null,
2404 MetadataValue::Bool(value) => JV::Bool(*value),
2405 MetadataValue::Int(value) => JV::Number(*value as f64),
2406 MetadataValue::Float(value) => JV::Number(*value),
2407 MetadataValue::String(value) => JV::String(value.clone()),
2408 MetadataValue::Bytes(value) => JV::Array(
2409 value
2410 .iter()
2411 .map(|value| JV::Number(*value as f64))
2412 .collect(),
2413 ),
2414 MetadataValue::Timestamp(value) => JV::Number(*value as f64),
2415 MetadataValue::Array(values) => {
2416 JV::Array(values.iter().map(metadata_value_to_json).collect())
2417 }
2418 MetadataValue::Object(object) => {
2419 let entries = object
2420 .iter()
2421 .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
2422 .collect();
2423 JV::Object(entries)
2424 }
2425 MetadataValue::Geo { lat, lon } => {
2426 let mut object = Map::new();
2427 object.insert("lat".to_string(), JV::Number(*lat));
2428 object.insert("lon".to_string(), JV::Number(*lon));
2429 JV::Object(object)
2430 }
2431 MetadataValue::Reference(target) => {
2432 let mut object = Map::new();
2433 object.insert(
2434 "collection".to_string(),
2435 JV::String(target.collection().to_string()),
2436 );
2437 object.insert(
2438 "entity_id".to_string(),
2439 JV::Number(target.entity_id().raw() as f64),
2440 );
2441 JV::Object(object)
2442 }
2443 MetadataValue::References(values) => {
2444 let refs = values
2445 .iter()
2446 .map(|target| {
2447 let mut object = Map::new();
2448 object.insert(
2449 "collection".to_string(),
2450 JV::String(target.collection().to_string()),
2451 );
2452 object.insert(
2453 "entity_id".to_string(),
2454 JV::Number(target.entity_id().raw() as f64),
2455 );
2456 JV::Object(object)
2457 })
2458 .collect();
2459 JV::Array(refs)
2460 }
2461 }
2462}
2463
2464fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
2465 match value {
2466 Value::Null => MetadataValue::Null,
2467 Value::Boolean(value) => MetadataValue::Bool(*value),
2468 Value::Integer(value) => MetadataValue::Int(*value),
2469 Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
2470 Value::Float(value) => MetadataValue::Float(*value),
2471 Value::Text(value) => MetadataValue::String(value.to_string()),
2472 Value::Blob(value) => MetadataValue::Bytes(value.clone()),
2473 Value::Timestamp(value) => {
2474 if *value >= 0 {
2475 metadata_u64_to_value(*value as u64)
2476 } else {
2477 MetadataValue::Int(*value)
2478 }
2479 }
2480 Value::TimestampMs(value) => {
2481 if *value >= 0 {
2482 metadata_u64_to_value(*value as u64)
2483 } else {
2484 MetadataValue::Int(*value)
2485 }
2486 }
2487 Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
2488 Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
2489 Value::Date(value) => MetadataValue::String(value.to_string()),
2490 Value::Time(value) => MetadataValue::String(value.to_string()),
2491 Value::Decimal(value) => MetadataValue::String(value.to_string()),
2492 Value::Ipv4(value) => MetadataValue::String(format!(
2493 "{}.{}.{}.{}",
2494 (value >> 24) & 0xFF,
2495 (value >> 16) & 0xFF,
2496 (value >> 8) & 0xFF,
2497 value & 0xFF
2498 )),
2499 Value::Port(value) => MetadataValue::Int(i64::from(*value)),
2500 Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
2501 Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
2502 Value::GeoPoint(lat, lon) => MetadataValue::Geo {
2503 lat: *lat as f64 / 1_000_000.0,
2504 lon: *lon as f64 / 1_000_000.0,
2505 },
2506 Value::BigInt(value) => MetadataValue::String(value.to_string()),
2507 Value::TableRef(value) => MetadataValue::String(value.clone()),
2508 Value::PageRef(value) => MetadataValue::Int(*value as i64),
2509 Value::Password(value) => MetadataValue::String(value.clone()),
2510 Value::Array(values) => {
2511 MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
2512 }
2513 _ => MetadataValue::String(value.to_string()),
2514 }
2515}
2516
2517fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
2518 match value {
2519 Value::Null => Ok(MetadataValue::Null),
2520 Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
2521 Value::Integer(_) => Err(RedDBError::Query(format!(
2522 "column '{field}' must be non-negative for TTL metadata"
2523 ))),
2524 Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
2525 Value::Float(value) if value.is_finite() => {
2526 if value.fract().abs() >= f64::EPSILON {
2527 return Err(RedDBError::Query(format!(
2528 "column '{field}' must be an integer (TTL metadata must be an integer)"
2529 )));
2530 }
2531 if *value < 0.0 {
2532 return Err(RedDBError::Query(format!(
2533 "column '{field}' must be non-negative for TTL metadata"
2534 )));
2535 }
2536 if *value > u64::MAX as f64 {
2537 return Err(RedDBError::Query(format!(
2538 "column '{field}' value is too large"
2539 )));
2540 }
2541 Ok(metadata_u64_to_value(*value as u64))
2542 }
2543 Value::Float(_) => Err(RedDBError::Query(format!(
2544 "column '{field}' must be a finite number"
2545 ))),
2546 Value::Text(value) => {
2547 let value = value.trim();
2548 if let Ok(value) = value.parse::<u64>() {
2549 Ok(metadata_u64_to_value(value))
2550 } else if let Ok(value) = value.parse::<i64>() {
2551 if value < 0 {
2552 return Err(RedDBError::Query(format!(
2553 "column '{field}' must be non-negative for TTL metadata"
2554 )));
2555 }
2556 Ok(metadata_u64_to_value(value as u64))
2557 } else if let Ok(value) = value.parse::<f64>() {
2558 if !value.is_finite() {
2559 return Err(RedDBError::Query(format!(
2560 "column '{field}' must be a finite number"
2561 )));
2562 }
2563 if value.fract().abs() >= f64::EPSILON {
2564 return Err(RedDBError::Query(format!(
2565 "column '{field}' must be an integer (TTL metadata must be an integer)"
2566 )));
2567 }
2568 if value < 0.0 {
2569 return Err(RedDBError::Query(format!(
2570 "column '{field}' must be non-negative for TTL metadata"
2571 )));
2572 }
2573 if value > u64::MAX as f64 {
2574 return Err(RedDBError::Query(format!(
2575 "column '{field}' value is too large"
2576 )));
2577 }
2578 Ok(metadata_u64_to_value(value as u64))
2579 } else {
2580 Err(RedDBError::Query(format!(
2581 "column '{field}' expects a numeric value for TTL metadata"
2582 )))
2583 }
2584 }
2585 _ => Err(RedDBError::Query(format!(
2586 "column '{field}' expects a numeric value for TTL metadata"
2587 ))),
2588 }
2589}
2590
2591fn metadata_u64_to_value(value: u64) -> MetadataValue {
2592 if value <= i64::MAX as u64 {
2593 MetadataValue::Int(value as i64)
2594 } else {
2595 MetadataValue::Timestamp(value)
2596 }
2597}
2598
2599fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
2605 let json = match value {
2606 Value::Null => return false,
2607 Value::Json(bytes) | Value::Blob(bytes) => {
2608 match crate::json::from_slice::<crate::json::Value>(bytes) {
2609 Ok(v) => v,
2610 Err(_) => return false,
2611 }
2612 }
2613 Value::Text(s) => {
2614 let trimmed = s.trim_start();
2615 if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
2616 return false;
2617 }
2618 match crate::json::from_str::<crate::json::Value>(s) {
2619 Ok(v) => v,
2620 Err(_) => return false,
2621 }
2622 }
2623 _ => return false,
2624 };
2625 let mut cursor = &json;
2626 for seg in tail.split('.') {
2627 match cursor {
2628 crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
2629 Some((_, v)) => cursor = v,
2630 None => return false,
2631 },
2632 _ => return false,
2633 }
2634 }
2635 !matches!(cursor, crate::json::Value::Null)
2636}
2637
2638fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
2649 let mut root = match current {
2650 Value::Null => crate::json::Value::Object(Default::default()),
2651 Value::Json(bytes) | Value::Blob(bytes) => {
2652 crate::json::from_slice(&bytes).map_err(|err| {
2653 RedDBError::Query(format!(
2654 "tenant auto-fill: root column is not valid JSON ({err})"
2655 ))
2656 })?
2657 }
2658 Value::Text(s) => {
2659 if s.trim().is_empty() {
2660 crate::json::Value::Object(Default::default())
2661 } else {
2662 crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
2663 RedDBError::Query(format!(
2664 "tenant auto-fill: text root is not valid JSON ({err})"
2665 ))
2666 })?
2667 }
2668 }
2669 other => {
2670 return Err(RedDBError::Query(format!(
2671 "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
2672 )));
2673 }
2674 };
2675
2676 let segments: Vec<&str> = tail.split('.').collect();
2678 let mut cursor: &mut crate::json::Value = &mut root;
2679 for (i, seg) in segments.iter().enumerate() {
2680 let is_last = i + 1 == segments.len();
2681 let map = match cursor {
2682 crate::json::Value::Object(m) => m,
2683 _ => {
2684 return Err(RedDBError::Query(format!(
2685 "tenant auto-fill: segment '{seg}' is not inside an object"
2686 )));
2687 }
2688 };
2689 if is_last {
2690 map.insert(
2691 seg.to_string(),
2692 crate::json::Value::String(tenant_id.to_string()),
2693 );
2694 break;
2695 }
2696 cursor = map
2697 .entry(seg.to_string())
2698 .or_insert_with(|| crate::json::Value::Object(Default::default()));
2699 }
2700
2701 let bytes = crate::json::to_vec(&root).map_err(|err| {
2702 RedDBError::Query(format!(
2703 "tenant auto-fill: failed to re-serialize JSON ({err})"
2704 ))
2705 })?;
2706 Ok(Value::Json(bytes))
2707}
2708
2709#[cfg(test)]
2710mod tests {
2711 use crate::storage::schema::Value;
2712 use crate::{RedDBOptions, RedDBRuntime};
2713
    /// `UPDATE ... WHERE id IN (...)` on a table that has a HASH index on `id`
    /// must change exactly the listed rows and leave the others untouched.
    #[test]
    fn update_where_id_in_with_hash_index_updates_expected_rows() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, score INT)")
            .unwrap();
        // Seed ids 0..=4, every row starting with score 0.
        for id in 0..5 {
            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
                .unwrap();
        }
        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
            .unwrap();

        // Three ids are listed, so three rows must be reported as affected.
        let updated = rt
            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
            .unwrap();
        assert_eq!(updated.affected_rows, 3);

        // Read everything back ordered by id and check each (id, score) pair.
        let selected = rt
            .execute_query("SELECT id, score FROM users ORDER BY id")
            .unwrap();
        let scores: Vec<(i64, i64)> = selected
            .result
            .records
            .iter()
            .map(|record| {
                let id = match record.get("id").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer id, got {other:?}"),
                };
                let score = match record.get("score").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer score, got {other:?}"),
                };
                (id, score)
            })
            .collect();
        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
    }
2752
    /// Runs UPDATE with three predicate shapes (equality on the indexed `id`
    /// column, a range on `score`, and no WHERE clause) and checks both the
    /// affected-row counts and the resulting table contents.
    // NOTE(review): the test name implies these hit an indexed path and a
    // scan path respectively; the routing itself is not observable here.
    #[test]
    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, score INT)")
            .unwrap();
        // Seed ids 0..=4 with score = id * 10.
        for id in 0..5 {
            rt.execute_query(&format!(
                "INSERT INTO items (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
            .unwrap();

        // Equality predicate on the column covered by `idx_items_id`.
        let updated_one = rt
            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
            .unwrap();
        assert_eq!(updated_one.affected_rows, 1);

        // Range predicate on `score`: only ids 3 and 4 (scores 30 and 40) match.
        let updated_many = rt
            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
            .unwrap();
        assert_eq!(updated_many.affected_rows, 2);

        let snapshot = rt
            .execute_query("SELECT id, score FROM items ORDER BY id")
            .unwrap();
        let pairs: Vec<(i64, i64)> = snapshot
            .result
            .records
            .iter()
            .map(|record| {
                let id = match record.get("id").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer id, got {other:?}"),
                };
                let score = match record.get("score").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer score, got {other:?}"),
                };
                (id, score)
            })
            .collect();
        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);

        // No WHERE clause: every one of the five rows is updated.
        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
        assert_eq!(updated_all.affected_rows, 5);
        let after = rt
            .execute_query("SELECT score FROM items ORDER BY id")
            .unwrap();
        let scores: Vec<i64> = after
            .result
            .records
            .iter()
            .map(|record| match record.get("score").unwrap() {
                Value::Integer(value) => *value,
                other => panic!("expected integer score, got {other:?}"),
            })
            .collect();
        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
    }
2828
    /// Runs DELETE with three predicate shapes (equality on the indexed `id`
    /// column, a range on `score`, and no WHERE clause) and checks the
    /// affected-row counts and the surviving rows.
    // NOTE(review): the test name implies these hit an indexed path and a
    // scan path respectively; the routing itself is not observable here.
    #[test]
    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, score INT)")
            .unwrap();
        // Seed ids 0..=4 with score = id * 10.
        for id in 0..5 {
            rt.execute_query(&format!(
                "INSERT INTO items (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
            .unwrap();

        // Equality predicate on the column covered by `idx_items_id`.
        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
        assert_eq!(deleted_one.affected_rows, 1);

        // Range predicate on `score`: ids 3 and 4 (scores 30 and 40) match.
        let deleted_many = rt
            .execute_query("DELETE FROM items WHERE score > 25")
            .unwrap();
        assert_eq!(deleted_many.affected_rows, 2);

        // Only ids 0 and 1 are left.
        let surviving = rt
            .execute_query("SELECT id FROM items ORDER BY id")
            .unwrap();
        let ids: Vec<i64> = surviving
            .result
            .records
            .iter()
            .map(|record| match record.get("id").unwrap() {
                Value::Integer(value) => *value,
                other => panic!("expected integer id, got {other:?}"),
            })
            .collect();
        assert_eq!(ids, vec![0, 1]);

        // No WHERE clause: removes the two remaining rows, leaving the table empty.
        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
        assert_eq!(deleted_rest.affected_rows, 2);
        let empty = rt.execute_query("SELECT id FROM items").unwrap();
        assert!(empty.result.records.is_empty());
    }
2882
    /// A table created with `APPEND ONLY` accepts INSERT but rejects UPDATE
    /// and DELETE with an error that names the verb, and the row survives.
    #[test]
    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
            .unwrap();

        // INSERT is still allowed.
        let inserted = rt
            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
            .unwrap();
        assert_eq!(inserted.affected_rows, 1);

        // UPDATE must fail and the message must mention both the contract and the verb.
        let update_err = rt
            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
            .unwrap_err();
        let msg = format!("{update_err}");
        assert!(
            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
            "expected UPDATE rejection message, got: {msg}"
        );

        // DELETE must fail the same way.
        let delete_err = rt
            .execute_query("DELETE FROM events WHERE id = 1")
            .unwrap_err();
        let msg = format!("{delete_err}");
        assert!(
            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
            "expected DELETE rejection message, got: {msg}"
        );

        // The rejected statements must not have removed the row.
        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
        assert_eq!(surviving.result.records.len(), 1);
    }
2925
    /// Counterpart to the append-only test: on a table created without any
    /// contract clause, INSERT, UPDATE and DELETE all succeed.
    #[test]
    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
            .unwrap();

        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
            .unwrap();
        let updated = rt
            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
            .unwrap();
        assert_eq!(updated.affected_rows, 1);
        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
        assert_eq!(deleted.affected_rows, 1);
    }
2944
    /// An INSERT into a table declared `WITH EVENTS (INSERT) TO audit_log`
    /// puts exactly one event on the `audit_log` queue, and that event has
    /// the expected envelope fields and an `after` image of the row.
    #[test]
    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
        )
        .unwrap();

        let inserted = rt
            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
            .unwrap();
        assert_eq!(inserted.affected_rows, 1);

        let events = queue_payloads(&rt, "audit_log");
        assert_eq!(events.len(), 1);
        let event = events[0].as_object().expect("event payload object");
        // Envelope: non-empty event id, operation, collection and row id.
        assert!(event
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|value| !value.is_empty()));
        assert_eq!(
            event.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        assert_eq!(
            event.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            event.get("id").and_then(crate::json::Value::as_u64),
            Some(7)
        );
        // `ts` and `lsn` only need to be present and numeric.
        assert!(event
            .get("ts")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        assert!(event
            .get("lsn")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        // `tenant` and `before` must be present as explicit JSON nulls.
        assert!(matches!(
            event.get("tenant"),
            Some(crate::json::Value::Null)
        ));
        assert!(matches!(
            event.get("before"),
            Some(crate::json::Value::Null)
        ));
        // `after` carries the inserted column values.
        let after = event
            .get("after")
            .and_then(crate::json::Value::as_object)
            .expect("after object");
        assert_eq!(
            after.get("id").and_then(crate::json::Value::as_u64),
            Some(7)
        );
        assert_eq!(
            after.get("email").and_then(crate::json::Value::as_str),
            Some("a@example.com")
        );
    }
3006
    /// A two-row INSERT into a table declared with a bare `WITH EVENTS`
    /// produces two insert events on the `users_events` queue, in row order
    /// and with strictly increasing LSNs.
    #[test]
    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
            .unwrap();

        rt.execute_query(
            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
        )
        .unwrap();

        // No `TO <queue>` clause was given; the events are read from `users_events`.
        let events = queue_payloads(&rt, "users_events");
        assert_eq!(events.len(), 2);
        let mut previous_lsn = 0;
        // Pair each event with the id of the row expected at that position.
        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
            let object = event.as_object().expect("event payload object");
            assert_eq!(
                object.get("op").and_then(crate::json::Value::as_str),
                Some("insert")
            );
            assert_eq!(
                object.get("id").and_then(crate::json::Value::as_u64),
                Some(expected_id)
            );
            let lsn = object
                .get("lsn")
                .and_then(crate::json::Value::as_u64)
                .expect("event lsn");
            assert!(
                lsn > previous_lsn,
                "event LSNs should increase in row order"
            );
            previous_lsn = lsn;
            let after = object
                .get("after")
                .and_then(crate::json::Value::as_object)
                .expect("after object");
            assert_eq!(
                after.get("id").and_then(crate::json::Value::as_u64),
                Some(expected_id)
            );
        }
    }
3050
3051 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
3052 let result = rt
3053 .execute_query(&format!("QUEUE PEEK {queue} 10"))
3054 .expect("peek queue");
3055 result
3056 .result
3057 .records
3058 .iter()
3059 .map(
3060 |record| match record.get("payload").expect("payload column") {
3061 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
3062 other => panic!("expected JSON queue payload, got {other:?}"),
3063 },
3064 )
3065 .collect()
3066 }
3067
    /// With default options, the first single-row INSERT into a table that
    /// has an `id` column registers a HASH index named `idx_id` on that
    /// column, and later lookups and deletes by `id` keep working.
    #[test]
    fn auto_index_id_fires_on_first_insert() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
            .unwrap();

        // CREATE TABLE alone must not register the index.
        assert!(
            rt.index_store_ref()
                .find_index_for_column("bench_users", "id")
                .is_none(),
            "freshly created collection should not have an `id` index"
        );

        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
            .unwrap();

        // After the first insert the index exists with the expected shape.
        let registered = rt
            .index_store_ref()
            .find_index_for_column("bench_users", "id")
            .expect("auto-index hook should have registered idx_id on first insert");
        assert_eq!(registered.name, "idx_id");
        assert_eq!(registered.collection, "bench_users");
        assert_eq!(registered.columns, vec!["id".to_string()]);
        assert!(matches!(
            registered.method,
            super::super::index_store::IndexMethodKind::Hash
        ));

        // Rows inserted after the index was registered must be found by id too.
        for id in 2..=5 {
            rt.execute_query(&format!(
                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        for id in 1..=5 {
            let result = rt
                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
                .unwrap();
            assert_eq!(
                result.result.records.len(),
                1,
                "id={id} should match one row"
            );
        }

        // A delete by id still affects exactly one row.
        let deleted = rt
            .execute_query("DELETE FROM bench_users WHERE id = 3")
            .unwrap();
        assert_eq!(deleted.affected_rows, 1);
    }
3139
    /// Same as the single-row case, but the first write is a multi-row
    /// INSERT: `idx_id` must be registered and every inserted row must be
    /// reachable by `id`.
    #[test]
    fn auto_index_id_fires_on_first_bulk_insert() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
            .unwrap();

        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
            .unwrap();

        let registered = rt
            .index_store_ref()
            .find_index_for_column("bench_bulk", "id")
            .expect("auto-index hook should fire on first bulk insert");
        assert_eq!(registered.name, "idx_id");

        // Each row of the batch must be found by an equality lookup on id.
        for id in 1..=3 {
            let result = rt
                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
                .unwrap();
            assert_eq!(result.result.records.len(), 1);
        }
    }
3167
    /// A table without a column literally named `id` gets no implicit index
    /// on insert, neither for `id` nor for the similarly named `uid`.
    #[test]
    fn auto_index_id_skips_when_no_id_column() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
            .unwrap();

        assert!(rt
            .index_store_ref()
            .find_index_for_column("plain", "id")
            .is_none());
        assert!(rt
            .index_store_ref()
            .find_index_for_column("plain", "uid")
            .is_none());
    }
3188
    /// When the user has already created an index on `id` before the first
    /// insert, the lookup for that column still returns the user's index
    /// after the insert.
    #[test]
    fn auto_index_id_skips_when_index_already_exists() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
            .unwrap();
        // User-defined index, created while the table is still empty.
        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
            .unwrap();
        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
            .unwrap();

        let registered = rt
            .index_store_ref()
            .find_index_for_column("pre", "id")
            .expect("user index should still be there");
        assert_eq!(
            registered.name, "user_idx",
            "auto-index hook must not overwrite an existing index"
        );
    }
3213
    /// The implicit `id` index created by the first insert must disappear
    /// from the index store once the table is dropped.
    #[test]
    fn auto_index_id_dropped_with_collection() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
            .unwrap();
        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
            .unwrap();
        // Precondition: the insert registered an index on `id`.
        assert!(rt
            .index_store_ref()
            .find_index_for_column("ephemeral", "id")
            .is_some());

        rt.execute_query("DROP TABLE ephemeral").unwrap();

        assert!(
            rt.index_store_ref()
                .find_index_for_column("ephemeral", "id")
                .is_none(),
            "implicit `idx_id` must be reaped when its collection drops"
        );
    }
3238
    /// With `with_auto_index_id(false)` on the runtime options, inserting
    /// into a table that has an `id` column must not register any index.
    #[test]
    fn auto_index_id_disabled_by_config() {
        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
        let rt = RedDBRuntime::with_options(opts).unwrap();

        rt.execute_query("CREATE TABLE off (id INT, score INT)")
            .unwrap();
        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
            .unwrap();

        assert!(
            rt.index_store_ref()
                .find_index_for_column("off", "id")
                .is_none(),
            "with auto_index_id=false, no implicit index should be created"
        );
    }
3260
    /// Updating one row of a table declared `WITH EVENTS (UPDATE) TO audit_log`
    /// emits exactly one `update` event whose `before` and `after` images
    /// hold the old and the new value of the changed column.
    #[test]
    fn update_single_row_emits_update_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
        )
        .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
            .unwrap();

        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
            .unwrap();

        // Exactly one event is expected on the queue, i.e. the preceding
        // INSERT contributed none.
        let events = queue_payloads(&rt, "audit_log");
        assert_eq!(events.len(), 1, "expected exactly 1 update event");
        let event = events[0].as_object().expect("event payload object");
        assert_eq!(
            event.get("op").and_then(crate::json::Value::as_str),
            Some("update")
        );
        assert_eq!(
            event.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert!(event
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|v| !v.is_empty()));
        let before = event
            .get("before")
            .and_then(crate::json::Value::as_object)
            .expect("before must be an object");
        let after = event
            .get("after")
            .and_then(crate::json::Value::as_object)
            .expect("after must be an object");
        assert_eq!(
            before.get("name").and_then(crate::json::Value::as_str),
            Some("Alice"),
            "before.name should be the old value"
        );
        assert_eq!(
            after.get("name").and_then(crate::json::Value::as_str),
            Some("Bob"),
            "after.name should be the new value"
        );
    }
3310
    /// The `before` and `after` images of an update event contain the
    /// column that was changed (`name`) and omit a column that was not
    /// touched (`email`).
    #[test]
    fn update_event_only_includes_changed_fields() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
        )
        .unwrap();
        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
            .unwrap();

        // Only `name` is assigned; `email` keeps its value.
        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
            .unwrap();

        let events = queue_payloads(&rt, "evts");
        assert_eq!(events.len(), 1);
        let event = events[0].as_object().unwrap();
        let before = event
            .get("before")
            .and_then(crate::json::Value::as_object)
            .unwrap();
        let after = event
            .get("after")
            .and_then(crate::json::Value::as_object)
            .unwrap();
        // The changed column appears on both sides.
        assert!(
            before.contains_key("name"),
            "before must include changed field"
        );
        assert!(
            after.contains_key("name"),
            "after must include changed field"
        );
        // The untouched column appears on neither side.
        assert!(
            !before.contains_key("email"),
            "before must not include unchanged email"
        );
        assert!(
            !after.contains_key("email"),
            "after must not include unchanged email"
        );
    }
3354
    /// An UPDATE without a WHERE clause over three rows emits three events,
    /// each with `op` set to `update`.
    #[test]
    fn multi_row_update_emits_one_event_per_row() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
            .unwrap();
        rt.execute_query(
            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
        )
        .unwrap();

        rt.execute_query("UPDATE items SET status = 'done'")
            .unwrap();

        let events = queue_payloads(&rt, "evts");
        assert_eq!(events.len(), 3, "expected one update event per row");
        for event in &events {
            let obj = event.as_object().unwrap();
            assert_eq!(
                obj.get("op").and_then(crate::json::Value::as_str),
                Some("update")
            );
        }
    }
3378
    /// Deleting one row of a table declared `WITH EVENTS (DELETE) TO del_log`
    /// emits one `delete` event whose `before` image holds the removed row
    /// and whose `after` is JSON null.
    #[test]
    fn delete_single_row_emits_delete_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
            .unwrap();

        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();

        let events = queue_payloads(&rt, "del_log");
        assert_eq!(events.len(), 1);
        let event = events[0].as_object().expect("event payload object");
        assert_eq!(
            event.get("op").and_then(crate::json::Value::as_str),
            Some("delete")
        );
        assert_eq!(
            event.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert!(event
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|v| !v.is_empty()));
        // `before` must carry both columns of the deleted row.
        let before = event
            .get("before")
            .and_then(crate::json::Value::as_object)
            .expect("before must be an object for delete");
        assert_eq!(
            before.get("id").and_then(crate::json::Value::as_u64),
            Some(42)
        );
        assert_eq!(
            before.get("name").and_then(crate::json::Value::as_str),
            Some("Alice")
        );
        // `after` must be present as an explicit JSON null.
        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
    }
3418
    /// A DELETE without a WHERE clause over three rows emits three events,
    /// each with `op` set to `delete` and a null `after` image.
    #[test]
    fn multi_row_delete_emits_one_event_per_row() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
            .unwrap();
        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
            .unwrap();

        rt.execute_query("DELETE FROM items").unwrap();

        let events = queue_payloads(&rt, "del_log");
        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
        for event in &events {
            let obj = event.as_object().unwrap();
            assert_eq!(
                obj.get("op").and_then(crate::json::Value::as_str),
                Some("delete")
            );
            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
        }
    }
3440
/// A table declared `WITH EVENTS (UPDATE)` must stay silent for operations
/// outside that filter: after one INSERT and one DELETE the `evts` queue is
/// expected to be empty.
#[test]
fn ops_filter_update_does_not_emit_on_insert_or_delete() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for sql in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts",
        // Neither of the following statements is an UPDATE.
        "INSERT INTO users (id, name) VALUES (1, 'Alice')",
        "DELETE FROM users WHERE id = 1",
    ] {
        runtime.execute_query(sql).unwrap();
    }

    let payloads = queue_payloads(&runtime, "evts");
    assert!(
        payloads.is_empty(),
        "UPDATE-only filter must not emit INSERT or DELETE events"
    );
}
3457
/// An INSERT carrying the `SUPPRESS EVENTS` clause must not enqueue anything
/// on the table's event queue, even though the table is declared
/// `WITH EVENTS TO evts` with no operation filter.
#[test]
fn suppress_events_on_insert_emits_no_events() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let run = |sql: &str| {
        runtime.execute_query(sql).unwrap();
    };
    run("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts");
    run("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS");

    let payloads = queue_payloads(&runtime, "evts");
    assert!(
        payloads.is_empty(),
        "SUPPRESS EVENTS must prevent INSERT events"
    );
}
3475
/// An UPDATE carrying the `SUPPRESS EVENTS` clause must not enqueue anything
/// on the table's event queue.
///
/// The seed INSERT runs without suppression, so the queue is read once and
/// then purged before the UPDATE under test executes.
#[test]
fn suppress_events_on_update_emits_no_events() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let run = |sql: &str| {
        runtime.execute_query(sql).unwrap();
    };

    run("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts");
    run("INSERT INTO users (id, name) VALUES (1, 'Alice')");

    // NOTE(review): presumably this read plus the purge clear out whatever the
    // seed INSERT enqueued, so the final check only sees the UPDATE — confirm
    // whether `queue_payloads` alone already consumes the queue.
    let _ = queue_payloads(&runtime, "evts");
    run("QUEUE PURGE evts");

    run("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS");

    let payloads = queue_payloads(&runtime, "evts");
    assert!(
        payloads.is_empty(),
        "SUPPRESS EVENTS must prevent UPDATE events"
    );
}
3497
/// A DELETE carrying the `SUPPRESS EVENTS` clause must not enqueue anything
/// on the table's event queue.
///
/// The table emits for INSERT and DELETE; the seed INSERT is suppressed as
/// well, so an empty queue at the end means the DELETE emitted nothing.
#[test]
fn suppress_events_on_delete_emits_no_events() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for sql in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
        "DELETE FROM users WHERE id = 1 SUPPRESS EVENTS",
    ] {
        runtime.execute_query(sql).unwrap();
    }

    let payloads = queue_payloads(&runtime, "evts");
    assert!(
        payloads.is_empty(),
        "SUPPRESS EVENTS must prevent DELETE events"
    );
}
3517
/// `SUPPRESS EVENTS` applies only to the statement that carries it: a
/// suppressed INSERT followed by a plain INSERT must leave exactly one
/// payload on `evts`, and that payload's top-level `id` must read as 2.
#[test]
fn normal_insert_after_suppress_still_emits() {
    use crate::json::Value as Json;

    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let run = |sql: &str| {
        runtime.execute_query(sql).unwrap();
    };
    run("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts");
    // First insert is suppressed, second one is a normal insert.
    run("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS");
    run("INSERT INTO users (id, name) VALUES (2, 'Bob')");

    let payloads = queue_payloads(&runtime, "evts");
    assert_eq!(
        payloads.len(),
        1,
        "only the non-suppressed INSERT should emit"
    );

    let emitted_id = payloads[0].get("id").and_then(Json::as_u64);
    assert_eq!(emitted_id, Some(2));
}
3540}