use crate::error::{Error, Result};
use crate::procedures::{ProcRow, ProcedureRegistry};
use crate::reader::GraphReader;
use crate::value::{ParamMap, Value};
use crate::writer::GraphWriter;
use meshdb_core::{Edge, Node, Property};
use meshdb_storage::StorageEngine;
use serde::{Deserialize, Serialize};
use std::cell::Cell;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// Persisted definition of a single trigger.
///
/// Specs are encoded to JSON with serde before being handed to storage and
/// decoded again when the registry is loaded or refreshed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerSpec {
/// Trigger name; `TriggerRegistry::install` uses it as both the storage key
/// and the in-memory map key.
pub name: String,
/// Cypher statement text; it is parsed and planned each time the trigger fires.
pub query: String,
/// Phase the trigger runs in. `install_call` only accepts `"before"`,
/// `"after"`, `"afterAsync"` or `"rollback"`.
pub phase: String,
/// Extra parameters added to the trigger's parameter map alongside the diff
/// entries (see `TriggerDiff::into_param_map`).
pub extra_params: HashMap<String, Property>,
/// Install timestamp; `install_call` fills it from `now_ms()`, i.e.
/// milliseconds since the Unix epoch.
pub installed_at_ms: i64,
/// When `true`, the firing functions skip this trigger. Falls back to
/// `false` when the field is missing from a stored blob.
#[serde(default)]
pub paused: bool,
}
/// In-memory registry of installed triggers, backed by a storage engine.
///
/// Clones share the same map and the same store because both fields are `Arc`s.
#[derive(Clone)]
pub struct TriggerRegistry {
/// Trigger specs keyed by name, guarded by a read/write lock.
inner: Arc<RwLock<HashMap<String, TriggerSpec>>>,
/// Storage engine used to list, persist and delete trigger blobs.
store: Arc<dyn StorageEngine>,
}
impl std::fmt::Debug for TriggerRegistry {
    /// Formats the registry as `TriggerRegistry { triggers: <count> }`.
    ///
    /// Only the number of registered triggers is printed. If the lock cannot
    /// be read (it is poisoned), the count is reported as `0` rather than
    /// panicking inside a `Debug` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let trigger_count = match self.inner.read() {
            Ok(map) => map.len(),
            Err(_) => 0,
        };
        let mut out = f.debug_struct("TriggerRegistry");
        out.field("triggers", &trigger_count);
        out.finish()
    }
}
impl TriggerRegistry {
    /// Builds a registry populated from the trigger blobs persisted in `store`.
    ///
    /// Blobs that fail to decode as a [`TriggerSpec`] are skipped with a
    /// warning instead of failing the whole load.
    ///
    /// # Errors
    ///
    /// Returns `Error::Procedure` when the store cannot list its triggers.
    pub fn from_storage(store: Arc<dyn StorageEngine>) -> Result<Self> {
        let persisted = store
            .list_triggers()
            .map_err(|e| Error::Procedure(format!("loading triggers from storage: {e}")))?;

        let mut loaded: HashMap<String, TriggerSpec> = HashMap::new();
        for (name, blob) in persisted {
            let decoded = serde_json::from_slice::<TriggerSpec>(&blob);
            match decoded {
                Err(e) => {
                    tracing::warn!(trigger = %name, error = %e, "skipping corrupt trigger spec");
                }
                Ok(spec) => {
                    loaded.insert(name, spec);
                }
            }
        }

        Ok(Self {
            inner: Arc::new(RwLock::new(loaded)),
            store,
        })
    }

    /// Persists `spec` and then records it in memory under `spec.name`.
    ///
    /// Returns the spec previously registered under the same name, if any.
    ///
    /// # Errors
    ///
    /// Returns `Error::Procedure` when encoding or persisting fails; the
    /// in-memory map is left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn install(&self, spec: TriggerSpec) -> Result<Option<TriggerSpec>> {
        let encoded = serde_json::to_vec(&spec)
            .map_err(|e| Error::Procedure(format!("encoding trigger spec: {e}")))?;
        self.store
            .put_trigger(&spec.name, &encoded)
            .map_err(|e| Error::Procedure(format!("persisting trigger: {e}")))?;

        let key = spec.name.clone();
        let previous = self
            .inner
            .write()
            .expect("trigger registry lock")
            .insert(key, spec);
        Ok(previous)
    }

    /// Deletes the trigger called `name` from storage, then from memory.
    ///
    /// Returns the removed spec when the in-memory map contained one.
    ///
    /// # Errors
    ///
    /// Returns `Error::Procedure` when the store fails to delete the trigger.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn drop(&self, name: &str) -> Result<Option<TriggerSpec>> {
        self.store
            .delete_trigger(name)
            .map_err(|e| Error::Procedure(format!("removing trigger: {e}")))?;

        let removed = self
            .inner
            .write()
            .expect("trigger registry lock")
            .remove(name);
        Ok(removed)
    }

    /// Returns a snapshot of every registered spec, sorted by trigger name.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn list(&self) -> Vec<TriggerSpec> {
        let mut snapshot: Vec<TriggerSpec> = {
            let map = self.inner.read().expect("trigger registry lock");
            map.values().cloned().collect()
        };
        snapshot.sort_by(|lhs, rhs| Ord::cmp(&lhs.name, &rhs.name));
        snapshot
    }

    /// Reports whether no triggers are registered.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn is_empty(&self) -> bool {
        let map = self.inner.read().expect("trigger registry lock");
        map.is_empty()
    }

    /// Replaces the in-memory map with whatever storage currently holds.
    ///
    /// The replacement map is fully built before the write lock is taken.
    /// Undecodable blobs are skipped with a warning.
    ///
    /// # Errors
    ///
    /// Returns `Error::Procedure` when the store cannot list its triggers; the
    /// current in-memory contents are kept in that case.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn refresh(&self) -> Result<()> {
        let persisted = self
            .store
            .list_triggers()
            .map_err(|e| Error::Procedure(format!("refreshing trigger registry: {e}")))?;

        let mut refreshed: HashMap<String, TriggerSpec> = HashMap::new();
        for (name, blob) in persisted {
            let decoded = serde_json::from_slice::<TriggerSpec>(&blob);
            match decoded {
                Err(e) => {
                    tracing::warn!(
                        trigger = %name,
                        error = %e,
                        "skipping corrupt trigger spec on refresh"
                    );
                }
                Ok(spec) => {
                    refreshed.insert(name, spec);
                }
            }
        }

        *self.inner.write().expect("trigger registry lock") = refreshed;
        Ok(())
    }
}
thread_local! {
    /// Per-thread flag: `true` while a `with_suppression` body is running on
    /// this thread.
    static SUPPRESSING: Cell<bool> = const { Cell::new(false) };
}

/// Runs `body` with the current thread's suppression flag set, then restores
/// the value the flag had before the call.
///
/// The previous value is restored from a drop guard, so it is put back even
/// when `body` panics, and nested calls leave the flag set until the
/// outermost call returns.
pub fn with_suppression<R, F: FnOnce() -> R>(body: F) -> R {
    /// Writes the remembered flag value back when dropped.
    struct Restore {
        previous: bool,
    }

    impl Drop for Restore {
        fn drop(&mut self) {
            let previous = self.previous;
            SUPPRESSING.with(|flag| flag.set(previous));
        }
    }

    let _restore = Restore {
        previous: SUPPRESSING.with(|flag| flag.replace(true)),
    };
    body()
}

/// Returns `true` when the calling thread is inside a `with_suppression` body.
pub fn is_suppressed() -> bool {
    SUPPRESSING.with(Cell::get)
}
/// One property assignment or removal reported to triggers.
#[derive(Debug, Clone)]
pub struct PropertyChange {
/// Property key; exposed to triggers under `key`.
pub key: String,
/// Exposed to triggers under `old`; `None` is rendered as a null value.
pub old: Option<Property>,
/// Exposed to triggers under `new`; `None` is rendered as a null value.
pub new: Option<Property>,
/// The affected element; exposed under `node` or `relationship` depending on
/// which list of the diff the change belongs to.
pub element: Value,
}
/// One label assignment or removal reported to triggers.
#[derive(Debug, Clone)]
pub struct LabelChange {
/// Label name; exposed to triggers under `label`.
pub label: String,
/// The affected element; exposed to triggers under `node`.
pub element: Value,
}
/// The set of graph changes handed to triggers.
///
/// Each field becomes one list-valued parameter in
/// `TriggerDiff::into_param_map`; the parameter name is noted on each field.
#[derive(Debug, Default)]
pub struct TriggerDiff {
/// Exposed as `createdNodes`.
pub created_nodes: Vec<Node>,
/// Exposed as `createdRelationships`.
pub created_relationships: Vec<Edge>,
/// Exposed as `deletedNodes`.
pub deleted_nodes: Vec<Node>,
/// Exposed as `deletedRelationships`.
pub deleted_relationships: Vec<Edge>,
/// Exposed as `assignedNodeProperties`.
pub assigned_node_properties: Vec<PropertyChange>,
/// Exposed as `removedNodeProperties`.
pub removed_node_properties: Vec<PropertyChange>,
/// Exposed as `assignedRelationshipProperties`.
pub assigned_relationship_properties: Vec<PropertyChange>,
/// Exposed as `removedRelationshipProperties`.
pub removed_relationship_properties: Vec<PropertyChange>,
/// Exposed as `assignedLabels`.
pub assigned_labels: Vec<LabelChange>,
/// Exposed as `removedLabels`.
pub removed_labels: Vec<LabelChange>,
}
impl TriggerDiff {
pub fn clone_diff(&self) -> Self {
Self {
created_nodes: self.created_nodes.clone(),
created_relationships: self.created_relationships.clone(),
deleted_nodes: self.deleted_nodes.clone(),
deleted_relationships: self.deleted_relationships.clone(),
assigned_node_properties: self.assigned_node_properties.clone(),
removed_node_properties: self.removed_node_properties.clone(),
assigned_relationship_properties: self.assigned_relationship_properties.clone(),
removed_relationship_properties: self.removed_relationship_properties.clone(),
assigned_labels: self.assigned_labels.clone(),
removed_labels: self.removed_labels.clone(),
}
}
pub fn into_param_map(self, extra: &HashMap<String, Property>) -> ParamMap {
let mut params: ParamMap = HashMap::new();
params.insert(
"createdNodes".into(),
Value::List(self.created_nodes.into_iter().map(Value::Node).collect()),
);
params.insert(
"createdRelationships".into(),
Value::List(
self.created_relationships
.into_iter()
.map(Value::Edge)
.collect(),
),
);
params.insert(
"deletedNodes".into(),
Value::List(self.deleted_nodes.into_iter().map(Value::Node).collect()),
);
params.insert(
"deletedRelationships".into(),
Value::List(
self.deleted_relationships
.into_iter()
.map(Value::Edge)
.collect(),
),
);
params.insert(
"assignedNodeProperties".into(),
property_changes_to_value(self.assigned_node_properties, "node"),
);
params.insert(
"removedNodeProperties".into(),
property_changes_to_value(self.removed_node_properties, "node"),
);
params.insert(
"assignedRelationshipProperties".into(),
property_changes_to_value(self.assigned_relationship_properties, "relationship"),
);
params.insert(
"removedRelationshipProperties".into(),
property_changes_to_value(self.removed_relationship_properties, "relationship"),
);
params.insert(
"assignedLabels".into(),
label_changes_to_value(self.assigned_labels),
);
params.insert(
"removedLabels".into(),
label_changes_to_value(self.removed_labels),
);
for (k, p) in extra {
params.insert(k.clone(), Value::Property(p.clone()));
}
params
}
}
fn property_changes_to_value(changes: Vec<PropertyChange>, element_key: &str) -> Value {
Value::List(
changes
.into_iter()
.map(|c| {
let mut entry: HashMap<String, Value> = HashMap::new();
entry.insert("key".into(), Value::Property(Property::String(c.key)));
entry.insert(
"old".into(),
c.old.map(Value::Property).unwrap_or(Value::Null),
);
entry.insert(
"new".into(),
c.new.map(Value::Property).unwrap_or(Value::Null),
);
entry.insert(element_key.to_string(), c.element);
Value::Map(entry)
})
.collect(),
)
}
fn label_changes_to_value(changes: Vec<LabelChange>) -> Value {
Value::List(
changes
.into_iter()
.map(|c| {
let mut entry: HashMap<String, Value> = HashMap::new();
entry.insert("label".into(), Value::Property(Property::String(c.label)));
entry.insert("node".into(), c.element);
Value::Map(entry)
})
.collect(),
)
}
pub fn fire_before_triggers(
registry: &TriggerRegistry,
diff: TriggerDiff,
reader: &dyn GraphReader,
writer: &dyn GraphWriter,
procedures: &ProcedureRegistry,
) -> Result<()> {
if registry.is_empty() {
return Ok(());
}
if is_suppressed() {
return Ok(());
}
let triggers: Vec<TriggerSpec> = registry
.list()
.into_iter()
.filter(|t| !t.paused && t.phase == "before")
.collect();
if triggers.is_empty() {
return Ok(());
}
let extras: Vec<HashMap<String, Property>> =
triggers.iter().map(|t| t.extra_params.clone()).collect();
let diff_clones: Vec<TriggerDiff> = (0..triggers.len()).map(|_| diff.clone_diff()).collect();
let mut first_error: Option<Error> = None;
with_suppression(|| {
for ((trigger, extra), diff) in triggers
.iter()
.zip(extras.iter())
.zip(diff_clones.into_iter())
{
let params = diff.into_param_map(extra);
let stmt = match meshdb_cypher::parse(&trigger.query) {
Ok(s) => s,
Err(e) => {
first_error = Some(Error::Procedure(format!(
"before-trigger '{}' parse error: {e}",
trigger.name
)));
return;
}
};
let plan = match meshdb_cypher::plan(&stmt) {
Ok(p) => p,
Err(e) => {
first_error = Some(Error::Procedure(format!(
"before-trigger '{}' plan error: {e}",
trigger.name
)));
return;
}
};
if let Err(e) = crate::ops::execute_with_reader_and_procs(
&plan, reader, writer, ¶ms, procedures,
) {
first_error = Some(Error::Procedure(format!(
"before-trigger '{}' aborted commit: {e}",
trigger.name
)));
return;
}
}
});
match first_error {
Some(e) => Err(e),
None => Ok(()),
}
}
pub fn fire_phase_triggers(
registry: &TriggerRegistry,
phase: &str,
diff: TriggerDiff,
reader: &dyn GraphReader,
writer: &dyn GraphWriter,
procedures: &ProcedureRegistry,
) {
if registry.is_empty() {
return;
}
if is_suppressed() {
return;
}
let triggers: Vec<TriggerSpec> = registry
.list()
.into_iter()
.filter(|t| !t.paused && t.phase == phase)
.collect();
if triggers.is_empty() {
return;
}
let extras: Vec<HashMap<String, Property>> =
triggers.iter().map(|t| t.extra_params.clone()).collect();
let diff_clones: Vec<TriggerDiff> = (0..triggers.len()).map(|_| diff.clone_diff()).collect();
with_suppression(|| {
for ((trigger, extra), diff) in triggers
.iter()
.zip(extras.iter())
.zip(diff_clones.into_iter())
{
let params = diff.into_param_map(extra);
let stmt = match meshdb_cypher::parse(&trigger.query) {
Ok(s) => s,
Err(e) => {
tracing::warn!(
trigger = %trigger.name,
phase = phase,
error = %e,
"trigger Cypher failed to parse — skipping"
);
continue;
}
};
let plan = match meshdb_cypher::plan(&stmt) {
Ok(p) => p,
Err(e) => {
tracing::warn!(
trigger = %trigger.name,
phase = phase,
error = %e,
"trigger Cypher failed to plan — skipping"
);
continue;
}
};
if let Err(e) = crate::ops::execute_with_reader_and_procs(
&plan, reader, writer, ¶ms, procedures,
) {
tracing::warn!(
trigger = %trigger.name,
phase = phase,
error = %e,
"trigger body returned an error — skipping"
);
}
}
});
}
/// Runs every active `"after"` trigger against `diff`.
///
/// Delegates to `fire_phase_triggers` with the phase fixed to `"after"`, so it
/// returns nothing and does not report trigger failures to the caller.
pub fn fire_after_triggers(
registry: &TriggerRegistry,
diff: TriggerDiff,
reader: &dyn GraphReader,
writer: &dyn GraphWriter,
procedures: &ProcedureRegistry,
) {
fire_phase_triggers(registry, "after", diff, reader, writer, procedures);
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
pub fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
pub fn install_call(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
if args.len() < 3 {
return Err(Error::Procedure(
"apoc.trigger.install: expects (databaseName, name, statement[, selector[, config]])"
.into(),
));
}
let _db = expect_string(&args[0], "first argument (databaseName)")?;
let name = expect_string(&args[1], "second argument (name)")?;
let query = expect_string(&args[2], "third argument (statement)")?;
let phase = if args.len() > 3 {
match &args[3] {
Value::Property(Property::Map(m)) => match m.get("phase") {
Some(Property::String(s)) => s.clone(),
Some(Property::Null) | None => "after".to_string(),
Some(other) => {
return Err(Error::Procedure(format!(
"apoc.trigger.install: selector.phase must be a string, got {other:?}"
)));
}
},
Value::Null | Value::Property(Property::Null) => "after".to_string(),
other => {
return Err(Error::Procedure(format!(
"apoc.trigger.install: selector must be a map or null, got {other:?}"
)));
}
}
} else {
"after".to_string()
};
if !matches!(
phase.as_str(),
"before" | "after" | "afterAsync" | "rollback"
) {
return Err(Error::Procedure(format!(
"apoc.trigger.install: phase must be one of \
'before' / 'after' / 'afterAsync' / 'rollback', got {phase:?}"
)));
}
let extra_params: HashMap<String, Property> = if args.len() > 4 {
match &args[4] {
Value::Property(Property::Map(m)) => match m.get("params") {
Some(Property::Map(p)) => p.clone(),
Some(Property::Null) | None => HashMap::new(),
Some(other) => {
return Err(Error::Procedure(format!(
"apoc.trigger.install: config.params must be a map, got {other:?}"
)));
}
},
Value::Null | Value::Property(Property::Null) => HashMap::new(),
other => {
return Err(Error::Procedure(format!(
"apoc.trigger.install: config must be a map or null, got {other:?}"
)));
}
}
} else {
HashMap::new()
};
let stmt = meshdb_cypher::parse(&query)
.map_err(|e| Error::Procedure(format!("apoc.trigger.install: Cypher parse error: {e}")))?;
meshdb_cypher::plan(&stmt)
.map_err(|e| Error::Procedure(format!("apoc.trigger.install: Cypher plan error: {e}")))?;
let spec = TriggerSpec {
name: name.clone(),
query: query.clone(),
phase,
extra_params,
installed_at_ms: now_ms(),
paused: false,
};
let blob = serde_json::to_vec(&spec)
.map_err(|e| Error::Procedure(format!("apoc.trigger.install: encoding spec: {e}")))?;
writer.install_trigger(&name, &blob)?;
let mut row: ProcRow = HashMap::new();
row.insert("name".into(), Value::Property(Property::String(name)));
row.insert("query".into(), Value::Property(Property::String(query)));
row.insert("installed".into(), Value::Property(Property::Bool(true)));
row.insert("previous".into(), Value::Null);
Ok(vec![row])
}
/// Implements `apoc.trigger.start(databaseName, name)`.
///
/// Clears the `paused` flag of the named trigger by delegating to
/// `set_paused` with `paused = false`; see that function for the returned row
/// and the error cases.
pub fn start_call(
registry: &TriggerRegistry,
writer: &dyn GraphWriter,
args: &[Value],
) -> Result<Vec<ProcRow>> {
set_paused(registry, writer, args, false, "apoc.trigger.start")
}
/// Implements `apoc.trigger.stop(databaseName, name)`.
///
/// Sets the `paused` flag of the named trigger by delegating to `set_paused`
/// with `paused = true`; see that function for the returned row and the error
/// cases.
pub fn stop_call(
registry: &TriggerRegistry,
writer: &dyn GraphWriter,
args: &[Value],
) -> Result<Vec<ProcRow>> {
set_paused(registry, writer, args, true, "apoc.trigger.stop")
}
fn set_paused(
registry: &TriggerRegistry,
writer: &dyn GraphWriter,
args: &[Value],
paused: bool,
proc_name: &'static str,
) -> Result<Vec<ProcRow>> {
if args.len() < 2 {
return Err(Error::Procedure(format!(
"{proc_name}: expects (databaseName, name)"
)));
}
let _db = expect_string(&args[0], "first argument (databaseName)")?;
let name = expect_string(&args[1], "second argument (name)")?;
let mut spec = registry
.list()
.into_iter()
.find(|s| s.name == name)
.ok_or_else(|| Error::Procedure(format!("{proc_name}: no trigger named '{name}'")))?;
spec.paused = paused;
let blob = serde_json::to_vec(&spec)
.map_err(|e| Error::Procedure(format!("{proc_name}: encoding spec: {e}")))?;
writer.install_trigger(&name, &blob)?;
let mut row: ProcRow = HashMap::new();
row.insert("name".into(), Value::Property(Property::String(name)));
row.insert("paused".into(), Value::Property(Property::Bool(paused)));
Ok(vec![row])
}
/// Implements `apoc.trigger.drop(databaseName, name)`.
///
/// Asks `writer` to drop the named trigger and returns a single row with
/// `name` and `removed`. `removed` is always `true` once the writer call
/// succeeds; this function does not check whether the trigger existed.
///
/// # Errors
///
/// Returns `Error::Procedure` for too few or non-string arguments. Errors
/// from `writer.drop_trigger` are propagated unchanged.
pub fn drop_call(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
    let (db_arg, name_arg) = match args {
        [db, name, ..] => (db, name),
        _ => {
            return Err(Error::Procedure(
                "apoc.trigger.drop: expects (databaseName, name)".into(),
            ));
        }
    };
    // The database name is validated for type but otherwise unused.
    let _db = expect_string(db_arg, "first argument (databaseName)")?;
    let name = expect_string(name_arg, "second argument (name)")?;

    writer.drop_trigger(&name)?;

    let mut row: ProcRow = HashMap::new();
    row.insert("name".into(), Value::Property(Property::String(name)));
    row.insert("removed".into(), Value::Property(Property::Bool(true)));
    Ok(vec![row])
}
pub fn list_call(registry: &TriggerRegistry) -> Result<Vec<ProcRow>> {
Ok(registry
.list()
.into_iter()
.map(|spec| {
let mut row: ProcRow = HashMap::new();
row.insert("name".into(), Value::Property(Property::String(spec.name)));
row.insert(
"query".into(),
Value::Property(Property::String(spec.query)),
);
row.insert(
"phase".into(),
Value::Property(Property::String(spec.phase)),
);
row.insert(
"installed_at".into(),
Value::Property(Property::Int64(spec.installed_at_ms)),
);
row.insert(
"paused".into(),
Value::Property(Property::Bool(spec.paused)),
);
row
})
.collect())
}
fn expect_string(v: &Value, position: &str) -> Result<String> {
match v {
Value::Property(Property::String(s)) => Ok(s.clone()),
Value::Null | Value::Property(Property::Null) => Err(Error::Procedure(format!(
"apoc.trigger.*: {position} must be a string, got null"
))),
other => Err(Error::Procedure(format!(
"apoc.trigger.*: {position} must be a string, got {other:?}"
))),
}
}