use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant as StdInstant};
use arrow_array::{BooleanArray, Int64Array, LargeBinaryArray, RecordBatch, UInt8Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_plan::PhysicalExpr;
use tokio::runtime::Handle;
use tracing::warn;
use uni_common::cypher_value_codec;
use uni_common::{Properties, UniError, Value};
use uni_plugin::PluginRegistry;
use uni_plugin::traits::trigger::{
FireMode, MutationBatch, TriggerContext, TriggerEventMask, TriggerOutcome, TriggerPhase,
TriggerPlugin, TriggerSubscription,
};
use uni_store::runtime::L0Manager;
use uni_store::runtime::l0::L0Buffer;
/// Number of slots in the per-phase route table (`TriggerRouter::by_phase`); one slot
/// per index returned by `phase_index`.
const PHASE_COUNT: usize = 4;
pub fn event_row_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("event_kind", DataType::UInt8, false),
Field::new("vid_or_eid", DataType::Int64, false),
Field::new("label", DataType::Utf8, false),
Field::new("property", DataType::Utf8, false),
Field::new("old_value", DataType::LargeBinary, true),
Field::new("new_value", DataType::LargeBinary, true),
Field::new("properties_new", DataType::LargeBinary, true),
Field::new("properties_old", DataType::LargeBinary, true),
]))
}
/// Compiles a trigger `predicate_source` into a DataFusion physical expression bound to
/// the schema returned by `event_row_schema`.
///
/// Returns the physical expression together with the set of entity property names the
/// predicate reached through `n.<prop>` / `old.<prop>`.
///
/// # Errors
/// Returns a `String` whose prefix names the failing stage (`parse`, `translate`,
/// `schema-conversion`, `udf-register`, `resolve-udfs (pre-coerce)`, `cypher-coercion`,
/// `filter-plan`, `plan-build`, `resolve-udfs`, `physical-expr`).
fn compile_predicate(source: &str) -> Result<(Arc<dyn PhysicalExpr>, HashSet<String>), String> {
use datafusion::common::DFSchema;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::optimizer::AnalyzerRule;
use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion::physical_expr::create_physical_expr;
use datafusion::prelude::SessionContext;
// Stage 1: parse the Cypher expression text.
let mut cypher_expr =
uni_cypher::parse_expression(source).map_err(|e| format!("parse: {e}"))?;
// Stage 2: point `n.<prop>` / `old.<prop>` at the `properties_new` / `properties_old`
// variables and remember which property names were used.
let mut props_referenced: HashSet<String> = HashSet::new();
rewrite_property_refs(&mut cypher_expr, &mut props_referenced);
// Stage 3: translate the Cypher AST into a DataFusion logical expression.
let df_expr_raw = uni_query::query::df_expr::cypher_expr_to_df(&cypher_expr, None)
.map_err(|e| format!("translate: {e}"))?;
let schema = event_row_schema();
let df_schema = DFSchema::try_from(schema.as_ref().clone())
.map_err(|e| format!("schema-conversion: {e}"))?;
// A throwaway session supplies the registered Cypher UDFs, config and execution props.
let ctx = SessionContext::new();
uni_query::query::df_udfs::register_cypher_udfs(&ctx)
.map_err(|e| format!("udf-register: {e}"))?;
let state = ctx.state();
let config = state.config_options().clone();
let props = state.execution_props();
// Stage 4: swap scalar-function nodes for the session's registered UDFs before coercion.
let df_expr_resolved = resolve_dummy_udfs(df_expr_raw, &state)
.map_err(|e| format!("resolve-udfs (pre-coerce): {e}"))?;
// Stage 5: Cypher-side type coercion against the event-row schema.
let df_expr = uni_query::query::df_expr::apply_type_coercion(&df_expr_resolved, &df_schema)
.map_err(|e| format!("cypher-coercion: {e}"))?;
// Stage 6: wrap the expression in `Filter(EmptyRelation)` so DataFusion's TypeCoercion
// analyzer rule, which operates on plans, can be applied to it.
let empty = datafusion::logical_expr::LogicalPlan::EmptyRelation(
datafusion::logical_expr::EmptyRelation {
produce_one_row: false,
schema: Arc::new(df_schema.clone()),
},
);
let filter_plan = LogicalPlanBuilder::from(empty)
.filter(df_expr.clone())
.map_err(|e| format!("filter-plan: {e}"))?
.build()
.map_err(|e| format!("plan-build: {e}"))?;
// If the analyzer errors, or returns anything other than a Filter node, fall back to
// the expression produced by stage 5; the analyzer error itself is not reported.
let coerced_expr = match TypeCoercion::new().analyze(filter_plan, &config) {
Ok(datafusion::logical_expr::LogicalPlan::Filter(f)) => f.predicate,
_ => df_expr,
};
// Stage 7: resolve UDFs again on the coerced expression, then lower to a physical expr.
let resolved_expr =
resolve_dummy_udfs(coerced_expr, &state).map_err(|e| format!("resolve-udfs: {e}"))?;
let physical = create_physical_expr(&resolved_expr, &df_schema, props)
.map_err(|e| format!("physical-expr: {e}"))?;
Ok((physical, props_referenced))
}
/// Walks `expr` bottom-up and, for every scalar-function node whose function name is
/// registered in `state`, rebuilds the node around the registered UDF (arguments are
/// kept as-is). Nodes with no registered counterpart are left unchanged.
///
/// # Errors
/// Returns `"udf-resolve walk: …"` if the tree traversal itself fails.
fn resolve_dummy_udfs(
    expr: datafusion::logical_expr::Expr,
    state: &datafusion::execution::SessionState,
) -> Result<datafusion::logical_expr::Expr, String> {
    use datafusion::common::tree_node::{Transformed, TreeNode};
    use datafusion::logical_expr::Expr as DfExpr;
    use datafusion::logical_expr::expr::ScalarFunction;

    expr.transform_up(|node| {
        // Build a replacement only for scalar-function nodes with a registered namesake.
        let replacement = match &node {
            DfExpr::ScalarFunction(call) => state
                .scalar_functions()
                .get(call.func.name())
                .map(|registered| {
                    DfExpr::ScalarFunction(ScalarFunction {
                        func: registered.clone(),
                        args: call.args.clone(),
                    })
                }),
            _ => None,
        };
        Ok(match replacement {
            Some(rebuilt) => Transformed::yes(rebuilt),
            None => Transformed::no(node),
        })
    })
    .map(|walked| walked.data)
    .map_err(|e| format!("udf-resolve walk: {e}"))
}
/// Recursively rewrites entity property references inside a Cypher expression.
///
/// `n.<prop>` has its base variable replaced by `properties_new` and `old.<prop>` by
/// `properties_old`; in both cases `<prop>` is added to `referenced`. Property accesses
/// on any other variable are left alone.
///
/// Only the expression variants matched below are descended into; every other variant
/// hits the final `_ => {}` arm and is left untouched, including anything nested in it.
fn rewrite_property_refs(expr: &mut uni_cypher::ast::Expr, referenced: &mut HashSet<String>) {
use uni_cypher::ast::Expr;
match expr {
Expr::Property(base, prop) => {
// Recurse first so nested property chains are rewritten from the inside out.
rewrite_property_refs(base, referenced);
if let Expr::Variable(name) = base.as_ref() {
match name.as_str() {
// `n` refers to the post-mutation property map column.
"n" => {
referenced.insert(prop.clone());
**base = Expr::Variable("properties_new".to_owned());
}
// `old` refers to the pre-image property map column.
"old" => {
referenced.insert(prop.clone());
**base = Expr::Variable("properties_old".to_owned());
}
_ => {}
}
}
}
Expr::BinaryOp { left, right, .. } => {
rewrite_property_refs(left, referenced);
rewrite_property_refs(right, referenced);
}
Expr::UnaryOp { expr: inner, .. } => rewrite_property_refs(inner, referenced),
Expr::FunctionCall { args, .. } => {
for a in args {
rewrite_property_refs(a, referenced);
}
}
Expr::Case {
expr: case_expr,
when_then,
else_expr,
} => {
// Optional scrutinee, every WHEN/THEN pair, then the optional ELSE.
if let Some(e) = case_expr.as_deref_mut() {
rewrite_property_refs(e, referenced);
}
for (w, t) in when_then {
rewrite_property_refs(w, referenced);
rewrite_property_refs(t, referenced);
}
if let Some(e) = else_expr.as_deref_mut() {
rewrite_property_refs(e, referenced);
}
}
Expr::IsNull(inner) | Expr::IsNotNull(inner) | Expr::IsUnique(inner) => {
rewrite_property_refs(inner, referenced);
}
Expr::In { expr: e, list } => {
rewrite_property_refs(e, referenced);
rewrite_property_refs(list, referenced);
}
Expr::List(items) => {
for i in items {
rewrite_property_refs(i, referenced);
}
}
Expr::Map(pairs) => {
// Only map values are expressions; keys are skipped.
for (_, v) in pairs {
rewrite_property_refs(v, referenced);
}
}
Expr::ArrayIndex { array, index } => {
rewrite_property_refs(array, referenced);
rewrite_property_refs(index, referenced);
}
Expr::ArraySlice { array, start, end } => {
rewrite_property_refs(array, referenced);
if let Some(s) = start.as_deref_mut() {
rewrite_property_refs(s, referenced);
}
if let Some(e) = end.as_deref_mut() {
rewrite_property_refs(e, referenced);
}
}
// NOTE(review): any other variant is not traversed, so `n.<prop>` nested inside one
// would stay unrewritten — confirm no such variant can carry sub-expressions.
_ => {}
}
}
/// Maps a trigger phase to its slot in the `PHASE_COUNT`-sized route table.
fn phase_index(p: TriggerPhase) -> usize {
match p {
TriggerPhase::BeforeMutation => 0,
TriggerPhase::AfterMutation => 1,
TriggerPhase::BeforeCommit => 2,
TriggerPhase::AfterCommit => 3,
// NOTE(review): any other phase value lands in slot 0, i.e. it is routed exactly like
// `BeforeMutation` — confirm this fallback is intended rather than an error.
_ => 0,
}
}
/// One registered trigger, with its subscription flattened into the form the router
/// consults on every dispatch.
struct RouteEntry {
/// The plugin to invoke.
plugin: Arc<dyn TriggerPlugin>,
/// Display name, derived from the subscription by `subscription_name`.
name: String,
/// Raw event-mask bits copied from the subscription's `events`.
event_mask: u32,
/// Allowed labels; consulted only for node-kind events. `None` means no restriction.
label_filter: Option<Vec<String>>,
/// Allowed edge types; consulted only for edge-kind events. `None` means no restriction.
edge_type_filter: Option<Vec<String>>,
/// Property names copied from the subscription. `matches` does not consult this field.
property_filter: Option<Vec<String>>,
/// How the trigger is fired (synchronously or otherwise).
fire_mode: FireMode,
/// Compiled `predicate_source`, if the subscription supplied one.
compiled_predicate: Option<Arc<dyn PhysicalExpr>>,
/// Entity property names the predicate referenced via `n.<prop>` / `old.<prop>`.
properties_referenced: HashSet<String>,
}
impl RouteEntry {
fn matches(&self, kind: TriggerEventMask, label_or_type: &str) -> bool {
if (self.event_mask & kind.0) == 0 {
return false;
}
if let Some(ref labels) = self.label_filter
&& kind_is_node(kind)
&& !labels.iter().any(|l| l.as_str() == label_or_type)
{
return false;
}
if let Some(ref ets) = self.edge_type_filter
&& kind_is_edge(kind)
&& !ets.iter().any(|e| e.as_str() == label_or_type)
{
return false;
}
true
}
}
/// Returns `true` when `kind` shares at least one bit with the node-event masks
/// (create, update, delete, label added, label removed).
fn kind_is_node(kind: TriggerEventMask) -> bool {
    let node_events = [
        TriggerEventMask::NODE_UPDATE,
        TriggerEventMask::NODE_DELETE,
        TriggerEventMask::LABEL_ADDED,
        TriggerEventMask::LABEL_REMOVED,
    ]
    .into_iter()
    .fold(TriggerEventMask::NODE_CREATE, |acc, next| acc.union(next));
    kind.0 & node_events.0 != 0
}
/// Returns `true` when `kind` shares at least one bit with the edge-event masks
/// (create, update, delete).
fn kind_is_edge(kind: TriggerEventMask) -> bool {
    let edge_events = [TriggerEventMask::EDGE_UPDATE, TriggerEventMask::EDGE_DELETE]
        .into_iter()
        .fold(TriggerEventMask::EDGE_CREATE, |acc, next| acc.union(next));
    kind.0 & edge_events.0 != 0
}
/// Routes mutation events to registered trigger plugins, grouped by phase.
pub struct TriggerRouter {
/// Routes bucketed by the slot `phase_index` assigns to their phase.
by_phase: [Vec<RouteEntry>; PHASE_COUNT],
/// Destination for `Defer` outcomes; when `None`, deferrals are logged and dropped.
defer_queue: Option<Arc<DeferralQueue>>,
}
impl TriggerRouter {
/// Builds a router from every trigger in `reg`, with no deferral queue attached.
///
/// # Errors
/// Same as `from_registry_with_queue`.
pub fn from_registry(reg: &PluginRegistry) -> Result<Self, UniError> {
Self::from_registry_with_queue(reg, None)
}
/// Builds a router from every trigger in `reg`, attaching `defer_queue` as the
/// destination for `Defer` outcomes.
///
/// Each trigger's `predicate_source`, when present, is compiled once here.
///
/// # Errors
/// Returns `UniError::TriggerRejected` (naming the trigger) if a predicate fails to
/// compile; the first failure aborts construction.
pub fn from_registry_with_queue(
reg: &PluginRegistry,
defer_queue: Option<Arc<DeferralQueue>>,
) -> Result<Self, UniError> {
let triggers = reg.triggers();
let mut by_phase: [Vec<RouteEntry>; PHASE_COUNT] =
[Vec::new(), Vec::new(), Vec::new(), Vec::new()];
for plugin in triggers.iter() {
let sub: &TriggerSubscription = plugin.subscription();
let name = subscription_name(sub);
// Compile the optional predicate up front so dispatch never has to parse text.
let (compiled_predicate, properties_referenced) = match sub.predicate_source.as_deref()
{
Some(src) => {
let (expr, refs) =
compile_predicate(src).map_err(|e| UniError::TriggerRejected {
trigger: name.clone(),
reason: format!(
"predicate_source compile failed: {e}. \
Supported references: event-row columns \
(event_kind, vid_or_eid, label, property, \
old_value, new_value) and entity property \
references `n.<prop>` (post-mutation) / \
`old.<prop>` (pre-image)."
),
})?;
(Some(expr), refs)
}
None => (None, HashSet::new()),
};
// Copy the subscription's filters into owned strings for the route entry.
let entry = RouteEntry {
plugin: Arc::clone(plugin),
name,
event_mask: sub.events.0,
label_filter: sub
.labels
.as_ref()
.map(|v| v.iter().map(|s| s.to_string()).collect()),
edge_type_filter: sub
.edge_types
.as_ref()
.map(|v| v.iter().map(|s| s.to_string()).collect()),
property_filter: sub
.properties
.as_ref()
.map(|v| v.iter().map(|s| s.to_string()).collect()),
fire_mode: sub.fire_mode,
compiled_predicate,
properties_referenced,
};
by_phase[phase_index(sub.phase)].push(entry);
}
Ok(Self {
by_phase,
defer_queue,
})
}
/// Returns `true` when no phase has any route registered.
#[must_use]
pub fn is_empty(&self) -> bool {
    let any_route = self.by_phase.iter().any(|routes| !routes.is_empty());
    !any_route
}
/// Returns the union of the property names referenced by every route's predicate,
/// across all phases.
#[must_use]
pub fn properties_referenced(&self) -> HashSet<String> {
    self.by_phase
        .iter()
        .flatten()
        .flat_map(|route| route.properties_referenced.iter().cloned())
        .collect()
}
/// Fires the synchronous triggers registered for `BeforeMutation` and then
/// `BeforeCommit`, in that order, against `events`.
///
/// Routes whose fire mode is not `FireMode::Synchronous` are skipped. A route is also
/// skipped when `filter_for` yields no batch for it.
///
/// # Errors
/// Returns `UniError::TriggerRejected` as soon as a trigger returns
/// `TriggerOutcome::Reject` or its `fire` call returns an error; later triggers are not
/// run.
pub fn dispatch_before(
&self,
ctx: TriggerContext<'_>,
events: &MutationEvents,
) -> Result<(), UniError> {
for &phase in &[TriggerPhase::BeforeMutation, TriggerPhase::BeforeCommit] {
let routes = &self.by_phase[phase_index(phase)];
for entry in routes {
if !matches!(entry.fire_mode, FireMode::Synchronous) {
continue;
}
let Some(batch) = events.filter_for(entry) else {
continue;
};
let mb = MutationBatch {
events: Arc::new(batch),
};
// A fresh context is built per call from the caller's session and transaction ids.
let ctx_ref = TriggerContext::new(ctx.session_id, ctx.tx_id);
// NOTE(review): unlike the after-phase path, this call is not wrapped in
// `catch_unwind`, so a panicking plugin unwinds into the caller — confirm intended.
match entry.plugin.fire(ctx_ref, &mb) {
Ok(TriggerOutcome::Continue) => {}
Ok(TriggerOutcome::Reject { reason }) => {
return Err(UniError::TriggerRejected {
trigger: entry.name.to_string(),
reason,
});
}
// A deferral does not block the mutation; the batch is queued for later.
Ok(TriggerOutcome::Defer { until }) => {
enqueue_deferral(
&self.defer_queue,
Arc::clone(&entry.plugin),
entry.name.clone(),
mb.clone(),
ctx.session_id.to_owned(),
ctx.tx_id,
until,
);
}
// Any other outcome is treated like `Continue`.
Ok(_) => {
}
Err(e) => {
return Err(UniError::TriggerRejected {
trigger: entry.name.to_string(),
reason: e.to_string(),
});
}
}
}
}
Ok(())
}
/// Fires the triggers registered for `AfterMutation` and then `AfterCommit` against
/// `events`. Never fails: errors and panics from plugins are logged, not propagated.
///
/// Synchronous routes run inline through `fire_caught`; every other fire mode is
/// spawned as a task on `runtime`. A `Defer` outcome is forwarded to the deferral queue.
pub fn dispatch_after(
&self,
ctx: TriggerContext<'_>,
events: &MutationEvents,
runtime: &Handle,
) {
for &phase in &[TriggerPhase::AfterMutation, TriggerPhase::AfterCommit] {
let routes = &self.by_phase[phase_index(phase)];
for entry in routes {
let Some(batch) = events.filter_for(entry) else {
continue;
};
let mb = MutationBatch {
events: Arc::new(batch),
};
match entry.fire_mode {
FireMode::Synchronous => {
fire_caught(entry, ctx.session_id, ctx.tx_id, &mb, &self.defer_queue);
}
_ => {
// Take owned copies of everything the task needs; the task's future is `async move`.
let plugin = Arc::clone(&entry.plugin);
let name = entry.name.clone();
let session_id = ctx.session_id.to_owned();
let tx_id = ctx.tx_id;
let queue = self.defer_queue.clone();
// NOTE(review): `fire` is a synchronous call made directly inside the async task;
// if a plugin can block, `spawn_blocking` may be the better fit — confirm.
runtime.spawn(async move {
let mb_inner = mb;
let result =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
plugin.fire(TriggerContext::new(&session_id, tx_id), &mb_inner)
}));
handle_fire_outcome(result, &name, "async trigger", |until| {
enqueue_deferral(
&queue,
Arc::clone(&plugin),
name.clone(),
mb_inner,
session_id.clone(),
tx_id,
until,
);
});
});
}
}
}
}
}
}
/// Queues a deferred trigger invocation.
///
/// The item is scheduled for `now + until.delay` (zero delay when none is given) with
/// its attempt counter at 0 and `until.payload` attached. When no queue is wired, a
/// warning is logged and the deferral is dropped.
fn enqueue_deferral(
    queue: &Option<Arc<DeferralQueue>>,
    plugin: Arc<dyn TriggerPlugin>,
    name: String,
    mb: MutationBatch,
    session_id: String,
    tx_id: u64,
    until: uni_plugin::traits::trigger::TriggerDeferral,
) {
    match queue.as_ref() {
        None => {
            warn!(trigger = %name, "Defer with no queue wired; dropping");
        }
        Some(target) => {
            let delay = until.delay.unwrap_or(Duration::ZERO);
            let fire_at = StdInstant::now() + delay;
            let deferred = DeferredItem {
                plugin,
                name,
                batch: mb,
                session_id,
                tx_id,
                attempts: 0,
                payload: until.payload,
            };
            target.push(deferred, fire_at);
        }
    }
}
fn handle_fire_outcome<E: std::fmt::Display>(
outcome: Result<Result<TriggerOutcome, E>, Box<dyn std::any::Any + Send>>,
name: &str,
label: &str,
on_defer: impl FnOnce(uni_plugin::traits::trigger::TriggerDeferral),
) {
match outcome {
Ok(Ok(TriggerOutcome::Defer { until })) => on_defer(until),
Ok(Ok(_)) => {}
Ok(Err(e)) => warn!(trigger = %name, error = %e, "{label} errored"),
Err(_) => warn!(trigger = %name, "{label} panicked"),
}
}
fn fire_caught(
entry: &RouteEntry,
session_id: &str,
tx_id: u64,
mb: &MutationBatch,
defer_queue: &Option<Arc<DeferralQueue>>,
) {
let plugin = Arc::clone(&entry.plugin);
let name = entry.name.clone();
let mb_clone = mb.clone();
let session_id_owned = session_id.to_owned();
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
plugin.fire(TriggerContext::new(&session_id_owned, tx_id), &mb_clone)
}));
handle_fire_outcome(result, &name, "after-phase trigger", |until| {
enqueue_deferral(
defer_queue,
plugin,
name.clone(),
mb_clone,
session_id_owned,
tx_id,
until,
);
});
}
/// Derives a trigger's display name: the first line of the subscription's `docs`, or
/// `"<unnamed trigger>"` when `docs` has no lines at all.
fn subscription_name(sub: &TriggerSubscription) -> String {
    let mut doc_lines = sub.docs.lines();
    match doc_lines.next() {
        Some(first_line) => first_line.to_owned(),
        None => "<unnamed trigger>".to_owned(),
    }
}
/// The mutation events extracted from one L0 buffer, kept as rows until a route asks
/// for them as an Arrow batch.
pub struct MutationEvents {
/// One row per (entity, label) event; see `from_l0_with_probe` for how they are built.
rows: Vec<MutationRow>,
}
/// A single mutation event before it is encoded into Arrow columns.
struct MutationRow {
/// Which kind of event this row represents.
event_kind: TriggerEventMask,
/// Vertex or edge id, cast to `i64`.
vid_or_eid: i64,
/// Vertex label or edge type; empty when the vertex has no label recorded.
label_or_type: String,
/// Serialized pre-image properties taken from the probe, when available.
old_value: Option<Vec<u8>>,
/// Post-mutation properties restricted to the referenced property names.
new_properties: Option<Properties>,
/// Pre-image properties restricted to the referenced property names.
old_properties: Option<Properties>,
}
/// Pre-image lookup: properties of vertices and edges that already existed before the
/// transaction touched them. Presence of a key is what marks an entity as pre-existing.
#[derive(Default)]
pub struct PreExistingProbe {
/// Pre-image properties per vertex id.
vertices: HashMap<uni_common::Vid, Properties>,
/// Pre-image properties per edge id.
edges: HashMap<uni_common::Eid, Properties>,
}
impl PreExistingProbe {
/// Builds a probe by looking up every vertex and edge touched by `tx_l0` in the
/// manager's current L0 buffer and then in each pending-flush buffer.
///
/// The first buffer that holds an entity supplies its pre-image; later buffers are not
/// consulted for that entity.
#[must_use]
pub fn from_l0_chain(l0_manager: &L0Manager, tx_l0: &L0Buffer) -> Self {
let mut vertices: HashMap<uni_common::Vid, Properties> = HashMap::new();
let mut edges: HashMap<uni_common::Eid, Properties> = HashMap::new();
// Candidates are everything the transaction wrote or tombstoned.
let candidate_vids: Vec<uni_common::Vid> = tx_l0
.vertex_properties
.keys()
.copied()
.chain(tx_l0.vertex_tombstones.iter().copied())
.collect();
let candidate_eids: Vec<uni_common::Eid> = tx_l0
.edge_endpoints
.keys()
.copied()
.chain(tx_l0.tombstones.keys().copied())
.collect();
// Probes a single buffer, filling in only the candidates not resolved yet.
let mut probe_buffer = |buf: &L0Buffer| {
for vid in &candidate_vids {
if vertices.contains_key(vid) {
continue;
}
// NOTE(review): a tombstone only skips this buffer. A buffer probed later can still
// supply properties for the same vid, so a deleted vertex may be reported as
// pre-existing — confirm whether a tombstone should shadow later buffers.
if buf.vertex_tombstones.contains(vid) {
continue;
}
if let Some(props) = buf.vertex_properties.get(vid) {
vertices.insert(*vid, props.clone());
}
}
for eid in &candidate_eids {
if edges.contains_key(eid) {
continue;
}
if buf.tombstones.contains_key(eid) {
continue;
}
// An edge counts as present when its endpoints are recorded; it may have no
// property entry, in which case an empty property map is stored.
if buf.edge_endpoints.contains_key(eid) {
let props = buf.edge_properties.get(eid).cloned().unwrap_or_default();
edges.insert(*eid, props);
}
}
};
// Scope the read guard so it is released before the pending buffers are visited.
{
let current = l0_manager.get_current();
let g = current.read();
probe_buffer(&g);
}
for pending in l0_manager.get_pending_flush() {
let g = pending.read();
probe_buffer(&g);
}
Self { vertices, edges }
}
#[must_use]
pub fn pending_l1_candidates(&self, tx_l0: &L0Buffer) -> Vec<(uni_common::Vid, String)> {
let mut out: Vec<(uni_common::Vid, String)> = Vec::new();
for vid in tx_l0
.vertex_properties
.keys()
.chain(tx_l0.vertex_tombstones.iter())
{
if self.vertices.contains_key(vid) {
continue;
}
let label = tx_l0
.vertex_labels
.get(vid)
.and_then(|labels| labels.first())
.cloned();
if let Some(label) = label {
out.push((*vid, label));
}
}
out
}
/// Extends the probe with pre-images read from storage for the given
/// `(vid, label)` candidates.
///
/// Candidates are grouped by label and scanned from that label's vertex table in
/// chunks of `CHUNK_SIZE` ids. Failures are logged and skipped, never returned: the
/// affected vids stay absent from the probe.
///
/// Entries already present in the probe are never overwritten.
pub async fn extend_with_l1(
&mut self,
candidates: Vec<(uni_common::Vid, String)>,
storage: &uni_store::storage::manager::StorageManager,
) {
use arrow_array::Array;
use std::collections::HashMap as StdHashMap;
use uni_store::storage::arrow_convert::arrow_to_value;
// Upper bound on the number of ids placed in one `IN (...)` filter.
const CHUNK_SIZE: usize = 1024;
let mut by_label: StdHashMap<String, Vec<uni_common::Vid>> = StdHashMap::new();
for (vid, label) in candidates {
by_label.entry(label).or_default().push(vid);
}
for (label, vids) in by_label {
let table_name = uni_store::backend::table_names::vertex_table_name(&label);
// Request every column the table currently has; a label without a table is skipped.
let column_names: Vec<String> =
match storage.backend().get_table_schema(&table_name).await {
Ok(Some(schema)) => schema.fields().iter().map(|f| f.name().clone()).collect(),
Ok(None) => {
continue;
}
Err(e) => {
warn!(label = %label, error = %e, "L1 pre-image probe: \
schema lookup failed; vids fall back to CREATE");
continue;
}
};
let col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
for chunk in vids.chunks(CHUNK_SIZE) {
// The filter is assembled from numeric ids only.
let in_list = chunk
.iter()
.map(|v| v.as_u64().to_string())
.collect::<Vec<_>>()
.join(",");
let filter = format!("_vid IN ({in_list})");
let batch = match storage
.scan_vertex_table(&label, &col_refs, Some(&filter))
.await
{
Ok(Some(b)) => b,
Ok(None) => continue,
Err(e) => {
warn!(label = %label, error = %e, "L1 pre-image probe failed; \
affected vids fall back to NODE_CREATE classification");
continue;
}
};
let vid_col = match batch
.column_by_name("_vid")
.and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
{
Some(c) => c,
None => {
warn!(label = %label, "L1 probe returned batch without _vid column");
continue;
}
};
// Every column except these four names is treated as a user property.
let schema = batch.schema();
let property_cols: Vec<(usize, String)> = schema
.fields()
.iter()
.enumerate()
.filter_map(|(idx, f)| {
let name = f.name();
if name == "_vid"
|| name == "_version"
|| name == "_label"
|| name == "_labels"
{
None
} else {
Some((idx, name.clone()))
}
})
.collect();
for row in 0..vid_col.len() {
if vid_col.is_null(row) {
continue;
}
let raw = vid_col.value(row);
let vid = uni_common::Vid::from(raw);
// Null cells are omitted from the property map rather than stored as `Null`.
let mut props = Properties::new();
for (col_idx, col_name) in &property_cols {
let col = batch.column(*col_idx);
let value = arrow_to_value(col.as_ref(), row, None);
if !matches!(value, uni_common::Value::Null) {
props.insert(col_name.clone(), value);
}
}
// NOTE(review): `or_insert` keeps the first row seen per vid. If the scan can return
// several rows for one vid (e.g. several `_version`s), the kept row may not be the
// newest — confirm the scan returns at most one row per vid.
self.vertices.entry(vid).or_insert(props);
}
}
}
}
/// Returns `true` when the probe holds a pre-image for vertex `vid`.
#[must_use]
pub fn vertex_pre_existed(&self, vid: uni_common::Vid) -> bool {
self.vertices.contains_key(&vid)
}
/// Returns `true` when the probe holds a pre-image for edge `eid`.
#[must_use]
pub fn edge_pre_existed(&self, eid: uni_common::Eid) -> bool {
self.edges.contains_key(&eid)
}
/// Serialized pre-image properties of edge `eid`, or `None` if the probe has no entry.
fn edge_old_bytes(&self, eid: uni_common::Eid) -> Option<Vec<u8>> {
self.edges.get(&eid).map(serialize_properties)
}
/// Serialized pre-image properties of vertex `vid`, or `None` if the probe has no entry.
fn vertex_old_bytes(&self, vid: uni_common::Vid) -> Option<Vec<u8>> {
self.vertices.get(&vid).map(serialize_properties)
}
/// Borrowed pre-image properties of vertex `vid`, if known.
#[must_use]
pub fn vertex_properties(&self, vid: uni_common::Vid) -> Option<&Properties> {
self.vertices.get(&vid)
}
/// Borrowed pre-image properties of edge `eid`, if known.
#[must_use]
pub fn edge_properties(&self, eid: uni_common::Eid) -> Option<&Properties> {
self.edges.get(&eid)
}
}
fn serialize_properties(props: &Properties) -> Vec<u8> {
serde_json::to_vec(props).unwrap_or_default()
}
impl MutationEvents {
/// Extracts events from `l0` with no pre-image probe and no tracked properties.
///
/// Without a probe, every live vertex or edge write is classified as an update (see
/// `from_l0_with_probe`).
#[must_use]
pub fn from_l0(l0: &L0Buffer) -> Self {
Self::from_l0_with_probe(l0, None, &HashSet::new())
}
/// Extracts mutation events from `l0`.
///
/// Rows are produced in four passes: live vertex writes, vertex tombstones, live edge
/// writes, edge tombstones. A vertex yields one row per label, or a single row with an
/// empty label when it has none.
///
/// Classification of a live write:
/// * probe given and entity present in it → `*_UPDATE`, with the pre-image attached
/// * probe given and entity absent → `*_CREATE`
/// * no probe → `*_UPDATE`, without a pre-image
///
/// `properties_referenced` selects which properties are copied into each row's
/// new/old property maps; when it is empty, those maps are `None`.
#[must_use]
pub fn from_l0_with_probe(
l0: &L0Buffer,
probe: Option<&PreExistingProbe>,
properties_referenced: &HashSet<String>,
) -> Self {
let mut rows: Vec<MutationRow> = Vec::with_capacity(l0.mutation_count);
let track_props = !properties_referenced.is_empty();
// Projects a property map onto the referenced names. Returns `None` when nothing is
// tracked, and `Some` (possibly empty) otherwise.
let filtered = |props: &Properties| -> Option<Properties> {
if !track_props {
return None;
}
let mut out: Properties = Properties::new();
for k in properties_referenced {
if let Some(v) = props.get(k) {
out.insert(k.clone(), v.clone());
}
}
Some(out)
};
// Pass 1: vertices written in this buffer and not tombstoned in it.
for (vid, props) in &l0.vertex_properties {
if l0.vertex_tombstones.contains(vid) {
continue;
}
let id = vid_to_i64(*vid);
let labels = l0.vertex_labels.get(vid);
let (kind, old, old_props_map) = match probe {
Some(p) if p.vertex_pre_existed(*vid) => (
TriggerEventMask::NODE_UPDATE,
p.vertex_old_bytes(*vid),
p.vertex_properties(*vid).and_then(&filtered),
),
Some(_) => (TriggerEventMask::NODE_CREATE, None, None),
None => (TriggerEventMask::NODE_UPDATE, None, None),
};
let new_props_map = filtered(props);
match labels {
Some(ls) if !ls.is_empty() => {
for l in ls {
rows.push(MutationRow {
event_kind: kind,
vid_or_eid: id,
label_or_type: l.clone(),
old_value: old.clone(),
new_properties: new_props_map.clone(),
old_properties: old_props_map.clone(),
});
}
}
// No labels recorded: emit a single row with an empty label.
_ => {
rows.push(MutationRow {
event_kind: kind,
vid_or_eid: id,
label_or_type: String::new(),
old_value: old,
new_properties: new_props_map,
old_properties: old_props_map,
});
}
}
}
// Pass 2: vertex deletions. The pre-image comes from the probe when there is one.
for vid in &l0.vertex_tombstones {
let id = vid_to_i64(*vid);
let labels = l0.vertex_labels.get(vid);
let old = probe.and_then(|p| p.vertex_old_bytes(*vid));
let old_props_map = probe
.and_then(|p| p.vertex_properties(*vid))
.and_then(&filtered);
match labels {
Some(ls) if !ls.is_empty() => {
for l in ls {
rows.push(MutationRow {
event_kind: TriggerEventMask::NODE_DELETE,
vid_or_eid: id,
label_or_type: l.clone(),
old_value: old.clone(),
new_properties: None,
old_properties: old_props_map.clone(),
});
}
}
_ => {
rows.push(MutationRow {
event_kind: TriggerEventMask::NODE_DELETE,
vid_or_eid: id,
label_or_type: String::new(),
old_value: old,
new_properties: None,
old_properties: old_props_map,
});
}
}
}
// Pass 3: edges with endpoints in this buffer and not tombstoned in it.
for eid in l0.edge_endpoints.keys() {
if l0.tombstones.contains_key(eid) {
continue;
}
// An edge with no recorded type gets an empty type string.
let etype = l0.edge_types.get(eid).cloned().unwrap_or_default();
let (kind, old, old_props_map) = match probe {
Some(p) if p.edge_pre_existed(*eid) => (
TriggerEventMask::EDGE_UPDATE,
p.edge_old_bytes(*eid),
p.edge_properties(*eid).and_then(&filtered),
),
Some(_) => (TriggerEventMask::EDGE_CREATE, None, None),
None => (TriggerEventMask::EDGE_UPDATE, None, None),
};
let new_props_map = l0.edge_properties.get(eid).and_then(&filtered);
rows.push(MutationRow {
event_kind: kind,
vid_or_eid: eid_to_i64(*eid),
label_or_type: etype,
old_value: old,
new_properties: new_props_map,
old_properties: old_props_map,
});
}
// Pass 4: edge deletions. When the type name is not in this buffer, a placeholder is
// synthesized from the tombstone's `edge_type` value.
for (eid, ts) in &l0.tombstones {
let etype = l0
.edge_types
.get(eid)
.cloned()
.unwrap_or_else(|| format!("type:{}", ts.edge_type));
let old = probe.and_then(|p| p.edge_old_bytes(*eid));
let old_props_map = probe
.and_then(|p| p.edge_properties(*eid))
.and_then(&filtered);
rows.push(MutationRow {
event_kind: TriggerEventMask::EDGE_DELETE,
vid_or_eid: eid_to_i64(*eid),
label_or_type: etype,
old_value: old,
new_properties: None,
old_properties: old_props_map,
});
}
Self { rows }
}
/// Encodes every row into a single Arrow batch, without any route filtering.
///
/// Returns `None` when there are no rows or the batch cannot be assembled.
#[must_use]
pub fn materialize_all(&self) -> Option<RecordBatch> {
    let total = self.rows.len();
    if total == 0 {
        return None;
    }
    let mut columns = EventRowColumns::with_capacity(total);
    self.rows.iter().for_each(|row| columns.push_row(row));
    columns.into_batch()
}
/// Builds the batch of events that `entry` should receive.
///
/// Rows are first selected by `RouteEntry::matches`, then narrowed by the route's
/// compiled predicate when it has one. Returns `None` if nothing survives, or if
/// batch assembly or predicate evaluation fails.
fn filter_for(&self, entry: &RouteEntry) -> Option<RecordBatch> {
    // The property filter is deliberately only touched here, not applied.
    let _ = &entry.property_filter;

    let mut selected = EventRowColumns::default();
    self.rows
        .iter()
        .filter(|row| entry.matches(row.event_kind, &row.label_or_type))
        .for_each(|row| selected.push_row(row));

    let matched = selected.into_batch()?;
    let surviving = if let Some(predicate) = entry.compiled_predicate.as_ref() {
        apply_predicate(predicate, matched)?
    } else {
        matched
    };
    (surviving.num_rows() != 0).then_some(surviving)
}
}
/// Column-wise accumulator for event rows; one vector per column of
/// `event_row_schema`, all kept the same length by `push_row`.
#[derive(Default)]
struct EventRowColumns {
/// `event_kind` column: discriminant produced by `mask_to_discriminant`.
kinds: Vec<u8>,
/// `vid_or_eid` column.
ids: Vec<i64>,
/// `label` column.
labels: Vec<String>,
/// `property` column; `push_row` always stores an empty string here.
properties: Vec<String>,
/// `old_value` column.
olds: Vec<Option<Vec<u8>>>,
/// `new_value` column; `push_row` always stores `None` here.
news: Vec<Option<Vec<u8>>>,
/// `properties_new` column: encoded post-mutation property map.
props_new: Vec<Option<Vec<u8>>>,
/// `properties_old` column: encoded pre-image property map.
props_old: Vec<Option<Vec<u8>>>,
}
impl EventRowColumns {
fn with_capacity(cap: usize) -> Self {
Self {
kinds: Vec::with_capacity(cap),
ids: Vec::with_capacity(cap),
labels: Vec::with_capacity(cap),
properties: Vec::with_capacity(cap),
olds: Vec::with_capacity(cap),
news: Vec::with_capacity(cap),
props_new: Vec::with_capacity(cap),
props_old: Vec::with_capacity(cap),
}
}
fn push_row(&mut self, row: &MutationRow) {
self.kinds.push(mask_to_discriminant(row.event_kind));
self.ids.push(row.vid_or_eid);
self.labels.push(row.label_or_type.clone());
self.properties.push(String::new());
self.olds.push(row.old_value.clone());
self.news.push(None);
self.props_new.push(
row.new_properties
.as_ref()
.map(|m| cypher_value_codec::encode(&Value::Map(m.clone()))),
);
self.props_old.push(
row.old_properties
.as_ref()
.map(|m| cypher_value_codec::encode(&Value::Map(m.clone()))),
);
}
fn into_batch(self) -> Option<RecordBatch> {
if self.kinds.is_empty() {
return None;
}
let kind_arr: Arc<dyn arrow_array::Array> = Arc::new(UInt8Array::from(self.kinds));
let id_arr: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(self.ids));
let label_arr: Arc<dyn arrow_array::Array> =
Arc::new(arrow_array::StringArray::from(self.labels));
let prop_arr: Arc<dyn arrow_array::Array> =
Arc::new(arrow_array::StringArray::from(self.properties));
let olds_iter: Vec<Option<&[u8]>> = self.olds.iter().map(|o| o.as_deref()).collect();
let news_iter: Vec<Option<&[u8]>> = self.news.iter().map(|o| o.as_deref()).collect();
let old_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(olds_iter));
let new_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(news_iter));
let pnew_iter: Vec<Option<&[u8]>> = self.props_new.iter().map(|o| o.as_deref()).collect();
let pold_iter: Vec<Option<&[u8]>> = self.props_old.iter().map(|o| o.as_deref()).collect();
let pnew_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(pnew_iter));
let pold_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(pold_iter));
RecordBatch::try_new(
event_row_schema(),
vec![
kind_arr, id_arr, label_arr, prop_arr, old_arr, new_arr, pnew_arr, pold_arr,
],
)
.ok()
}
}
fn apply_predicate(predicate: &Arc<dyn PhysicalExpr>, batch: RecordBatch) -> Option<RecordBatch> {
use datafusion::arrow::compute::filter_record_batch;
use datafusion::logical_expr::ColumnarValue;
let value = match predicate.evaluate(&batch) {
Ok(v) => v,
Err(e) => {
warn!(error = %e, "trigger predicate evaluation failed; dropping batch");
return None;
}
};
let array = match value {
ColumnarValue::Array(a) => a,
ColumnarValue::Scalar(s) => match s.to_array_of_size(batch.num_rows()) {
Ok(a) => a,
Err(e) => {
warn!(error = %e, "trigger predicate scalar→array failed");
return None;
}
},
};
let bool_arr = match array.as_any().downcast_ref::<BooleanArray>() {
Some(b) => b,
None => {
warn!("trigger predicate must yield Boolean; dropping batch");
return None;
}
};
filter_record_batch(&batch, bool_arr).ok()
}
/// Converts an event mask into the `event_kind` column value: 0 for an empty mask,
/// otherwise the 1-based position of the lowest set bit.
fn mask_to_discriminant(m: TriggerEventMask) -> u8 {
    match m.0 {
        0 => 0,
        bits => bits.trailing_zeros() as u8 + 1,
    }
}
/// Reinterprets a vertex id as `i64` for the `vid_or_eid` column. The `as` cast wraps:
/// ids above `i64::MAX` become negative.
fn vid_to_i64(vid: uni_common::Vid) -> i64 {
vid.as_u64() as i64
}
/// Reinterprets an edge id as `i64` for the `vid_or_eid` column. The `as` cast wraps:
/// ids above `i64::MAX` become negative.
fn eid_to_i64(eid: uni_common::Eid) -> i64 {
eid.as_u64() as i64
}
/// Re-deferral limit: `DeferralQueue::tick` drops an item once its attempt counter
/// reaches this value.
const DEFER_MAX_ATTEMPTS: u32 = 10;
/// A trigger invocation waiting in the deferral queue.
struct DeferredItem {
/// Plugin whose `on_deferred` is called when the item comes due.
plugin: Arc<dyn TriggerPlugin>,
/// Trigger name, used for logging and to re-resolve the plugin after a restore.
name: String,
/// The event batch the trigger originally received.
batch: MutationBatch,
/// Session id of the originating call.
session_id: String,
/// Transaction id of the originating call.
tx_id: u64,
/// How many times this item has been re-deferred so far.
attempts: u32,
/// Payload supplied with the most recent deferral; passed to `on_deferred`.
payload: String,
}
/// Time-ordered queue of deferred trigger invocations, optionally mirrored to a
/// sidecar file.
#[derive(Default)]
pub struct DeferralQueue {
/// Pending items keyed by the monotonic instant at which they come due.
inner: parking_lot::Mutex<BTreeMap<StdInstant, Vec<DeferredItem>>>,
/// Persistence target; `None` means the queue is in-memory only.
sidecar: parking_lot::Mutex<Option<DeferralSidecar>>,
}
/// Debug output shows only the number of queued items, as field `size`.
impl std::fmt::Debug for DeferralQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let queued = self.pending();
        let mut repr = f.debug_struct("DeferralQueue");
        repr.field("size", &queued);
        repr.finish()
    }
}
impl DeferralQueue {
/// Creates an empty, in-memory queue (no sidecar) wrapped in an `Arc`.
#[must_use]
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}
#[must_use]
pub fn with_persistence(data_path: std::path::PathBuf) -> Arc<Self> {
let queue = Arc::new(Self::default());
*queue.sidecar.lock() = Some(DeferralSidecar::new(data_path));
queue
}
/// Returns the sidecar's file path, or `None` for an in-memory queue.
#[must_use]
pub fn sidecar_path(&self) -> Option<std::path::PathBuf> {
    let slot = self.sidecar.lock();
    match slot.as_ref() {
        Some(sidecar) => Some(sidecar.path().to_path_buf()),
        None => None,
    }
}
pub fn load_from_sidecar(
self: &Arc<Self>,
registry: &Arc<uni_plugin::PluginRegistry>,
) -> usize {
let Some(sidecar) = self.sidecar.lock().clone() else {
return 0;
};
let now_wall = std::time::SystemTime::now();
let now_mono = StdInstant::now();
let rows = match sidecar.read_all() {
Ok(rows) => rows,
Err(e) => {
tracing::debug!(error = %e, "DeferralQueue: sidecar read failed");
return 0;
}
};
let mut restored = 0usize;
for row in rows {
let Some(entry) = registry
.triggers()
.iter()
.find(|t| subscription_name(t.subscription()) == row.name)
.cloned()
else {
tracing::warn!(
trigger = %row.name,
"DeferralQueue: dropping persisted item; trigger no longer registered"
);
continue;
};
let batch = match arrow_ipc_decode(&row.batch_ipc) {
Ok(b) => b,
Err(e) => {
tracing::warn!(error = %e, "DeferralQueue: drop persisted item; IPC decode failed");
continue;
}
};
let fire_at_wall = std::time::UNIX_EPOCH + Duration::from_millis(row.fire_at_epoch_ms);
let mono_delta = fire_at_wall
.duration_since(now_wall)
.unwrap_or(Duration::ZERO);
let fire_at_mono = now_mono + mono_delta;
let item = DeferredItem {
plugin: entry,
name: row.name,
batch: MutationBatch {
events: Arc::new(batch),
},
session_id: row.session_id,
tx_id: row.tx_id,
attempts: row.attempts,
payload: row.payload,
};
self.inner
.lock()
.entry(fire_at_mono)
.or_default()
.push(item);
restored += 1;
}
restored
}
/// Writes the queue's current contents to the sidecar, if one is configured.
///
/// Takes the caller's already-held guard on `inner`, so it reads the queue without
/// locking it again. Failures are logged at debug level and otherwise ignored; an item
/// whose batch cannot be IPC-encoded is left out of the written rows.
fn persist_locked(
&self,
guard: &parking_lot::MutexGuard<'_, BTreeMap<StdInstant, Vec<DeferredItem>>>,
) {
let Some(sidecar) = self.sidecar.lock().clone() else {
return;
};
// Capture both clocks together so monotonic due times can be mapped onto wall time.
let now_wall = std::time::SystemTime::now();
let now_mono = StdInstant::now();
let mut rows: Vec<PersistedDeferral> = Vec::new();
for (fire_at_mono, items) in guard.iter() {
for item in items {
// Items already due are persisted as due "now"; future items keep their remaining delay.
let fire_at_wall = if *fire_at_mono <= now_mono {
now_wall
} else {
now_wall + fire_at_mono.duration_since(now_mono)
};
// A wall time before the Unix epoch is stored as 0.
let fire_at_epoch_ms = fire_at_wall
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let batch_ipc = match arrow_ipc_encode(&item.batch.events) {
Ok(b) => b,
Err(e) => {
tracing::debug!(error = %e, "DeferralQueue: IPC encode failed; skipping row");
continue;
}
};
rows.push(PersistedDeferral {
name: item.name.clone(),
session_id: item.session_id.clone(),
tx_id: item.tx_id,
attempts: item.attempts,
payload: item.payload.clone(),
batch_ipc,
fire_at_epoch_ms,
});
}
}
if let Err(e) = sidecar.write_all(&rows) {
tracing::debug!(error = %e, "DeferralQueue: sidecar write failed");
}
}
/// Adds `item` to the bucket for `fire_at` and persists the queue while still holding
/// the lock.
fn push(&self, item: DeferredItem, fire_at: StdInstant) {
let mut guard = self.inner.lock();
guard.entry(fire_at).or_default().push(item);
self.persist_locked(&guard);
}
/// Removes and returns every item due at or before `now`, in due-time order, then
/// persists what remains in the queue.
fn drain_due(&self, now: StdInstant) -> Vec<DeferredItem> {
    let mut queue = self.inner.lock();
    // Splitting one nanosecond past `now` keeps items keyed exactly at `now` on the due side.
    let cutoff = now + Duration::from_nanos(1);
    let not_yet_due = queue.split_off(&cutoff);
    let ripe = std::mem::replace(&mut *queue, not_yet_due);
    let mut due = Vec::new();
    for mut bucket in ripe.into_values() {
        due.append(&mut bucket);
    }
    self.persist_locked(&queue);
    due
}
/// Returns the total number of queued items across all due times.
#[must_use]
pub fn pending(&self) -> usize {
    let queue = self.inner.lock();
    queue.values().fold(0, |total, bucket| total + bucket.len())
}
/// Runs every item that is due now by calling its plugin's `on_deferred`.
///
/// Each call is guarded by `catch_unwind`; errors and panics are logged and the item
/// is discarded. If the plugin defers again, the item's attempt counter is bumped and
/// it is re-queued with the new delay and payload, unless the counter has reached
/// `DEFER_MAX_ATTEMPTS`, in which case it is dropped with a warning.
pub fn tick(self: &Arc<Self>) {
// `drain_due` releases the queue lock before returning, so re-queuing below can lock again.
let due = self.drain_due(StdInstant::now());
for mut item in due {
let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
item.plugin.on_deferred(
TriggerContext::new(&item.session_id, item.tx_id),
&item.batch,
&item.payload,
)
}));
// Clone the name first: the closure below takes ownership of `item`.
let name = item.name.clone();
handle_fire_outcome(outcome, &name, "deferred trigger", |until| {
item.attempts += 1;
if item.attempts >= DEFER_MAX_ATTEMPTS {
warn!(
trigger = %item.name,
attempts = item.attempts,
"deferred trigger exceeded DEFER_MAX_ATTEMPTS; dropping"
);
return;
}
let fire_at = StdInstant::now() + until.delay.unwrap_or(Duration::ZERO);
item.payload = until.payload;
self.push(item, fire_at);
});
}
}
}
/// Hashes a textual transaction id down to a `u64` with the standard library's
/// `DefaultHasher` in its default state. Equal inputs give equal outputs.
#[must_use]
pub fn tx_id_to_u64(tx_id: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{BuildHasher, BuildHasherDefault};
    // `hash_one` builds a default hasher, feeds it the value and finishes it.
    BuildHasherDefault::<DefaultHasher>::default().hash_one(tx_id)
}
/// On-disk form of a `DeferredItem`. The plugin handle is not stored; it is looked up
/// by `name` when the sidecar is loaded.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct PersistedDeferral {
/// Trigger name used to find the plugin again on load.
name: String,
/// Session id of the originating call.
session_id: String,
/// Transaction id of the originating call.
tx_id: u64,
/// Re-deferral count at the time of persisting.
attempts: u32,
/// Deferral payload.
payload: String,
/// The event batch, encoded as an Arrow IPC stream.
#[serde(with = "serde_bytes")]
batch_ipc: Vec<u8>,
/// Wall-clock due time in milliseconds since the Unix epoch.
fire_at_epoch_ms: u64,
}
/// Thin wrapper around the `VecSidecar` that stores persisted deferrals.
#[derive(Clone, Debug)]
struct DeferralSidecar {
/// Underlying sidecar holding a list of `PersistedDeferral` rows.
sidecar: uni_sidecar::VecSidecar<PersistedDeferral>,
}
impl DeferralSidecar {
/// Creates a sidecar named `deferred_triggers.json` for `data_path`.
fn new(data_path: std::path::PathBuf) -> Self {
Self {
sidecar: uni_sidecar::VecSidecar::new(data_path, "deferred_triggers.json"),
}
}
/// Path reported by the underlying sidecar.
fn path(&self) -> &std::path::Path {
self.sidecar.path()
}
/// Loads all persisted rows; the underlying error is flattened to a `String`.
fn read_all(&self) -> Result<Vec<PersistedDeferral>, String> {
self.sidecar.load().map_err(|e| e.to_string())
}
/// Stores `rows` through the underlying sidecar; the error is flattened to a `String`.
fn write_all(&self, rows: &[PersistedDeferral]) -> Result<(), String> {
self.sidecar.store(rows).map_err(|e| e.to_string())
}
}
/// Encodes a single record batch as an Arrow IPC stream held in memory.
///
/// # Errors
/// Returns a `String` prefixed with `ipc writer`, `ipc write` or `ipc finish`,
/// depending on which step failed.
fn arrow_ipc_encode(batch: &arrow_array::RecordBatch) -> Result<Vec<u8>, String> {
    let batch_schema = batch.schema();
    let mut encoded: Vec<u8> = Vec::with_capacity(2048);
    let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut encoded, batch_schema.as_ref())
        .map_err(|e| format!("ipc writer: {e}"))?;
    writer.write(batch).map_err(|e| format!("ipc write: {e}"))?;
    writer.finish().map_err(|e| format!("ipc finish: {e}"))?;
    // Release the writer's mutable borrow of the buffer before handing the buffer back.
    drop(writer);
    Ok(encoded)
}
/// Decodes an Arrow IPC stream and returns its first record batch.
///
/// The whole stream is read, so a corrupt later batch is still reported as an error.
///
/// # Errors
/// Returns a `String` prefixed with `ipc reader` or `ipc collect` on a read failure,
/// or `"ipc decode: empty stream"` when the stream holds no batch.
fn arrow_ipc_decode(bytes: &[u8]) -> Result<arrow_array::RecordBatch, String> {
    let reader = arrow_ipc::reader::StreamReader::try_new(bytes, None)
        .map_err(|e| format!("ipc reader: {e}"))?;
    let mut first: Option<arrow_array::RecordBatch> = None;
    for item in reader {
        let decoded = item.map_err(|e| format!("ipc collect: {e}"))?;
        if first.is_none() {
            first = Some(decoded);
        }
    }
    match first {
        Some(batch) => Ok(batch),
        None => Err("ipc decode: empty stream".to_owned()),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use uni_plugin::traits::trigger::TriggerEventMask;
// Pins the `event_kind` discriminants for the six node/edge create/update/delete masks.
#[test]
fn mask_discriminants_are_stable() {
assert_eq!(mask_to_discriminant(TriggerEventMask::NODE_CREATE), 1);
assert_eq!(mask_to_discriminant(TriggerEventMask::NODE_UPDATE), 2);
assert_eq!(mask_to_discriminant(TriggerEventMask::NODE_DELETE), 3);
assert_eq!(mask_to_discriminant(TriggerEventMask::EDGE_CREATE), 4);
assert_eq!(mask_to_discriminant(TriggerEventMask::EDGE_UPDATE), 5);
assert_eq!(mask_to_discriminant(TriggerEventMask::EDGE_DELETE), 6);
}
// A router with no routes in any phase reports itself as empty.
#[test]
fn empty_router_is_empty() {
let by_phase = [Vec::new(), Vec::new(), Vec::new(), Vec::new()];
let router = TriggerRouter {
by_phase,
defer_queue: None,
};
assert!(router.is_empty());
}
// The same id always hashes to the same value, and two different ids differ here.
#[test]
fn tx_id_to_u64_is_deterministic() {
let a = tx_id_to_u64("tx-1");
let b = tx_id_to_u64("tx-1");
let c = tx_id_to_u64("tx-2");
assert_eq!(a, b);
assert_ne!(a, c);
}
}