use std::collections::HashMap;
use sonic_rs;
use crate::control::state::SharedState;
use crate::types::{TenantId, TraceId};
use nodedb_physical::physical_plan::DocumentOp;
use super::registry::DmlEvent;
/// Summary of a write-type document operation, as produced by
/// `classify_dml_write`.
#[derive(Debug)]
pub struct DmlWriteInfo {
/// Name of the collection the write targets.
pub collection: String,
/// Identifier of the affected document. `Some` for single-document
/// operations (point put/insert/update/delete and upsert); `None` for
/// batch, bulk, truncate and set-based operations.
pub document_id: Option<String>,
/// DML event kind the operation is classified as.
pub event: DmlEvent,
/// Fields decoded from the write payload. Populated for point put/insert
/// and upsert (an empty map when the payload does not decode to an
/// object); `None` for every other operation.
pub new_fields: Option<HashMap<String, nodedb_types::Value>>,
/// Set to `true` only for upserts.
/// NOTE(review): presumably tells the consumer to check whether the
/// document already exists before settling on insert vs. update — confirm
/// against the caller.
pub needs_existence_probe: bool,
}
/// Classifies a physical plan as a DML write.
///
/// Returns `Some(DmlWriteInfo)` when `plan` is a document operation that
/// `classify_document_op` recognises as a write, and `None` for every other
/// plan kind or for document operations that are not writes.
pub fn classify_dml_write(plan: &crate::bridge::envelope::PhysicalPlan) -> Option<DmlWriteInfo> {
    // Only document plans are inspected; anything else is not classified.
    if let crate::bridge::envelope::PhysicalPlan::Document(document_op) = plan {
        return classify_document_op(document_op);
    }
    None
}
fn classify_document_op(op: &DocumentOp) -> Option<DmlWriteInfo> {
match op {
DocumentOp::PointPut {
collection,
document_id,
value,
..
}
| DocumentOp::PointInsert {
collection,
document_id,
value,
..
} => {
let new_fields = deserialize_value_to_fields(value);
Some(DmlWriteInfo {
collection: collection.clone(),
document_id: Some(document_id.clone()),
event: DmlEvent::Insert,
new_fields: Some(new_fields),
needs_existence_probe: false,
})
}
DocumentOp::Upsert {
collection,
document_id,
value,
..
} => {
let new_fields = deserialize_value_to_fields(value);
Some(DmlWriteInfo {
collection: collection.clone(),
document_id: Some(document_id.clone()),
event: DmlEvent::Insert,
new_fields: Some(new_fields),
needs_existence_probe: true,
})
}
DocumentOp::PointDelete {
collection,
document_id,
..
} => Some(DmlWriteInfo {
collection: collection.clone(),
document_id: Some(document_id.clone()),
event: DmlEvent::Delete,
new_fields: None,
needs_existence_probe: false,
}),
DocumentOp::PointUpdate {
collection,
document_id,
..
} => Some(DmlWriteInfo {
collection: collection.clone(),
document_id: Some(document_id.clone()),
event: DmlEvent::Update,
new_fields: None, needs_existence_probe: false,
}),
DocumentOp::BatchInsert { collection, .. } => Some(DmlWriteInfo {
collection: collection.clone(),
document_id: None,
event: DmlEvent::Insert,
new_fields: None, needs_existence_probe: false,
}),
DocumentOp::BulkUpdate { collection, .. } => Some(DmlWriteInfo {
collection: collection.clone(),
document_id: None,
event: DmlEvent::Update,
new_fields: None,
needs_existence_probe: false,
}),
DocumentOp::BulkDelete { collection, .. } => Some(DmlWriteInfo {
collection: collection.clone(),
document_id: None,
event: DmlEvent::Delete,
new_fields: None,
needs_existence_probe: false,
}),
DocumentOp::Truncate { collection, .. } => Some(DmlWriteInfo {
collection: collection.clone(),
document_id: None,
event: DmlEvent::Delete,
new_fields: None,
needs_existence_probe: false,
}),
DocumentOp::InsertSelect {
target_collection, ..
} => Some(DmlWriteInfo {
collection: target_collection.clone(),
document_id: None,
event: DmlEvent::Insert,
new_fields: None,
needs_existence_probe: false,
}),
DocumentOp::UpdateFromJoin {
target_collection, ..
} => Some(DmlWriteInfo {
collection: target_collection.clone(),
document_id: None,
event: DmlEvent::Update,
new_fields: None,
needs_existence_probe: false,
}),
DocumentOp::Merge {
target_collection, ..
} => Some(DmlWriteInfo {
collection: target_collection.clone(),
document_id: None,
event: DmlEvent::Update,
new_fields: None,
needs_existence_probe: false,
}),
DocumentOp::PointGet { .. }
| DocumentOp::Scan { .. }
| DocumentOp::RangeScan { .. }
| DocumentOp::Register { .. }
| DocumentOp::IndexLookup { .. }
| DocumentOp::IndexedFetch { .. }
| DocumentOp::DropIndex { .. }
| DocumentOp::BackfillIndex { .. }
| DocumentOp::EstimateCount { .. }
| DocumentOp::MaterializeScan { .. } => None,
}
}
/// Decodes a serialized document payload into a field map.
///
/// The bytes are first interpreted as MessagePack; if that does not produce
/// an object, they are parsed as JSON text. When neither attempt produces an
/// object the result is an empty map.
fn deserialize_value_to_fields(value: &[u8]) -> HashMap<String, nodedb_types::Value> {
    // The JSON parser is only consulted when the MessagePack attempt did not
    // yield an object.
    let object = match nodedb_types::json_from_msgpack(value) {
        Ok(serde_json::Value::Object(fields)) => Some(fields),
        _ => match sonic_rs::from_slice::<serde_json::Value>(value) {
            Ok(serde_json::Value::Object(fields)) => Some(fields),
            _ => None,
        },
    };

    match object {
        Some(fields) => fields
            .into_iter()
            .map(|(name, json)| (name, nodedb_types::Value::from(json)))
            .collect(),
        None => HashMap::new(),
    }
}
/// Rewrites the payload of a pending write task with `mutated` fields.
///
/// * For `PointPut`, `PointInsert` and `Upsert` the task's `value` is
///   replaced with the MessagePack encoding of the whole `mutated` document.
///   If encoding fails the task is left untouched.
/// * For `PointUpdate` the task's `updates` are replaced with one literal
///   update per entry in `mutated`; entries whose value cannot be encoded
///   are skipped.
/// * Any other plan is left unchanged.
///
/// Each arm performs only the encoding it needs: plans that are not patched
/// no longer pay for cloning and serializing the whole document, and a
/// failure to encode the whole document no longer prevents `PointUpdate`
/// (which never uses that encoding) from being patched.
pub fn patch_task_with_mutated_fields(
    task: &mut nodedb_physical::physical_task::PhysicalTask,
    mutated: &HashMap<String, nodedb_types::Value>,
) {
    use crate::bridge::envelope::PhysicalPlan;

    match &mut task.plan {
        PhysicalPlan::Document(DocumentOp::PointPut { value, .. })
        | PhysicalPlan::Document(DocumentOp::PointInsert { value, .. })
        | PhysicalPlan::Document(DocumentOp::Upsert { value, .. }) => {
            // NOTE(review): the document is routed through `serde_json`
            // before encoding, presumably so the bytes match what the
            // decode path expects — confirm before simplifying.
            let json_obj: serde_json::Map<String, serde_json::Value> = mutated
                .iter()
                .map(|(k, v)| (k.clone(), serde_json::Value::from(v.clone())))
                .collect();
            let document = nodedb_types::Value::from(serde_json::Value::Object(json_obj));
            // Best effort: keep the original payload when encoding fails.
            if let Ok(encoded) = nodedb_types::value_to_msgpack(&document) {
                *value = encoded;
            }
        }
        PhysicalPlan::Document(DocumentOp::PointUpdate { updates, .. }) => {
            *updates = mutated
                .iter()
                .filter_map(|(k, v)| {
                    nodedb_types::value_to_msgpack(v).ok().map(|b| {
                        (
                            k.clone(),
                            nodedb_physical::physical_plan::UpdateValue::Literal(b),
                        )
                    })
                })
                .collect();
        }
        _ => {}
    }
}
/// Fetches the currently stored fields of a document via a point-get
/// dispatched to the data plane.
///
/// Returns the decoded field map, or an empty map when the dispatch fails,
/// the response payload is empty, or the payload does not decode to an
/// object (MessagePack is tried first, then JSON text).
pub async fn fetch_old_row(
    state: &SharedState,
    tenant_id: TenantId,
    collection: &str,
    document_id: &str,
) -> HashMap<String, nodedb_types::Value> {
    let pk_bytes = document_id.as_bytes().to_vec();

    // A failed or missing surrogate lookup falls back to the zero surrogate.
    let surrogate = match state.surrogate_assigner.lookup(collection, &pk_bytes) {
        Ok(Some(found)) => found,
        _ => nodedb_types::Surrogate::ZERO,
    };

    let plan = crate::bridge::envelope::PhysicalPlan::Document(DocumentOp::PointGet {
        collection: collection.to_string(),
        document_id: document_id.to_string(),
        surrogate,
        pk_bytes,
        rls_filters: Vec::new(),
        system_as_of_ms: None,
        valid_at_ms: None,
    });
    let vshard_id = crate::types::VShardId::from_key(document_id.as_bytes());

    let outcome = crate::control::server::dispatch_utils::dispatch_to_data_plane(
        state,
        tenant_id,
        vshard_id,
        plan,
        TraceId::ZERO,
    )
    .await;

    // Dispatch errors and empty payloads both mean "no old row".
    let resp = match outcome {
        Ok(resp) if !resp.payload.is_empty() => resp,
        _ => return HashMap::new(),
    };

    let bytes = resp.payload.as_ref();
    // The JSON parser is only consulted when the MessagePack attempt did not
    // yield an object.
    let object = match nodedb_types::json_from_msgpack(bytes) {
        Ok(serde_json::Value::Object(fields)) => Some(fields),
        _ => match sonic_rs::from_slice::<serde_json::Value>(bytes) {
            Ok(serde_json::Value::Object(fields)) => Some(fields),
            _ => None,
        },
    };

    match object {
        Some(fields) => fields
            .into_iter()
            .map(|(name, json)| (name, nodedb_types::Value::from(json)))
            .collect(),
        None => HashMap::new(),
    }
}
pub fn has_triggers(state: &SharedState, tenant_id: TenantId, collection: &str) -> bool {
let tid = tenant_id.as_u64();
!state
.trigger_registry
.get_matching(tid, collection, DmlEvent::Insert)
.is_empty()
|| !state
.trigger_registry
.get_matching(tid, collection, DmlEvent::Update)
.is_empty()
|| !state
.trigger_registry
.get_matching(tid, collection, DmlEvent::Delete)
.is_empty()
}