use std::collections::HashMap;
use std::sync::Arc;
use crate::json::{Map as JsonMap, Value as JsonValue};
use crate::presentation::entity_json::storage_value_to_json;
use crate::storage::schema::Value;
use crate::storage::unified::devx::refs::{NodeRef, VectorRef};
use crate::storage::unified::{
entity::{CrossRef, EntityData, EntityId, EntityKind, RefType, RowData, UnifiedEntity},
Metadata, MetadataValue, UnifiedStore,
};
use crate::{RedDBError, RedDBResult};
/// One row to be inserted by the mutation engine: its column values plus
/// optional metadata fields and cross-reference links to graph nodes and
/// vectors that are indexed after the insert.
pub(crate) struct MutationRow {
    // Named column values for the row, in caller-supplied order.
    pub fields: Vec<(String, Value)>,
    // Per-entity metadata fields, stored separately from the row data.
    pub metadata: Vec<(String, MetadataValue)>,
    // Row -> node cross references to register after insert.
    pub node_links: Vec<NodeRef>,
    // Row -> vector cross references to register after insert.
    pub vector_links: Vec<VectorRef>,
}
/// Outcome of a mutation: the entity ids assigned to the inserted rows,
/// in the same order as the input rows.
pub(crate) struct MutationResult {
    pub ids: Vec<EntityId>,
}
impl MutationResult {
fn empty() -> Self {
Self { ids: Vec::new() }
}
}
/// Executes insert mutations against a collection: entity construction,
/// MVCC stamping, metadata, secondary/context/cross-ref indexing, CDC
/// emission and subscription events.
pub(crate) struct MutationEngine<'rt> {
    runtime: &'rt crate::RedDBRuntime,
    // Cached handle to the runtime's unified store.
    store: Arc<UnifiedStore>,
    // When true, subscription insert events are skipped (CDC is still emitted).
    suppress_events: bool,
}
impl<'rt> MutationEngine<'rt> {
    /// Bind an engine to `runtime`, caching a handle to its unified store.
    /// Events are emitted by default; see [`Self::with_suppress_events`].
    pub(crate) fn new(runtime: &'rt crate::RedDBRuntime) -> Self {
        let store = runtime.db().store();
        Self {
            runtime,
            store,
            suppress_events: false,
        }
    }

    /// Builder-style toggle: skip subscription event emission for this
    /// engine's mutations (CDC records are still written).
    pub(crate) fn with_suppress_events(mut self) -> Self {
        self.suppress_events = true;
        self
    }

    /// Insert `rows` into `collection` after passing the DML write gate.
    /// Dispatches to a single-row fast path or the batch path; an empty
    /// `rows` is a successful no-op.
    pub(crate) fn apply(
        &self,
        collection: String,
        mut rows: Vec<MutationRow>,
    ) -> RedDBResult<MutationResult> {
        self.runtime
            .check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        match rows.len() {
            0 => Ok(MutationResult::empty()),
            1 => self.append_one(collection, rows.remove(0)),
            _ => self.append_batch(collection, rows),
        }
    }

    /// Single-row insert: build the entity, stamp its xmin, insert, then
    /// attach metadata, maintain secondary indexes, emit CDC and (unless
    /// suppressed) fire subscription insert events.
    fn append_one(&self, collection: String, row: MutationRow) -> RedDBResult<MutationResult> {
        // Fields are cloned because the entity builder consumes them and the
        // secondary-index update below still needs the originals.
        let mut entity = build_table_entity(
            self.store.as_ref(),
            &collection,
            row.fields.clone(),
            &row.node_links,
            &row.vector_links,
        );
        // Inside a transaction use its xid; otherwise allocate a committed
        // xid from the snapshot manager.
        let writer_xid = match self.runtime.current_xid() {
            Some(xid) => xid,
            None => self.runtime.snapshot_manager().allocate_committed_xid(),
        };
        entity.set_xmin(writer_xid);
        let id = self
            .store
            .insert_auto(&collection, entity)
            .map_err(|e| RedDBError::Internal(format!("{e:?}")))?;
        // Metadata attachment is best-effort: a failure does not undo the
        // row insert (result deliberately ignored).
        if !row.metadata.is_empty() {
            let _ = self.store.set_metadata(
                &collection,
                id,
                Metadata::with_fields(row.metadata.into_iter().collect()),
            );
        }
        self.maybe_auto_index_id(&collection, &row.fields);
        self.runtime
            .index_store_ref()
            .index_entity_insert(&collection, id, &row.fields)
            .map_err(RedDBError::Internal)?;
        let lsn = self.runtime.cdc_emit(
            crate::replication::cdc::ChangeOperation::Insert,
            &collection,
            id.raw(),
            "table",
        );
        if !self.suppress_events {
            self.runtime
                .emit_insert_events_for_collection(&collection, &[id], &[lsn])?;
        }
        Ok(MutationResult { ids: vec![id] })
    }

    /// Batch insert path: build all entities up front, bulk-insert them,
    /// then attach metadata, run context/cross-ref indexing, maintain
    /// secondary indexes, emit CDC and subscription events.
    fn append_batch(
        &self,
        collection: String,
        rows: Vec<MutationRow>,
    ) -> RedDBResult<MutationResult> {
        let n = rows.len();
        // NOTE(review): auto-index detection only inspects the first row; a
        // batch whose first row lacks an "id" field skips the implicit index
        // even if later rows carry one — confirm this is intended.
        if let Some(first) = rows.first() {
            self.maybe_auto_index_id(&collection, &first.fields);
        }
        let has_secondary_indexes = !self
            .runtime
            .index_store_ref()
            .list_indices(&collection)
            .is_empty();
        // Field snapshots are only needed to feed the secondary-index batch
        // update once ids are known; skip the clones entirely otherwise.
        let mut field_snapshots: Vec<Vec<(String, Value)>> = if has_secondary_indexes {
            Vec::with_capacity(n)
        } else {
            Vec::new()
        };
        let any_metadata = rows.iter().any(|r| !r.metadata.is_empty());
        let mut metadata_batch: Vec<Vec<(String, MetadataValue)>> = if any_metadata {
            Vec::with_capacity(n)
        } else {
            Vec::new()
        };
        let mut entities: Vec<UnifiedEntity> = Vec::with_capacity(n);
        // One xid stamps the whole batch: the transaction xid if one is
        // active, else a freshly allocated committed xid.
        let current_xid = match self.runtime.current_xid() {
            Some(xid) => xid,
            None => self.runtime.snapshot_manager().allocate_committed_xid(),
        };
        let ci_enabled = self
            .store
            .context_index()
            .is_collection_enabled(&collection);
        let any_xrefs = rows
            .iter()
            .any(|r| !r.node_links.is_empty() || !r.vector_links.is_empty());
        // Re-reading inserted entities costs a store lookup each; only do it
        // when context indexing or cross-ref indexing needs the stored form.
        let needs_entity_fetch = ci_enabled || any_xrefs;
        // Share a single Arc'd table name across every entity in the batch.
        let table_arc: Arc<str> = Arc::from(collection.as_str());
        for row in rows {
            if has_secondary_indexes {
                field_snapshots.push(row.fields.clone());
            }
            if any_metadata {
                metadata_batch.push(row.metadata);
            }
            let mut entity = build_table_entity_shared(
                self.store.as_ref(),
                Arc::clone(&table_arc),
                row.fields,
                &row.node_links,
                &row.vector_links,
            );
            entity.set_xmin(current_xid);
            entities.push(entity);
        }
        let ids = self
            .store
            .bulk_insert(&collection, entities)
            .map_err(|e| RedDBError::Internal(format!("{e:?}")))?;
        // Everything below pairs `ids` positionally with the per-row data;
        // fail loudly if the store violated the one-id-per-row contract.
        if ids.len() != n {
            return Err(RedDBError::Internal(format!(
                "bulk_insert returned {} ids for {} rows",
                ids.len(),
                n
            )));
        }
        if any_metadata || needs_entity_fetch {
            for (i, &id) in ids.iter().enumerate() {
                if any_metadata && !metadata_batch[i].is_empty() {
                    // take() moves the fields out without cloning; best-effort
                    // like the single-row path.
                    let meta = Metadata::with_fields(
                        std::mem::take(&mut metadata_batch[i]).into_iter().collect(),
                    );
                    let _ = self.store.set_metadata(&collection, id, meta);
                }
                if needs_entity_fetch {
                    if let Some(entity) = self.store.get(&collection, id) {
                        if ci_enabled {
                            self.store
                                .context_index()
                                .index_entity(&collection, &entity);
                        }
                        if any_xrefs {
                            let _ = self.store.index_cross_refs(&entity, &collection);
                        }
                    }
                }
            }
        }
        if has_secondary_indexes {
            let index_rows: Vec<(EntityId, Vec<(String, Value)>)> = ids
                .iter()
                .zip(field_snapshots)
                .map(|(id, fields)| (*id, fields))
                .collect();
            self.runtime
                .index_store_ref()
                .index_entity_insert_batch(&collection, &index_rows)
                .map_err(RedDBError::Internal)?;
        }
        // Invalidate caches once for the whole batch, then emit CDC records
        // with the per-record invalidation suppressed.
        self.runtime.invalidate_result_cache();
        let lsns =
            self.runtime
                .cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
        if !self.suppress_events {
            self.runtime
                .emit_insert_events_for_collection(&collection, &ids, &lsns)?;
        }
        Ok(MutationResult { ids })
    }

    /// If `auto_index_id` is configured and the row carries an "id" field,
    /// lazily create and register an implicit hash index on `id` for this
    /// collection. Failures are logged at debug level and otherwise ignored.
    fn maybe_auto_index_id(&self, collection: &str, fields: &[(String, Value)]) {
        if !self.store.config().auto_index_id {
            return;
        }
        if !fields.iter().any(|(name, _)| name == "id") {
            return;
        }
        let index_store = self.runtime.index_store_ref();
        // Respect any pre-existing index on "id", implicit or user-created.
        if index_store
            .find_index_for_column(collection, "id")
            .is_some()
        {
            return;
        }
        let columns = vec!["id".to_string()];
        if let Err(err) = index_store.create_index(
            "idx_id",
            collection,
            &columns,
            super::index_store::IndexMethodKind::Hash,
            false,
            &[],
        ) {
            tracing::debug!(
                target: "reddb::runtime::auto_index_id",
                collection = %collection,
                error = %err,
                "auto_index_id: failed to create implicit hash index on `id`"
            );
            return;
        }
        index_store.register(super::index_store::RegisteredIndex {
            name: "idx_id".to_string(),
            collection: collection.to_string(),
            columns,
            method: super::index_store::IndexMethodKind::Hash,
            unique: false,
        });
        // A new index can change plan choices; drop cached plans.
        self.runtime.invalidate_plan_cache();
    }
}
impl crate::RedDBRuntime {
    /// Fan out insert events for `ids` to every enabled subscription on
    /// `collection` whose ops filter allows inserts.
    ///
    /// `ids` and `lsns` must be parallel slices (length-checked). Rows that
    /// are no longer readable from the store are skipped silently; each
    /// subscription's WHERE filter and redaction list is applied before the
    /// payload is enqueued.
    pub(crate) fn emit_insert_events_for_collection(
        &self,
        collection: &str,
        ids: &[EntityId],
        lsns: &[u64],
    ) -> RedDBResult<()> {
        if ids.is_empty() {
            return Ok(());
        }
        if ids.len() != lsns.len() {
            return Err(RedDBError::Internal(format!(
                "insert event emission expected {} LSNs, got {}",
                ids.len(),
                lsns.len()
            )));
        }
        // No contract registered => no subscriptions => nothing to emit.
        let Some(contract) = self.db().collection_contract_arc(collection) else {
            return Ok(());
        };
        // An empty ops_filter means "all operations".
        let subscriptions = contract
            .subscriptions
            .iter()
            .filter(|subscription| {
                subscription.enabled
                    && (subscription.ops_filter.is_empty()
                        || subscription
                            .ops_filter
                            .contains(&crate::catalog::SubscriptionOperation::Insert))
            })
            .cloned()
            .collect::<Vec<_>>();
        if subscriptions.is_empty() {
            return Ok(());
        }
        let store = self.db().store();
        for (&id, &lsn) in ids.iter().zip(lsns) {
            // The row may have been removed since insert; skip, don't fail.
            let Some(entity) = store.get(collection, id) else {
                continue;
            };
            // Build the `after` image once per row, shared by all subscribers.
            let after = table_row_after_json(&entity);
            for subscription in &subscriptions {
                if !entity_passes_where_filter(subscription, &entity, collection) {
                    continue;
                }
                let payload = insert_event_payload(
                    collection,
                    id.raw(),
                    lsn,
                    &after,
                    subscription.redact_fields.as_slice(),
                )?;
                self.enqueue_event_payload(
                    &effective_queue_name(subscription),
                    Value::Json(payload),
                )?;
            }
        }
        Ok(())
    }

    /// True if any enabled subscription on `collection` would receive delete
    /// events (empty ops_filter counts as "all operations").
    pub(crate) fn collection_has_delete_subscriptions(&self, collection: &str) -> bool {
        let Some(contract) = self.db().collection_contract_arc(collection) else {
            return false;
        };
        contract.subscriptions.iter().any(|s| {
            s.enabled
                && (s.ops_filter.is_empty()
                    || s.ops_filter
                        .contains(&crate::catalog::SubscriptionOperation::Delete))
        })
    }

    /// Fan out update events for the applied mutations, with before/after
    /// images restricted to the modified columns.
    ///
    /// NOTE(review): unlike the insert path, this does not verify
    /// `applied.len() == lsns.len()`; `zip` silently truncates to the
    /// shorter slice — confirm callers always pass parallel slices.
    pub(crate) fn emit_update_events_for_collection(
        &self,
        collection: &str,
        applied: &[crate::application::entity::AppliedEntityMutation],
        lsns: &[u64],
    ) -> RedDBResult<()> {
        if applied.is_empty() {
            return Ok(());
        }
        let Some(contract) = self.db().collection_contract_arc(collection) else {
            return Ok(());
        };
        let subscriptions = contract
            .subscriptions
            .iter()
            .filter(|s| {
                s.enabled
                    && (s.ops_filter.is_empty()
                        || s.ops_filter
                            .contains(&crate::catalog::SubscriptionOperation::Update))
            })
            .cloned()
            .collect::<Vec<_>>();
        if subscriptions.is_empty() {
            return Ok(());
        }
        for (mutation, &lsn) in applied.iter().zip(lsns) {
            // Changed-columns-only images, shared across subscriptions.
            let before = build_update_before_json(mutation);
            let after = build_update_after_json(mutation);
            for subscription in &subscriptions {
                // WHERE filter is evaluated against the post-update entity.
                if !entity_passes_where_filter(subscription, &mutation.entity, collection) {
                    continue;
                }
                let payload = update_event_payload(
                    collection,
                    mutation.id.raw(),
                    lsn,
                    &before,
                    &after,
                    subscription.redact_fields.as_slice(),
                )?;
                self.enqueue_event_payload(
                    &effective_queue_name(subscription),
                    Value::Json(payload),
                )?;
            }
        }
        Ok(())
    }

    /// Fan out delete events using the captured pre-images (keyed by raw
    /// entity id). A missing pre-image degrades to `before: null`, and the
    /// WHERE filter is then evaluated against that JSON (null fails most
    /// comparisons, so such events are typically filtered subscriptions'
    /// concern only).
    ///
    /// NOTE(review): like the update path, `deleted_ids`/`lsns` lengths are
    /// not checked; `zip` truncates to the shorter slice.
    pub(crate) fn emit_delete_events_for_collection(
        &self,
        collection: &str,
        deleted_ids: &[EntityId],
        lsns: &[u64],
        pre_images: &std::collections::HashMap<u64, crate::json::Value>,
    ) -> RedDBResult<()> {
        if deleted_ids.is_empty() {
            return Ok(());
        }
        let Some(contract) = self.db().collection_contract_arc(collection) else {
            return Ok(());
        };
        let subscriptions = contract
            .subscriptions
            .iter()
            .filter(|s| {
                s.enabled
                    && (s.ops_filter.is_empty()
                        || s.ops_filter
                            .contains(&crate::catalog::SubscriptionOperation::Delete))
            })
            .cloned()
            .collect::<Vec<_>>();
        if subscriptions.is_empty() {
            return Ok(());
        }
        for (&id, &lsn) in deleted_ids.iter().zip(lsns) {
            let before = pre_images
                .get(&id.raw())
                .cloned()
                .unwrap_or(crate::json::Value::Null);
            for subscription in &subscriptions {
                // The row is gone, so the filter runs on the pre-image JSON.
                if !json_passes_where_filter(subscription, &before) {
                    continue;
                }
                let payload = delete_event_payload(
                    collection,
                    id.raw(),
                    lsn,
                    &before,
                    subscription.redact_fields.as_slice(),
                )?;
                self.enqueue_event_payload(
                    &effective_queue_name(subscription),
                    Value::Json(payload),
                )?;
            }
        }
        Ok(())
    }
}
/// Resolve the queue name a subscription's events should be enqueued to.
///
/// Tenant-scoped subscriptions (`all_tenants == false`) get a
/// `"{tenant}__{queue}"` prefix when a tenant is active; otherwise the
/// contract's target queue is used verbatim.
pub(crate) fn effective_queue_name(
    subscription: &crate::catalog::SubscriptionDescriptor,
) -> String {
    if subscription.all_tenants {
        return subscription.target_queue.clone();
    }
    match crate::runtime::impl_core::current_tenant() {
        Some(tenant) => format!("{tenant}__{}", subscription.target_queue),
        None => subscription.target_queue.clone(),
    }
}
/// Deterministic id for a synthetic backfill event: blake3 over the
/// NUL-separated tuple (collection, subject id, "backfill", subscription id),
/// hex-encoded. Including the subscription id keeps re-runs idempotent per
/// subscriber.
pub(crate) fn backfill_event_id(collection: &str, id: &JsonValue, subscription_id: &str) -> String {
    let id_repr = json_id_for_hash(id);
    let parts: [&[u8]; 4] = [
        collection.as_bytes(),
        id_repr.as_bytes(),
        b"backfill",
        subscription_id.as_bytes(),
    ];
    let mut hasher = blake3::Hasher::new();
    for (i, part) in parts.iter().enumerate() {
        if i > 0 {
            // NUL separator so adjacent parts cannot collide by concatenation.
            hasher.update(&[0]);
        }
        hasher.update(part);
    }
    hasher.finalize().to_hex().to_string()
}
/// Build a synthetic ("backfill") insert event for an already-stored row.
///
/// Unlike live insert events, backfill events carry `lsn: 0`, a
/// `synthetic: true` marker, and an event id derived from the subscription
/// (see [`backfill_event_id`]). Returns the event id together with the
/// JSON-encoded payload bytes. Field insertion order is deliberate — it is
/// the serialized wire layout.
pub(crate) fn backfill_event_payload(
    collection: &str,
    raw_entity_id: u64,
    after: &JsonValue,
    subscription_id: &str,
    redact_fields: &[String],
) -> RedDBResult<(String, Vec<u8>)> {
    let mut object = JsonMap::new();
    // Prefer the row's own "id" column; fall back to the storage entity id.
    // NOTE(review): u64 -> f64 loses precision above 2^53 — confirm entity
    // ids stay below that or that consumers tolerate the rounding.
    let subject_id = after
        .get("id")
        .cloned()
        .unwrap_or(JsonValue::Number(raw_entity_id as f64));
    let event_id = backfill_event_id(collection, &subject_id, subscription_id);
    object.insert("event_id".to_string(), JsonValue::String(event_id.clone()));
    object.insert("op".to_string(), JsonValue::String("insert".to_string()));
    object.insert(
        "collection".to_string(),
        JsonValue::String(collection.to_string()),
    );
    object.insert("id".to_string(), subject_id);
    object.insert(
        "ts".to_string(),
        JsonValue::Number(current_unix_ms() as f64),
    );
    // Backfill events are synthetic and have no real log position.
    object.insert("lsn".to_string(), JsonValue::Number(0.0));
    object.insert(
        "tenant".to_string(),
        crate::runtime::impl_core::current_tenant()
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    object.insert("synthetic".to_string(), JsonValue::Bool(true));
    object.insert("before".to_string(), JsonValue::Null);
    object.insert(
        "after".to_string(),
        redact_json_object(after.clone(), redact_fields),
    );
    let bytes = crate::json::to_vec(&JsonValue::Object(object))
        .map_err(|err| RedDBError::Internal(format!("encode backfill event payload: {err}")))?;
    Ok((event_id, bytes))
}
/// Encode a live insert event payload: identity fields, timestamp, log
/// position, tenant, and the redacted `after` image (`before` is always
/// null for inserts). Field insertion order is the serialized wire layout.
fn insert_event_payload(
    collection: &str,
    id: u64,
    lsn: u64,
    after: &JsonValue,
    redact_fields: &[String],
) -> RedDBResult<Vec<u8>> {
    let mut object = JsonMap::new();
    // Prefer the row's own "id" column; fall back to the storage entity id.
    let subject_id = after
        .get("id")
        .cloned()
        .unwrap_or(JsonValue::Number(id as f64));
    let subject_id_for_hash = json_id_for_hash(&subject_id);
    object.insert(
        "event_id".to_string(),
        // Deterministic per (collection, id, lsn, op) so consumers can
        // deduplicate redeliveries.
        JsonValue::String(deterministic_event_id(
            collection,
            &subject_id_for_hash,
            lsn,
            "insert",
        )),
    );
    object.insert("op".to_string(), JsonValue::String("insert".to_string()));
    object.insert(
        "collection".to_string(),
        JsonValue::String(collection.to_string()),
    );
    object.insert("id".to_string(), subject_id);
    object.insert(
        "ts".to_string(),
        JsonValue::Number(current_unix_ms() as f64),
    );
    object.insert("lsn".to_string(), JsonValue::Number(lsn as f64));
    object.insert(
        "tenant".to_string(),
        crate::runtime::impl_core::current_tenant()
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    object.insert("before".to_string(), JsonValue::Null);
    object.insert(
        "after".to_string(),
        redact_json_object(after.clone(), redact_fields),
    );
    crate::json::to_vec(&JsonValue::Object(object))
        .map_err(|err| RedDBError::Internal(format!("encode insert event payload: {err}")))
}
/// Deterministic event id: hex-encoded sha256 over (collection, subject id,
/// lsn, op). Identical tuples always hash to the same id, which lets event
/// consumers deduplicate redeliveries.
fn deterministic_event_id(collection: &str, id: &str, lsn: u64, op: &str) -> String {
    let mut hasher = crate::crypto::sha256::Sha256::new();
    hasher.update(collection.as_bytes());
    // NUL separators keep variable-length parts from colliding by
    // concatenation ("ab"+"c" vs "a"+"bc").
    hasher.update(&[0]);
    hasher.update(id.as_bytes());
    hasher.update(&[0]);
    hasher.update(&lsn.to_le_bytes());
    // NOTE(review): no separator between lsn and op, unlike the pairs above.
    // Harmless in practice since lsn is fixed-width (8 bytes), but confirm —
    // and note that adding one now would change every existing event id.
    hasher.update(op.as_bytes());
    hex::encode(hasher.finalize())
}
/// Canonical string form of a JSON subject id for hashing: scalars render
/// as their literal text, null as "null", and structured values as their
/// JSON serialization (or "structured" if serialization fails).
fn json_id_for_hash(value: &JsonValue) -> String {
    match value {
        JsonValue::Null => "null".to_string(),
        JsonValue::Bool(flag) => flag.to_string(),
        JsonValue::Number(num) => num.to_string(),
        JsonValue::String(text) => text.clone(),
        structured @ (JsonValue::Array(_) | JsonValue::Object(_)) => {
            crate::json::to_string(structured).unwrap_or_else(|_| "structured".to_string())
        }
    }
}
/// Render a stored row entity as a JSON object. Named fields win when
/// present; otherwise columns are paired positionally with the schema.
/// Non-row entities render as an empty object.
fn table_row_after_json(entity: &UnifiedEntity) -> JsonValue {
    let mut object = JsonMap::new();
    if let EntityData::Row(row) = &entity.data {
        match (&row.named, &row.schema) {
            (Some(named), _) => {
                for (key, value) in named {
                    object.insert(key.to_string(), storage_value_to_json(value));
                }
            }
            (None, Some(schema)) => {
                // zip stops at the shorter side, so columns beyond the
                // stored values (or vice versa) are simply omitted.
                for (column, value) in schema.iter().zip(row.columns.iter()) {
                    object.insert(column.clone(), storage_value_to_json(value));
                }
            }
            (None, None) => {}
        }
    }
    JsonValue::Object(object)
}
/// Apply every redaction path to `value` and return the redacted document.
fn redact_json_object(value: JsonValue, redact_fields: &[String]) -> JsonValue {
    redact_fields.iter().fold(value, |mut doc, field| {
        redact_path(&mut doc, field);
        doc
    })
}
/// Redact a dotted path inside a JSON document in place.
///
/// The final segment is overwritten with the literal "[REDACTED]"; a "*"
/// segment descends into every child of the current object. Non-object
/// nodes along the way are left untouched.
fn redact_path(value: &mut JsonValue, path: &str) {
    if let JsonValue::Object(obj) = value {
        match path.split_once('.') {
            // Leaf segment: overwrite (or insert) the redaction marker.
            None => {
                obj.insert(path.to_string(), JsonValue::String("[REDACTED]".to_string()));
            }
            // Wildcard: recurse into every child with the remaining path.
            Some(("*", rest)) => {
                for child in obj.values_mut() {
                    redact_path(child, rest);
                }
            }
            // Named segment: recurse only if the key exists.
            Some((key, rest)) => {
                if let Some(child) = obj.get_mut(key) {
                    redact_path(child, rest);
                }
            }
        }
    }
}
/// Wall-clock time as milliseconds since the Unix epoch; 0 if the system
/// clock reports a pre-epoch time.
fn current_unix_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
/// Convenience wrapper over [`build_table_entity_shared`] that owns the
/// `Arc<str>` conversion of the collection name.
fn build_table_entity(
    store: &UnifiedStore,
    collection: &str,
    fields: Vec<(String, Value)>,
    node_links: &[NodeRef],
    vector_links: &[VectorRef],
) -> UnifiedEntity {
    let table: Arc<str> = Arc::from(collection);
    build_table_entity_shared(store, table, fields, node_links, vector_links)
}
/// Build an unattached table-row entity: allocate an id, store the fields
/// as named row data, and attach row->node / row->vector cross references.
/// `row_id` is left at 0; the store assigns the real one on insert.
fn build_table_entity_shared(
    store: &UnifiedStore,
    table: Arc<str>,
    fields: Vec<(String, Value)>,
    node_links: &[NodeRef],
    vector_links: &[VectorRef],
) -> UnifiedEntity {
    let id = store.next_entity_id();
    let mut row = RowData::new(Vec::new());
    row.named = Some(fields.into_iter().collect());
    let kind = EntityKind::TableRow { table, row_id: 0 };
    let mut entity = UnifiedEntity::new(id, kind, EntityData::Row(row));
    for link in node_links {
        entity.add_cross_ref(CrossRef::new(
            id,
            link.node_id,
            link.collection.clone(),
            RefType::RowToNode,
        ));
    }
    for link in vector_links {
        entity.add_cross_ref(CrossRef::new(
            id,
            link.vector_id,
            link.collection.clone(),
            RefType::RowToVector,
        ));
    }
    entity
}
/// Crate-facing helper: render a row entity's stored fields as a JSON
/// object (same shape as the `after` image used in insert events).
pub(crate) fn entity_row_json(entity: &UnifiedEntity) -> JsonValue {
    table_row_after_json(entity)
}
/// Build the `before` image of an update: only the pre-mutation values of
/// the columns that were actually modified. An update with no modified
/// columns yields an empty object.
fn build_update_before_json(
    mutation: &crate::application::entity::AppliedEntityMutation,
) -> JsonValue {
    let mut object = JsonMap::new();
    if !mutation.modified_columns.is_empty() {
        let changed: std::collections::HashSet<&str> = mutation
            .modified_columns
            .iter()
            .map(String::as_str)
            .collect();
        let touched = mutation
            .pre_mutation_fields
            .iter()
            .filter(|(key, _)| changed.contains(key.as_str()));
        for (key, value) in touched {
            object.insert(key.clone(), storage_value_to_json(value));
        }
    }
    JsonValue::Object(object)
}
/// Build the `after` image of an update: only the post-mutation values of
/// the modified columns, read from the updated entity. An update with no
/// modified columns yields an empty object.
fn build_update_after_json(
    mutation: &crate::application::entity::AppliedEntityMutation,
) -> JsonValue {
    let mut object = JsonMap::new();
    if mutation.modified_columns.is_empty() {
        return JsonValue::Object(object);
    }
    let changed: std::collections::HashSet<&str> = mutation
        .modified_columns
        .iter()
        .map(String::as_str)
        .collect();
    if let EntityData::Row(row) = &mutation.entity.data {
        match (&row.named, &row.schema) {
            // Named fields win when present.
            (Some(named), _) => {
                for (key, value) in named {
                    if changed.contains(key.as_str()) {
                        object.insert(key.clone(), storage_value_to_json(value));
                    }
                }
            }
            // Otherwise pair columns positionally with the schema.
            (None, Some(schema)) => {
                for (column, value) in schema.iter().zip(row.columns.iter()) {
                    if changed.contains(column.as_str()) {
                        object.insert(column.clone(), storage_value_to_json(value));
                    }
                }
            }
            (None, None) => {}
        }
    }
    JsonValue::Object(object)
}
/// Encode a live update event payload with redacted before/after images.
/// Field insertion order is the serialized wire layout.
///
/// NOTE(review): the event id here hashes the raw storage id, whereas the
/// insert path prefers the row's "id" column — an inconsistency, but
/// changing either side would alter existing event ids; confirm intent.
fn update_event_payload(
    collection: &str,
    id: u64,
    lsn: u64,
    before: &JsonValue,
    after: &JsonValue,
    redact_fields: &[String],
) -> RedDBResult<Vec<u8>> {
    let mut object = JsonMap::new();
    // u64 -> f64 loses precision above 2^53; consistent with the rest of
    // this module's payloads.
    let id_json = JsonValue::Number(id as f64);
    let subject_id_for_hash = json_id_for_hash(&id_json);
    object.insert(
        "event_id".to_string(),
        JsonValue::String(deterministic_event_id(
            collection,
            &subject_id_for_hash,
            lsn,
            "update",
        )),
    );
    object.insert("op".to_string(), JsonValue::String("update".to_string()));
    object.insert(
        "collection".to_string(),
        JsonValue::String(collection.to_string()),
    );
    object.insert("id".to_string(), id_json);
    object.insert(
        "ts".to_string(),
        JsonValue::Number(current_unix_ms() as f64),
    );
    object.insert("lsn".to_string(), JsonValue::Number(lsn as f64));
    object.insert(
        "tenant".to_string(),
        crate::runtime::impl_core::current_tenant()
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    object.insert(
        "before".to_string(),
        redact_json_object(before.clone(), redact_fields),
    );
    object.insert(
        "after".to_string(),
        redact_json_object(after.clone(), redact_fields),
    );
    crate::json::to_vec(&JsonValue::Object(object))
        .map_err(|err| RedDBError::Internal(format!("encode update event payload: {err}")))
}
/// Emit a single truncate event per enabled subscription on `collection`
/// (no ops filtering — truncates go to every enabled subscriber), after
/// recording one CDC delete record for the truncate itself.
pub(crate) fn emit_truncate_event_for_collection(
    runtime: &crate::RedDBRuntime,
    collection: &str,
    entities_count: u64,
) -> crate::RedDBResult<()> {
    let Some(contract) = runtime.db().collection_contract_arc(collection) else {
        return Ok(());
    };
    let subscriptions: Vec<_> = contract
        .subscriptions
        .iter()
        .filter(|s| s.enabled)
        .cloned()
        .collect();
    if subscriptions.is_empty() {
        return Ok(());
    }
    let lsn = runtime.cdc_emit(
        crate::replication::cdc::ChangeOperation::Delete,
        collection,
        0,
        "truncate",
    );
    // The payload does not depend on the subscription, so build it once:
    // avoids re-encoding per subscriber and gives every subscriber an
    // identical event (previously each got its own `ts`).
    let payload = truncate_event_payload(collection, lsn, entities_count)?;
    for subscription in &subscriptions {
        runtime.enqueue_event_payload(
            &effective_queue_name(subscription),
            Value::Json(payload.clone()),
        )?;
    }
    Ok(())
}
/// Emit a single collection-dropped event per enabled subscription on
/// `collection` (no ops filtering), after recording one CDC delete record
/// for the drop itself.
pub(crate) fn emit_collection_dropped_event_for_collection(
    runtime: &crate::RedDBRuntime,
    collection: &str,
    final_entities_count: u64,
) -> crate::RedDBResult<()> {
    let Some(contract) = runtime.db().collection_contract_arc(collection) else {
        return Ok(());
    };
    let subscriptions: Vec<_> = contract
        .subscriptions
        .iter()
        .filter(|s| s.enabled)
        .cloned()
        .collect();
    if subscriptions.is_empty() {
        return Ok(());
    }
    let lsn = runtime.cdc_emit(
        crate::replication::cdc::ChangeOperation::Delete,
        collection,
        0,
        "collection_dropped",
    );
    // The payload does not depend on the subscription, so build it once:
    // avoids re-encoding per subscriber and gives every subscriber an
    // identical event (previously each got its own `ts`).
    let payload = collection_dropped_event_payload(collection, lsn, final_entities_count)?;
    for subscription in &subscriptions {
        runtime.enqueue_event_payload(
            &effective_queue_name(subscription),
            Value::Json(payload.clone()),
        )?;
    }
    Ok(())
}
/// Encode a truncate event payload. There is no per-row subject, so the
/// literal "truncate" stands in for the id in the deterministic event id,
/// and the payload carries the number of entities removed instead of
/// before/after images. Field insertion order is the serialized layout.
fn truncate_event_payload(collection: &str, lsn: u64, entities_count: u64) -> RedDBResult<Vec<u8>> {
    let mut object = JsonMap::new();
    object.insert(
        "event_id".to_string(),
        JsonValue::String(deterministic_event_id(
            collection, "truncate", lsn, "truncate",
        )),
    );
    object.insert("op".to_string(), JsonValue::String("truncate".to_string()));
    object.insert(
        "collection".to_string(),
        JsonValue::String(collection.to_string()),
    );
    object.insert(
        "ts".to_string(),
        JsonValue::Number(current_unix_ms() as f64),
    );
    object.insert("lsn".to_string(), JsonValue::Number(lsn as f64));
    object.insert(
        "tenant".to_string(),
        crate::runtime::impl_core::current_tenant()
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    object.insert(
        "entities_count".to_string(),
        JsonValue::Number(entities_count as f64),
    );
    crate::json::to_vec(&JsonValue::Object(object))
        .map_err(|err| RedDBError::Internal(format!("encode truncate event payload: {err}")))
}
/// Encode a collection-dropped event payload. Like truncate, there is no
/// per-row subject: the literal "collection_dropped" stands in for the id
/// in the deterministic event id, and the payload reports the entity count
/// at drop time. Field insertion order is the serialized layout.
fn collection_dropped_event_payload(
    collection: &str,
    lsn: u64,
    final_entities_count: u64,
) -> RedDBResult<Vec<u8>> {
    let mut object = JsonMap::new();
    object.insert(
        "event_id".to_string(),
        JsonValue::String(deterministic_event_id(
            collection,
            "collection_dropped",
            lsn,
            "collection_dropped",
        )),
    );
    object.insert(
        "op".to_string(),
        JsonValue::String("collection_dropped".to_string()),
    );
    object.insert(
        "collection".to_string(),
        JsonValue::String(collection.to_string()),
    );
    object.insert(
        "ts".to_string(),
        JsonValue::Number(current_unix_ms() as f64),
    );
    object.insert("lsn".to_string(), JsonValue::Number(lsn as f64));
    object.insert(
        "tenant".to_string(),
        crate::runtime::impl_core::current_tenant()
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    object.insert(
        "final_entities_count".to_string(),
        JsonValue::Number(final_entities_count as f64),
    );
    crate::json::to_vec(&JsonValue::Object(object)).map_err(|err| {
        RedDBError::Internal(format!("encode collection_dropped event payload: {err}"))
    })
}
/// Parse a subscription WHERE clause into an AST filter; any parse failure
/// yields `None` (callers treat that as "no filter").
fn parse_where_filter(sql: &str) -> Option<crate::storage::query::AstFilter> {
    let mut parser = crate::storage::query::Parser::new(sql).ok()?;
    parser.parse_filter().ok()
}
/// Evaluate a subscription's WHERE filter against a live entity.
///
/// Fail-open: no filter configured, or a filter that does not parse, both
/// count as a pass — the subscriber still receives the event.
fn entity_passes_where_filter(
    sub: &crate::catalog::SubscriptionDescriptor,
    entity: &crate::storage::unified::entity::UnifiedEntity,
    collection: &str,
) -> bool {
    let parsed = sub
        .where_filter
        .as_ref()
        .and_then(|sql| parse_where_filter(sql));
    match parsed {
        Some(filter) => crate::runtime::query_exec::evaluate_entity_filter(
            entity, &filter, collection, collection,
        ),
        None => true,
    }
}
/// Evaluate a subscription's WHERE filter against a JSON pre-image (used on
/// the delete path, where the entity no longer exists).
///
/// Fail-open like [`entity_passes_where_filter`]: missing or unparseable
/// filters count as a pass.
fn json_passes_where_filter(
    sub: &crate::catalog::SubscriptionDescriptor,
    json: &JsonValue,
) -> bool {
    let parsed = sub
        .where_filter
        .as_ref()
        .and_then(|sql| parse_where_filter(sql));
    match parsed {
        Some(filter) => eval_filter_against_json(json, &filter),
        None => true,
    }
}
/// Recursively evaluate an AST filter against a JSON object. Supports
/// boolean combinators, comparisons, and null checks; any other predicate
/// kind passes (fail-open).
fn eval_filter_against_json(json: &JsonValue, filter: &crate::storage::query::AstFilter) -> bool {
    use crate::storage::query::AstFilter as F;
    match filter {
        F::And(lhs, rhs) => {
            eval_filter_against_json(json, lhs) && eval_filter_against_json(json, rhs)
        }
        F::Or(lhs, rhs) => {
            eval_filter_against_json(json, lhs) || eval_filter_against_json(json, rhs)
        }
        F::Not(inner) => !eval_filter_against_json(json, inner),
        F::Compare { field, op, value } => {
            let lhs = json_object_field(json, ast_field_col_name(field));
            compare_json_to_store_value(lhs, value, *op)
        }
        // A missing key and an explicit JSON null are both "null".
        F::IsNull(field) => match json_object_field(json, ast_field_col_name(field)) {
            None | Some(JsonValue::Null) => true,
            Some(_) => false,
        },
        F::IsNotNull(field) => match json_object_field(json, ast_field_col_name(field)) {
            None | Some(JsonValue::Null) => false,
            Some(_) => true,
        },
        // Unsupported predicate kinds fail open: the event is still emitted.
        _ => true,
    }
}
/// Extract the bare column/property name a field reference addresses.
fn ast_field_col_name(field: &crate::storage::query::FieldRef) -> &str {
    use crate::storage::query::FieldRef;
    match field {
        FieldRef::TableColumn { column, .. } => column.as_str(),
        FieldRef::NodeProperty { property, .. } | FieldRef::EdgeProperty { property, .. } => {
            property.as_str()
        }
        FieldRef::NodeId { alias } => alias.as_str(),
    }
}
/// Look up a key on a JSON object; non-object values yield `None`.
fn json_object_field<'a>(json: &'a JsonValue, col: &str) -> Option<&'a JsonValue> {
    match json {
        JsonValue::Object(map) => map.get(col),
        _ => None,
    }
}
/// Compare a JSON field value against a storage `Value` from a parsed
/// filter. The JSON side is normalized into a storage `Value` first;
/// structured values (arrays/objects) never match any comparison.
fn compare_json_to_store_value(
    json: Option<&JsonValue>,
    rhs: &Value,
    op: crate::storage::query::CompareOp,
) -> bool {
    use crate::storage::query::CompareOp;
    use std::cmp::Ordering;
    let lhs: Value = match json {
        // Missing key and explicit null both normalize to Value::Null.
        None | Some(JsonValue::Null) => Value::Null,
        Some(JsonValue::Bool(b)) => Value::Boolean(*b),
        Some(JsonValue::Number(n)) => {
            let n = *n;
            // Integral floats in i64 range widen to Integer so they compare
            // against integer-typed filter constants.
            // NOTE(review): `i64::MAX as f64` rounds up to 2^63, so n == 2^63
            // passes this check and `n as i64` saturates to i64::MAX (float
            // casts saturate in Rust) — a one-value edge, confirm acceptable.
            if n.fract() == 0.0 && n >= i64::MIN as f64 && n <= i64::MAX as f64 {
                Value::Integer(n as i64)
            } else {
                Value::Float(n)
            }
        }
        Some(JsonValue::String(s)) => Value::text(s.clone()),
        Some(JsonValue::Array(_) | JsonValue::Object(_)) => return false,
    };
    match op {
        // NOTE(review): Eq/Ne delegate to Value's PartialEq — whether
        // Integer(5) == Float(5.0) holds depends on that impl; verify it
        // matches the intended filter semantics.
        CompareOp::Eq => lhs == *rhs,
        CompareOp::Ne => lhs != *rhs,
        _ => {
            // Ordering comparisons are numeric-only; any non-numeric side
            // makes the predicate false.
            fn val_to_f64(v: &Value) -> Option<f64> {
                match v {
                    Value::Integer(i) => Some(*i as f64),
                    Value::UnsignedInteger(u) => Some(*u as f64),
                    Value::Float(f) => Some(*f),
                    _ => None,
                }
            }
            let Some(lf) = val_to_f64(&lhs) else {
                return false;
            };
            let Some(rf) = val_to_f64(rhs) else {
                return false;
            };
            // NaN partial_cmp yields None -> treated as Equal, so NaN passes
            // Le/Ge but fails Lt/Gt.
            let ord = lf.partial_cmp(&rf).unwrap_or(Ordering::Equal);
            match op {
                CompareOp::Lt => ord == Ordering::Less,
                CompareOp::Le => ord != Ordering::Greater,
                CompareOp::Gt => ord == Ordering::Greater,
                CompareOp::Ge => ord != Ordering::Less,
                // Eq/Ne were handled above; nothing else reaches here.
                _ => false,
            }
        }
    }
}
/// Encode a live delete event payload with the redacted pre-image as
/// `before` (`after` is always null for deletes). Field insertion order is
/// the serialized wire layout.
///
/// NOTE(review): like the update path, the event id hashes the raw storage
/// id rather than the row's "id" column (the insert path prefers the
/// column) — confirm this asymmetry is intended.
fn delete_event_payload(
    collection: &str,
    id: u64,
    lsn: u64,
    before: &JsonValue,
    redact_fields: &[String],
) -> RedDBResult<Vec<u8>> {
    let mut object = JsonMap::new();
    // u64 -> f64 loses precision above 2^53; consistent with the rest of
    // this module's payloads.
    let id_json = JsonValue::Number(id as f64);
    let subject_id_for_hash = json_id_for_hash(&id_json);
    object.insert(
        "event_id".to_string(),
        JsonValue::String(deterministic_event_id(
            collection,
            &subject_id_for_hash,
            lsn,
            "delete",
        )),
    );
    object.insert("op".to_string(), JsonValue::String("delete".to_string()));
    object.insert(
        "collection".to_string(),
        JsonValue::String(collection.to_string()),
    );
    object.insert("id".to_string(), id_json);
    object.insert(
        "ts".to_string(),
        JsonValue::Number(current_unix_ms() as f64),
    );
    object.insert("lsn".to_string(), JsonValue::Number(lsn as f64));
    object.insert(
        "tenant".to_string(),
        crate::runtime::impl_core::current_tenant()
            .map(JsonValue::String)
            .unwrap_or(JsonValue::Null),
    );
    object.insert(
        "before".to_string(),
        redact_json_object(before.clone(), redact_fields),
    );
    object.insert("after".to_string(), JsonValue::Null);
    crate::json::to_vec(&JsonValue::Object(object))
        .map_err(|err| RedDBError::Internal(format!("encode delete event payload: {err}")))
}