1use crate::error::{Error, Result};
52use crate::procedures::{ProcRow, ProcedureRegistry};
53use crate::reader::GraphReader;
54use crate::value::{ParamMap, Value};
55use crate::writer::GraphWriter;
56use meshdb_core::{Edge, Node, Property};
57use meshdb_storage::StorageEngine;
58use serde::{Deserialize, Serialize};
59use std::cell::Cell;
60use std::collections::HashMap;
61use std::sync::{Arc, RwLock};
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TriggerSpec {
69 pub name: String,
70 pub query: String,
71 pub phase: String,
74 pub extra_params: HashMap<String, Property>,
78 pub installed_at_ms: i64,
81 #[serde(default)]
87 pub paused: bool,
88}
89
90#[derive(Clone)]
98pub struct TriggerRegistry {
99 inner: Arc<RwLock<HashMap<String, TriggerSpec>>>,
100 store: Arc<dyn StorageEngine>,
101}
102
103impl std::fmt::Debug for TriggerRegistry {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 let count = self.inner.read().map(|g| g.len()).unwrap_or(0);
106 f.debug_struct("TriggerRegistry")
107 .field("triggers", &count)
108 .finish()
109 }
110}
111
112impl TriggerRegistry {
113 pub fn from_storage(store: Arc<dyn StorageEngine>) -> Result<Self> {
117 let registry = Self {
118 inner: Arc::new(RwLock::new(HashMap::new())),
119 store,
120 };
121 let entries = registry
122 .store
123 .list_triggers()
124 .map_err(|e| Error::Procedure(format!("loading triggers from storage: {e}")))?;
125 let mut guard = registry.inner.write().expect("trigger registry lock");
126 for (name, blob) in entries {
127 match serde_json::from_slice::<TriggerSpec>(&blob) {
128 Ok(spec) => {
129 guard.insert(name, spec);
130 }
131 Err(e) => {
132 tracing::warn!(trigger = %name, error = %e, "skipping corrupt trigger spec");
133 }
134 }
135 }
136 drop(guard);
137 Ok(registry)
138 }
139
140 pub fn install(&self, spec: TriggerSpec) -> Result<Option<TriggerSpec>> {
144 let blob = serde_json::to_vec(&spec)
145 .map_err(|e| Error::Procedure(format!("encoding trigger spec: {e}")))?;
146 self.store
147 .put_trigger(&spec.name, &blob)
148 .map_err(|e| Error::Procedure(format!("persisting trigger: {e}")))?;
149 let mut guard = self.inner.write().expect("trigger registry lock");
150 Ok(guard.insert(spec.name.clone(), spec))
151 }
152
153 pub fn drop(&self, name: &str) -> Result<Option<TriggerSpec>> {
156 self.store
157 .delete_trigger(name)
158 .map_err(|e| Error::Procedure(format!("removing trigger: {e}")))?;
159 let mut guard = self.inner.write().expect("trigger registry lock");
160 Ok(guard.remove(name))
161 }
162
163 pub fn list(&self) -> Vec<TriggerSpec> {
166 let guard = self.inner.read().expect("trigger registry lock");
167 let mut specs: Vec<TriggerSpec> = guard.values().cloned().collect();
168 specs.sort_by(|a, b| a.name.cmp(&b.name));
169 specs
170 }
171
172 pub fn is_empty(&self) -> bool {
176 self.inner.read().expect("trigger registry lock").is_empty()
177 }
178
179 pub fn refresh(&self) -> Result<()> {
187 let entries = self
188 .store
189 .list_triggers()
190 .map_err(|e| Error::Procedure(format!("refreshing trigger registry: {e}")))?;
191 let mut next: HashMap<String, TriggerSpec> = HashMap::new();
192 for (name, blob) in entries {
193 match serde_json::from_slice::<TriggerSpec>(&blob) {
194 Ok(spec) => {
195 next.insert(name, spec);
196 }
197 Err(e) => {
198 tracing::warn!(
199 trigger = %name,
200 error = %e,
201 "skipping corrupt trigger spec on refresh"
202 );
203 }
204 }
205 }
206 let mut guard = self.inner.write().expect("trigger registry lock");
207 *guard = next;
208 Ok(())
209 }
210}
211
212thread_local! {
213 static SUPPRESSING: Cell<bool> = const { Cell::new(false) };
221}
222
223pub fn with_suppression<R, F: FnOnce() -> R>(body: F) -> R {
228 struct Guard(bool);
229 impl Drop for Guard {
230 fn drop(&mut self) {
231 SUPPRESSING.with(|s| s.set(self.0));
232 }
233 }
234 let prev = SUPPRESSING.with(|s| s.replace(true));
235 let _guard = Guard(prev);
236 body()
237}
238
239pub fn is_suppressed() -> bool {
242 SUPPRESSING.with(|s| s.get())
243}
244
245#[derive(Debug, Clone)]
250pub struct PropertyChange {
251 pub key: String,
252 pub old: Option<Property>,
253 pub new: Option<Property>,
254 pub element: Value,
255}
256
257#[derive(Debug, Clone)]
260pub struct LabelChange {
261 pub label: String,
262 pub element: Value,
263}
264
265#[derive(Debug, Default)]
271pub struct TriggerDiff {
272 pub created_nodes: Vec<Node>,
273 pub created_relationships: Vec<Edge>,
274 pub deleted_nodes: Vec<Node>,
275 pub deleted_relationships: Vec<Edge>,
276 pub assigned_node_properties: Vec<PropertyChange>,
277 pub removed_node_properties: Vec<PropertyChange>,
278 pub assigned_relationship_properties: Vec<PropertyChange>,
279 pub removed_relationship_properties: Vec<PropertyChange>,
280 pub assigned_labels: Vec<LabelChange>,
281 pub removed_labels: Vec<LabelChange>,
282}
283
284impl TriggerDiff {
285 pub fn clone_diff(&self) -> Self {
290 Self {
291 created_nodes: self.created_nodes.clone(),
292 created_relationships: self.created_relationships.clone(),
293 deleted_nodes: self.deleted_nodes.clone(),
294 deleted_relationships: self.deleted_relationships.clone(),
295 assigned_node_properties: self.assigned_node_properties.clone(),
296 removed_node_properties: self.removed_node_properties.clone(),
297 assigned_relationship_properties: self.assigned_relationship_properties.clone(),
298 removed_relationship_properties: self.removed_relationship_properties.clone(),
299 assigned_labels: self.assigned_labels.clone(),
300 removed_labels: self.removed_labels.clone(),
301 }
302 }
303
304 pub fn into_param_map(self, extra: &HashMap<String, Property>) -> ParamMap {
316 let mut params: ParamMap = HashMap::new();
317 params.insert(
318 "createdNodes".into(),
319 Value::List(self.created_nodes.into_iter().map(Value::Node).collect()),
320 );
321 params.insert(
322 "createdRelationships".into(),
323 Value::List(
324 self.created_relationships
325 .into_iter()
326 .map(Value::Edge)
327 .collect(),
328 ),
329 );
330 params.insert(
331 "deletedNodes".into(),
332 Value::List(self.deleted_nodes.into_iter().map(Value::Node).collect()),
333 );
334 params.insert(
335 "deletedRelationships".into(),
336 Value::List(
337 self.deleted_relationships
338 .into_iter()
339 .map(Value::Edge)
340 .collect(),
341 ),
342 );
343 params.insert(
344 "assignedNodeProperties".into(),
345 property_changes_to_value(self.assigned_node_properties, "node"),
346 );
347 params.insert(
348 "removedNodeProperties".into(),
349 property_changes_to_value(self.removed_node_properties, "node"),
350 );
351 params.insert(
352 "assignedRelationshipProperties".into(),
353 property_changes_to_value(self.assigned_relationship_properties, "relationship"),
354 );
355 params.insert(
356 "removedRelationshipProperties".into(),
357 property_changes_to_value(self.removed_relationship_properties, "relationship"),
358 );
359 params.insert(
360 "assignedLabels".into(),
361 label_changes_to_value(self.assigned_labels),
362 );
363 params.insert(
364 "removedLabels".into(),
365 label_changes_to_value(self.removed_labels),
366 );
367 for (k, p) in extra {
368 params.insert(k.clone(), Value::Property(p.clone()));
369 }
370 params
371 }
372}
373
374fn property_changes_to_value(changes: Vec<PropertyChange>, element_key: &str) -> Value {
379 Value::List(
380 changes
381 .into_iter()
382 .map(|c| {
383 let mut entry: HashMap<String, Value> = HashMap::new();
384 entry.insert("key".into(), Value::Property(Property::String(c.key)));
385 entry.insert(
386 "old".into(),
387 c.old.map(Value::Property).unwrap_or(Value::Null),
388 );
389 entry.insert(
390 "new".into(),
391 c.new.map(Value::Property).unwrap_or(Value::Null),
392 );
393 entry.insert(element_key.to_string(), c.element);
394 Value::Map(entry)
395 })
396 .collect(),
397 )
398}
399
400fn label_changes_to_value(changes: Vec<LabelChange>) -> Value {
405 Value::List(
406 changes
407 .into_iter()
408 .map(|c| {
409 let mut entry: HashMap<String, Value> = HashMap::new();
410 entry.insert("label".into(), Value::Property(Property::String(c.label)));
411 entry.insert("node".into(), c.element);
412 Value::Map(entry)
413 })
414 .collect(),
415 )
416}
417
418pub fn fire_before_triggers(
429 registry: &TriggerRegistry,
430 diff: TriggerDiff,
431 reader: &dyn GraphReader,
432 writer: &dyn GraphWriter,
433 procedures: &ProcedureRegistry,
434) -> Result<()> {
435 if registry.is_empty() {
436 return Ok(());
437 }
438 if is_suppressed() {
439 return Ok(());
440 }
441 let triggers: Vec<TriggerSpec> = registry
442 .list()
443 .into_iter()
444 .filter(|t| !t.paused && t.phase == "before")
445 .collect();
446 if triggers.is_empty() {
447 return Ok(());
448 }
449 let extras: Vec<HashMap<String, Property>> =
450 triggers.iter().map(|t| t.extra_params.clone()).collect();
451 let diff_clones: Vec<TriggerDiff> = (0..triggers.len()).map(|_| diff.clone_diff()).collect();
452 let mut first_error: Option<Error> = None;
453 with_suppression(|| {
454 for ((trigger, extra), diff) in triggers
455 .iter()
456 .zip(extras.iter())
457 .zip(diff_clones.into_iter())
458 {
459 let params = diff.into_param_map(extra);
460 let stmt = match meshdb_cypher::parse(&trigger.query) {
461 Ok(s) => s,
462 Err(e) => {
463 first_error = Some(Error::Procedure(format!(
464 "before-trigger '{}' parse error: {e}",
465 trigger.name
466 )));
467 return;
468 }
469 };
470 let plan = match meshdb_cypher::plan(&stmt) {
471 Ok(p) => p,
472 Err(e) => {
473 first_error = Some(Error::Procedure(format!(
474 "before-trigger '{}' plan error: {e}",
475 trigger.name
476 )));
477 return;
478 }
479 };
480 if let Err(e) = crate::ops::execute_with_reader_and_procs(
481 &plan, reader, writer, ¶ms, procedures,
482 ) {
483 first_error = Some(Error::Procedure(format!(
484 "before-trigger '{}' aborted commit: {e}",
485 trigger.name
486 )));
487 return;
488 }
489 }
490 });
491 match first_error {
492 Some(e) => Err(e),
493 None => Ok(()),
494 }
495}
496
497pub fn fire_phase_triggers(
505 registry: &TriggerRegistry,
506 phase: &str,
507 diff: TriggerDiff,
508 reader: &dyn GraphReader,
509 writer: &dyn GraphWriter,
510 procedures: &ProcedureRegistry,
511) {
512 if registry.is_empty() {
513 return;
514 }
515 if is_suppressed() {
516 return;
517 }
518 let triggers: Vec<TriggerSpec> = registry
519 .list()
520 .into_iter()
521 .filter(|t| !t.paused && t.phase == phase)
522 .collect();
523 if triggers.is_empty() {
524 return;
525 }
526 let extras: Vec<HashMap<String, Property>> =
527 triggers.iter().map(|t| t.extra_params.clone()).collect();
528 let diff_clones: Vec<TriggerDiff> = (0..triggers.len()).map(|_| diff.clone_diff()).collect();
529 with_suppression(|| {
530 for ((trigger, extra), diff) in triggers
531 .iter()
532 .zip(extras.iter())
533 .zip(diff_clones.into_iter())
534 {
535 let params = diff.into_param_map(extra);
536 let stmt = match meshdb_cypher::parse(&trigger.query) {
537 Ok(s) => s,
538 Err(e) => {
539 tracing::warn!(
540 trigger = %trigger.name,
541 phase = phase,
542 error = %e,
543 "trigger Cypher failed to parse — skipping"
544 );
545 continue;
546 }
547 };
548 let plan = match meshdb_cypher::plan(&stmt) {
549 Ok(p) => p,
550 Err(e) => {
551 tracing::warn!(
552 trigger = %trigger.name,
553 phase = phase,
554 error = %e,
555 "trigger Cypher failed to plan — skipping"
556 );
557 continue;
558 }
559 };
560 if let Err(e) = crate::ops::execute_with_reader_and_procs(
561 &plan, reader, writer, ¶ms, procedures,
562 ) {
563 tracing::warn!(
564 trigger = %trigger.name,
565 phase = phase,
566 error = %e,
567 "trigger body returned an error — skipping"
568 );
569 }
570 }
571 });
572}
573
574pub fn fire_after_triggers(
578 registry: &TriggerRegistry,
579 diff: TriggerDiff,
580 reader: &dyn GraphReader,
581 writer: &dyn GraphWriter,
582 procedures: &ProcedureRegistry,
583) {
584 fire_phase_triggers(registry, "after", diff, reader, writer, procedures);
585}
586
587pub fn now_ms() -> i64 {
590 std::time::SystemTime::now()
591 .duration_since(std::time::UNIX_EPOCH)
592 .map(|d| d.as_millis() as i64)
593 .unwrap_or(0)
594}
595
596pub fn install_call(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
610 if args.len() < 3 {
611 return Err(Error::Procedure(
612 "apoc.trigger.install: expects (databaseName, name, statement[, selector[, config]])"
613 .into(),
614 ));
615 }
616 let _db = expect_string(&args[0], "first argument (databaseName)")?;
617 let name = expect_string(&args[1], "second argument (name)")?;
618 let query = expect_string(&args[2], "third argument (statement)")?;
619 let phase = if args.len() > 3 {
620 match &args[3] {
621 Value::Property(Property::Map(m)) => match m.get("phase") {
622 Some(Property::String(s)) => s.clone(),
623 Some(Property::Null) | None => "after".to_string(),
624 Some(other) => {
625 return Err(Error::Procedure(format!(
626 "apoc.trigger.install: selector.phase must be a string, got {other:?}"
627 )));
628 }
629 },
630 Value::Null | Value::Property(Property::Null) => "after".to_string(),
631 other => {
632 return Err(Error::Procedure(format!(
633 "apoc.trigger.install: selector must be a map or null, got {other:?}"
634 )));
635 }
636 }
637 } else {
638 "after".to_string()
639 };
640 if !matches!(
641 phase.as_str(),
642 "before" | "after" | "afterAsync" | "rollback"
643 ) {
644 return Err(Error::Procedure(format!(
645 "apoc.trigger.install: phase must be one of \
646 'before' / 'after' / 'afterAsync' / 'rollback', got {phase:?}"
647 )));
648 }
649 let extra_params: HashMap<String, Property> = if args.len() > 4 {
650 match &args[4] {
651 Value::Property(Property::Map(m)) => match m.get("params") {
652 Some(Property::Map(p)) => p.clone(),
653 Some(Property::Null) | None => HashMap::new(),
654 Some(other) => {
655 return Err(Error::Procedure(format!(
656 "apoc.trigger.install: config.params must be a map, got {other:?}"
657 )));
658 }
659 },
660 Value::Null | Value::Property(Property::Null) => HashMap::new(),
661 other => {
662 return Err(Error::Procedure(format!(
663 "apoc.trigger.install: config must be a map or null, got {other:?}"
664 )));
665 }
666 }
667 } else {
668 HashMap::new()
669 };
670 let stmt = meshdb_cypher::parse(&query)
674 .map_err(|e| Error::Procedure(format!("apoc.trigger.install: Cypher parse error: {e}")))?;
675 meshdb_cypher::plan(&stmt)
676 .map_err(|e| Error::Procedure(format!("apoc.trigger.install: Cypher plan error: {e}")))?;
677 let spec = TriggerSpec {
678 name: name.clone(),
679 query: query.clone(),
680 phase,
681 extra_params,
682 installed_at_ms: now_ms(),
683 paused: false,
684 };
685 let blob = serde_json::to_vec(&spec)
686 .map_err(|e| Error::Procedure(format!("apoc.trigger.install: encoding spec: {e}")))?;
687 writer.install_trigger(&name, &blob)?;
688 let mut row: ProcRow = HashMap::new();
689 row.insert("name".into(), Value::Property(Property::String(name)));
690 row.insert("query".into(), Value::Property(Property::String(query)));
691 row.insert("installed".into(), Value::Property(Property::Bool(true)));
692 row.insert("previous".into(), Value::Null);
698 Ok(vec![row])
699}
700
701pub fn start_call(
707 registry: &TriggerRegistry,
708 writer: &dyn GraphWriter,
709 args: &[Value],
710) -> Result<Vec<ProcRow>> {
711 set_paused(registry, writer, args, false, "apoc.trigger.start")
712}
713
714pub fn stop_call(
717 registry: &TriggerRegistry,
718 writer: &dyn GraphWriter,
719 args: &[Value],
720) -> Result<Vec<ProcRow>> {
721 set_paused(registry, writer, args, true, "apoc.trigger.stop")
722}
723
724fn set_paused(
726 registry: &TriggerRegistry,
727 writer: &dyn GraphWriter,
728 args: &[Value],
729 paused: bool,
730 proc_name: &'static str,
731) -> Result<Vec<ProcRow>> {
732 if args.len() < 2 {
733 return Err(Error::Procedure(format!(
734 "{proc_name}: expects (databaseName, name)"
735 )));
736 }
737 let _db = expect_string(&args[0], "first argument (databaseName)")?;
738 let name = expect_string(&args[1], "second argument (name)")?;
739 let mut spec = registry
740 .list()
741 .into_iter()
742 .find(|s| s.name == name)
743 .ok_or_else(|| Error::Procedure(format!("{proc_name}: no trigger named '{name}'")))?;
744 spec.paused = paused;
745 let blob = serde_json::to_vec(&spec)
746 .map_err(|e| Error::Procedure(format!("{proc_name}: encoding spec: {e}")))?;
747 writer.install_trigger(&name, &blob)?;
748 let mut row: ProcRow = HashMap::new();
749 row.insert("name".into(), Value::Property(Property::String(name)));
750 row.insert("paused".into(), Value::Property(Property::Bool(paused)));
751 Ok(vec![row])
752}
753
754pub fn drop_call(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
761 if args.len() < 2 {
762 return Err(Error::Procedure(
763 "apoc.trigger.drop: expects (databaseName, name)".into(),
764 ));
765 }
766 let _db = expect_string(&args[0], "first argument (databaseName)")?;
767 let name = expect_string(&args[1], "second argument (name)")?;
768 writer.drop_trigger(&name)?;
769 let mut row: ProcRow = HashMap::new();
770 row.insert("name".into(), Value::Property(Property::String(name)));
771 row.insert("removed".into(), Value::Property(Property::Bool(true)));
772 Ok(vec![row])
773}
774
775pub fn list_call(registry: &TriggerRegistry) -> Result<Vec<ProcRow>> {
777 Ok(registry
778 .list()
779 .into_iter()
780 .map(|spec| {
781 let mut row: ProcRow = HashMap::new();
782 row.insert("name".into(), Value::Property(Property::String(spec.name)));
783 row.insert(
784 "query".into(),
785 Value::Property(Property::String(spec.query)),
786 );
787 row.insert(
788 "phase".into(),
789 Value::Property(Property::String(spec.phase)),
790 );
791 row.insert(
792 "installed_at".into(),
793 Value::Property(Property::Int64(spec.installed_at_ms)),
794 );
795 row.insert(
796 "paused".into(),
797 Value::Property(Property::Bool(spec.paused)),
798 );
799 row
800 })
801 .collect())
802}
803
804fn expect_string(v: &Value, position: &str) -> Result<String> {
805 match v {
806 Value::Property(Property::String(s)) => Ok(s.clone()),
807 Value::Null | Value::Property(Property::Null) => Err(Error::Procedure(format!(
808 "apoc.trigger.*: {position} must be a string, got null"
809 ))),
810 other => Err(Error::Procedure(format!(
811 "apoc.trigger.*: {position} must be a string, got {other:?}"
812 ))),
813 }
814}