Skip to main content

meshdb_executor/
apoc_trigger.rs

1//! Implementation of the `apoc.trigger.*` procedure family.
2//!
3//! Triggers are Cypher snippets that fire on commit boundaries.
4//! Each trigger receives the transaction's diff as a set of
5//! well-known parameters that mirror Neo4j APOC's surface:
6//!
7//! * `$createdNodes` / `$createdRelationships` — newly-created
8//!   elements (id wasn't in the store pre-tx)
9//! * `$deletedNodes` / `$deletedRelationships` — deleted
10//!   elements, captured pre-tx so the trigger sees their final
11//!   shape before deletion
12//! * `$assignedNodeProperties` /
13//!   `$assignedRelationshipProperties` — property changes on
14//!   existing elements, list of `{key, old, new, node|relationship}`
15//! * `$removedNodeProperties` /
16//!   `$removedRelationshipProperties` — property removals,
17//!   list of `{key, old, node|relationship}`
18//! * `$assignedLabels` / `$removedLabels` — label changes on
19//!   existing nodes, list of `{label, node}`
20//!
21//! All four phases work — `before` (pre-commit; errors abort
22//! the tx; trigger writes commit atomically with the user's
23//! writes), `after` (post-commit, sync; recurses through the
24//! cluster commit path so trigger writes replicate),
25//! `afterAsync` (post-commit, spawned in a background tokio
26//! task so it doesn't block the response), and `rollback`
27//! (fires when the commit fails, including before-trigger
28//! aborts).
29//!
30//! # Procedure surface
31//!
32//! Five procedures: `install` / `drop` / `list` / `start` /
33//! `stop`. Install/drop emit `GraphCommand::InstallTrigger` /
34//! `DropTrigger` so the cluster commit machinery replicates
35//! the registry across every peer (Raft log + routing-mode
36//! DDL fan-out). Firing is leader/coordinator-only — exactly
37//! one peer per logical commit fires.
38//!
39//! Recursion safety has two layers: the thread-local
40//! `SUPPRESSING` flag catches the sync-call path, and the
41//! `from_trigger: bool` parameter on `commit_buffered_commands`
42//! catches the async-recursive cluster commit path. Either
43//! alone is sufficient; both together close every observed
44//! self-firing scenario.
45//!
46//! # Multi-db
47//!
48//! `databaseName` is accepted on every procedure but ignored —
49//! Mesh is single-db today. Callers can pass any string.
50
51use crate::error::{Error, Result};
52use crate::procedures::{ProcRow, ProcedureRegistry};
53use crate::reader::GraphReader;
54use crate::value::{ParamMap, Value};
55use crate::writer::GraphWriter;
56use meshdb_core::{Edge, Node, Property};
57use meshdb_storage::StorageEngine;
58use serde::{Deserialize, Serialize};
59use std::cell::Cell;
60use std::collections::HashMap;
61use std::sync::{Arc, RwLock};
62
63/// Persisted trigger spec. Roundtrips through `serde_json` for
64/// storage in the `trigger_meta` column family. The `name` field
65/// duplicates the storage key — kept on the value too so list /
66/// show output is self-describing.
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct TriggerSpec {
69    pub name: String,
70    pub query: String,
71    /// `phase`. One of `"before"` / `"after"` / `"afterAsync"` /
72    /// `"rollback"`.
73    pub phase: String,
74    /// Pre-evaluated extra params from the install-time `config`
75    /// map's `params` entry. Captured at install time to keep
76    /// the firing path purely local.
77    pub extra_params: HashMap<String, Property>,
78    /// Wall-clock millis since UNIX epoch when the trigger was
79    /// registered.
80    pub installed_at_ms: i64,
81    /// `true` when paused via `apoc.trigger.stop`. Paused
82    /// triggers stay in the registry (and replicate) but are
83    /// skipped on every commit until `apoc.trigger.start` flips
84    /// the flag back. Defaults to `false` for backward
85    /// compatibility with pre-pause stored specs.
86    #[serde(default)]
87    pub paused: bool,
88}
89
90/// In-memory cache + persistent backing of the trigger
91/// registry. Mutating operations (install / drop) write
92/// through to storage; reads served from the cache. The
93/// `Arc<dyn StorageEngine>` is held internally so the
94/// procedure call surface only needs the registry handle —
95/// not a separate store reference — to mutate the persistent
96/// state.
97#[derive(Clone)]
98pub struct TriggerRegistry {
99    inner: Arc<RwLock<HashMap<String, TriggerSpec>>>,
100    store: Arc<dyn StorageEngine>,
101}
102
103impl std::fmt::Debug for TriggerRegistry {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        let count = self.inner.read().map(|g| g.len()).unwrap_or(0);
106        f.debug_struct("TriggerRegistry")
107            .field("triggers", &count)
108            .finish()
109    }
110}
111
112impl TriggerRegistry {
113    /// Hydrate from the storage backend on startup. Skips any
114    /// blob that fails to deserialise — a corrupted entry should
115    /// surface a warning but not refuse to start the server.
116    pub fn from_storage(store: Arc<dyn StorageEngine>) -> Result<Self> {
117        let registry = Self {
118            inner: Arc::new(RwLock::new(HashMap::new())),
119            store,
120        };
121        let entries = registry
122            .store
123            .list_triggers()
124            .map_err(|e| Error::Procedure(format!("loading triggers from storage: {e}")))?;
125        let mut guard = registry.inner.write().expect("trigger registry lock");
126        for (name, blob) in entries {
127            match serde_json::from_slice::<TriggerSpec>(&blob) {
128                Ok(spec) => {
129                    guard.insert(name, spec);
130                }
131                Err(e) => {
132                    tracing::warn!(trigger = %name, error = %e, "skipping corrupt trigger spec");
133                }
134            }
135        }
136        drop(guard);
137        Ok(registry)
138    }
139
140    /// Persist a new trigger and add it to the cache. Returns
141    /// the previous spec under the same name, if any (so the
142    /// caller can report install-replaced-existing).
143    pub fn install(&self, spec: TriggerSpec) -> Result<Option<TriggerSpec>> {
144        let blob = serde_json::to_vec(&spec)
145            .map_err(|e| Error::Procedure(format!("encoding trigger spec: {e}")))?;
146        self.store
147            .put_trigger(&spec.name, &blob)
148            .map_err(|e| Error::Procedure(format!("persisting trigger: {e}")))?;
149        let mut guard = self.inner.write().expect("trigger registry lock");
150        Ok(guard.insert(spec.name.clone(), spec))
151    }
152
153    /// Drop by name. Returns the previous spec when one
154    /// existed; `Ok(None)` is the no-op case.
155    pub fn drop(&self, name: &str) -> Result<Option<TriggerSpec>> {
156        self.store
157            .delete_trigger(name)
158            .map_err(|e| Error::Procedure(format!("removing trigger: {e}")))?;
159        let mut guard = self.inner.write().expect("trigger registry lock");
160        Ok(guard.remove(name))
161    }
162
163    /// Snapshot every registered trigger in deterministic
164    /// (name-sorted) order.
165    pub fn list(&self) -> Vec<TriggerSpec> {
166        let guard = self.inner.read().expect("trigger registry lock");
167        let mut specs: Vec<TriggerSpec> = guard.values().cloned().collect();
168        specs.sort_by(|a, b| a.name.cmp(&b.name));
169        specs
170    }
171
172    /// `true` when at least one trigger is registered. Lets the
173    /// commit hook short-circuit the diff computation when no
174    /// trigger could possibly fire.
175    pub fn is_empty(&self) -> bool {
176        self.inner.read().expect("trigger registry lock").is_empty()
177    }
178
179    /// Reload the in-memory cache from storage. Called by the
180    /// Raft applier and the routing-mode trigger fan-out after
181    /// they apply an `InstallTrigger` / `DropTrigger` command,
182    /// so a peer that wasn't the original install/drop site
183    /// still sees the change immediately. Failures are surfaced
184    /// to the caller — the applier logs them, since by that
185    /// point the storage write has already succeeded.
186    pub fn refresh(&self) -> Result<()> {
187        let entries = self
188            .store
189            .list_triggers()
190            .map_err(|e| Error::Procedure(format!("refreshing trigger registry: {e}")))?;
191        let mut next: HashMap<String, TriggerSpec> = HashMap::new();
192        for (name, blob) in entries {
193            match serde_json::from_slice::<TriggerSpec>(&blob) {
194                Ok(spec) => {
195                    next.insert(name, spec);
196                }
197                Err(e) => {
198                    tracing::warn!(
199                        trigger = %name,
200                        error = %e,
201                        "skipping corrupt trigger spec on refresh"
202                    );
203                }
204            }
205        }
206        let mut guard = self.inner.write().expect("trigger registry lock");
207        *guard = next;
208        Ok(())
209    }
210}
211
thread_local! {
    /// Per-thread reentrancy marker for trigger execution.
    ///
    /// It is `true` while a trigger body runs. The firing
    /// helpers consult it and refuse to fire while it is set,
    /// which breaks the loop where a trigger's own write would
    /// fire the trigger again, and again, without end.
    /// Thread-local storage is enough because the executor runs
    /// synchronously inside `spawn_blocking`.
    static SUPPRESSING: Cell<bool> = const { Cell::new(false) };
}
222
223/// Run `body` with the suppression flag set. The flag resets to
224/// the prior value on drop — even on panic — so a panicking
225/// trigger doesn't leave the calling thread permanently
226/// suppressed.
227pub fn with_suppression<R, F: FnOnce() -> R>(body: F) -> R {
228    struct Guard(bool);
229    impl Drop for Guard {
230        fn drop(&mut self) {
231            SUPPRESSING.with(|s| s.set(self.0));
232        }
233    }
234    let prev = SUPPRESSING.with(|s| s.replace(true));
235    let _guard = Guard(prev);
236    body()
237}
238
239/// Whether the calling thread is currently inside a trigger
240/// invocation. The commit hook calls this before firing.
241pub fn is_suppressed() -> bool {
242    SUPPRESSING.with(|s| s.get())
243}
244
245/// One property-change record (assignment or removal) emitted
246/// per (element, key) tuple touched by the tx. The `element`
247/// field is a `Value::Node` or `Value::Edge` depending on
248/// scope so the trigger body can inspect the wider shape.
249#[derive(Debug, Clone)]
250pub struct PropertyChange {
251    pub key: String,
252    pub old: Option<Property>,
253    pub new: Option<Property>,
254    pub element: Value,
255}
256
257/// One label-change record (assignment or removal). `element`
258/// is always a `Value::Node` since labels are node-scope.
259#[derive(Debug, Clone)]
260pub struct LabelChange {
261    pub label: String,
262    pub element: Value,
263}
264
265/// Per-tx diff handed to a trigger as `$created*` / `$deleted*` /
266/// `$assigned*` / `$removed*` params. Computed in `meshdb-rpc`
267/// (where `GraphCommand` lives) and passed in to the firing
268/// helpers. `into_param_map` shapes the lists into Neo4j APOC's
269/// expected param surface.
270#[derive(Debug, Default)]
271pub struct TriggerDiff {
272    pub created_nodes: Vec<Node>,
273    pub created_relationships: Vec<Edge>,
274    pub deleted_nodes: Vec<Node>,
275    pub deleted_relationships: Vec<Edge>,
276    pub assigned_node_properties: Vec<PropertyChange>,
277    pub removed_node_properties: Vec<PropertyChange>,
278    pub assigned_relationship_properties: Vec<PropertyChange>,
279    pub removed_relationship_properties: Vec<PropertyChange>,
280    pub assigned_labels: Vec<LabelChange>,
281    pub removed_labels: Vec<LabelChange>,
282}
283
284impl TriggerDiff {
285    /// Deep-clone the diff. Used by call sites that need to fan
286    /// the same diff out to multiple firing phases (e.g.
287    /// after-sync + afterAsync) since `into_param_map` consumes
288    /// `self`.
289    pub fn clone_diff(&self) -> Self {
290        Self {
291            created_nodes: self.created_nodes.clone(),
292            created_relationships: self.created_relationships.clone(),
293            deleted_nodes: self.deleted_nodes.clone(),
294            deleted_relationships: self.deleted_relationships.clone(),
295            assigned_node_properties: self.assigned_node_properties.clone(),
296            removed_node_properties: self.removed_node_properties.clone(),
297            assigned_relationship_properties: self.assigned_relationship_properties.clone(),
298            removed_relationship_properties: self.removed_relationship_properties.clone(),
299            assigned_labels: self.assigned_labels.clone(),
300            removed_labels: self.removed_labels.clone(),
301        }
302    }
303
304    /// Convert the diff into a [`ParamMap`] suitable for
305    /// passing to `execute_with_reader_and_procs`. The keys
306    /// match what Neo4j APOC's trigger system exposes:
307    /// `createdNodes` / `createdRelationships` /
308    /// `deletedNodes` / `deletedRelationships` (lists),
309    /// `assignedNodeProperties` / `removedNodeProperties` /
310    /// `assignedRelationshipProperties` /
311    /// `removedRelationshipProperties` (lists of
312    /// `{key, old, new, node|relationship}` maps), and
313    /// `assignedLabels` / `removedLabels` (lists of
314    /// `{label, node}` maps).
315    pub fn into_param_map(self, extra: &HashMap<String, Property>) -> ParamMap {
316        let mut params: ParamMap = HashMap::new();
317        params.insert(
318            "createdNodes".into(),
319            Value::List(self.created_nodes.into_iter().map(Value::Node).collect()),
320        );
321        params.insert(
322            "createdRelationships".into(),
323            Value::List(
324                self.created_relationships
325                    .into_iter()
326                    .map(Value::Edge)
327                    .collect(),
328            ),
329        );
330        params.insert(
331            "deletedNodes".into(),
332            Value::List(self.deleted_nodes.into_iter().map(Value::Node).collect()),
333        );
334        params.insert(
335            "deletedRelationships".into(),
336            Value::List(
337                self.deleted_relationships
338                    .into_iter()
339                    .map(Value::Edge)
340                    .collect(),
341            ),
342        );
343        params.insert(
344            "assignedNodeProperties".into(),
345            property_changes_to_value(self.assigned_node_properties, "node"),
346        );
347        params.insert(
348            "removedNodeProperties".into(),
349            property_changes_to_value(self.removed_node_properties, "node"),
350        );
351        params.insert(
352            "assignedRelationshipProperties".into(),
353            property_changes_to_value(self.assigned_relationship_properties, "relationship"),
354        );
355        params.insert(
356            "removedRelationshipProperties".into(),
357            property_changes_to_value(self.removed_relationship_properties, "relationship"),
358        );
359        params.insert(
360            "assignedLabels".into(),
361            label_changes_to_value(self.assigned_labels),
362        );
363        params.insert(
364            "removedLabels".into(),
365            label_changes_to_value(self.removed_labels),
366        );
367        for (k, p) in extra {
368            params.insert(k.clone(), Value::Property(p.clone()));
369        }
370        params
371    }
372}
373
374/// Lower a list of [`PropertyChange`] entries to a `Value::List`
375/// of `Value::Map` records with the keys Neo4j APOC expects:
376/// `key`, `old`, `new`, and the element under either `node` or
377/// `relationship` depending on scope.
378fn property_changes_to_value(changes: Vec<PropertyChange>, element_key: &str) -> Value {
379    Value::List(
380        changes
381            .into_iter()
382            .map(|c| {
383                let mut entry: HashMap<String, Value> = HashMap::new();
384                entry.insert("key".into(), Value::Property(Property::String(c.key)));
385                entry.insert(
386                    "old".into(),
387                    c.old.map(Value::Property).unwrap_or(Value::Null),
388                );
389                entry.insert(
390                    "new".into(),
391                    c.new.map(Value::Property).unwrap_or(Value::Null),
392                );
393                entry.insert(element_key.to_string(), c.element);
394                Value::Map(entry)
395            })
396            .collect(),
397    )
398}
399
400/// Lower a list of [`LabelChange`] entries to a `Value::List`
401/// of `Value::Map` records with `label` and `node` keys —
402/// matching Neo4j APOC's `assignedLabels` / `removedLabels`
403/// shape.
404fn label_changes_to_value(changes: Vec<LabelChange>) -> Value {
405    Value::List(
406        changes
407            .into_iter()
408            .map(|c| {
409                let mut entry: HashMap<String, Value> = HashMap::new();
410                entry.insert("label".into(), Value::Property(Property::String(c.label)));
411                entry.insert("node".into(), c.element);
412                Value::Map(entry)
413            })
414            .collect(),
415    )
416}
417
418/// Fire every registered before-phase trigger against the
419/// pre-commit diff. Returns Ok if every trigger ran cleanly,
420/// Err with the offending message if one failed — propagated
421/// to abort the originator's commit. Trigger writes are
422/// buffered into the supplied writer so the caller can merge
423/// them into the prepared command batch.
424///
425/// Unlike after-firing, before-firing stops at the first error:
426/// the user's commit hasn't happened yet, so a failing before-
427/// trigger cleanly aborts everything.
428pub fn fire_before_triggers(
429    registry: &TriggerRegistry,
430    diff: TriggerDiff,
431    reader: &dyn GraphReader,
432    writer: &dyn GraphWriter,
433    procedures: &ProcedureRegistry,
434) -> Result<()> {
435    if registry.is_empty() {
436        return Ok(());
437    }
438    if is_suppressed() {
439        return Ok(());
440    }
441    let triggers: Vec<TriggerSpec> = registry
442        .list()
443        .into_iter()
444        .filter(|t| !t.paused && t.phase == "before")
445        .collect();
446    if triggers.is_empty() {
447        return Ok(());
448    }
449    let extras: Vec<HashMap<String, Property>> =
450        triggers.iter().map(|t| t.extra_params.clone()).collect();
451    let diff_clones: Vec<TriggerDiff> = (0..triggers.len()).map(|_| diff.clone_diff()).collect();
452    let mut first_error: Option<Error> = None;
453    with_suppression(|| {
454        for ((trigger, extra), diff) in triggers
455            .iter()
456            .zip(extras.iter())
457            .zip(diff_clones.into_iter())
458        {
459            let params = diff.into_param_map(extra);
460            let stmt = match meshdb_cypher::parse(&trigger.query) {
461                Ok(s) => s,
462                Err(e) => {
463                    first_error = Some(Error::Procedure(format!(
464                        "before-trigger '{}' parse error: {e}",
465                        trigger.name
466                    )));
467                    return;
468                }
469            };
470            let plan = match meshdb_cypher::plan(&stmt) {
471                Ok(p) => p,
472                Err(e) => {
473                    first_error = Some(Error::Procedure(format!(
474                        "before-trigger '{}' plan error: {e}",
475                        trigger.name
476                    )));
477                    return;
478                }
479            };
480            if let Err(e) = crate::ops::execute_with_reader_and_procs(
481                &plan, reader, writer, &params, procedures,
482            ) {
483                first_error = Some(Error::Procedure(format!(
484                    "before-trigger '{}' aborted commit: {e}",
485                    trigger.name
486                )));
487                return;
488            }
489        }
490    });
491    match first_error {
492        Some(e) => Err(e),
493        None => Ok(()),
494    }
495}
496
497/// Fire every registered trigger of `phase` against the given
498/// diff. Errors are logged and swallowed — the original tx has
499/// already committed (for `after` / `afterAsync` / `rollback`)
500/// or already aborted (for `rollback`), so a failing trigger
501/// can't undo anything. Used as the workhorse for the three
502/// non-blocking phases; `before` has its own entry point with
503/// abort semantics.
504pub fn fire_phase_triggers(
505    registry: &TriggerRegistry,
506    phase: &str,
507    diff: TriggerDiff,
508    reader: &dyn GraphReader,
509    writer: &dyn GraphWriter,
510    procedures: &ProcedureRegistry,
511) {
512    if registry.is_empty() {
513        return;
514    }
515    if is_suppressed() {
516        return;
517    }
518    let triggers: Vec<TriggerSpec> = registry
519        .list()
520        .into_iter()
521        .filter(|t| !t.paused && t.phase == phase)
522        .collect();
523    if triggers.is_empty() {
524        return;
525    }
526    let extras: Vec<HashMap<String, Property>> =
527        triggers.iter().map(|t| t.extra_params.clone()).collect();
528    let diff_clones: Vec<TriggerDiff> = (0..triggers.len()).map(|_| diff.clone_diff()).collect();
529    with_suppression(|| {
530        for ((trigger, extra), diff) in triggers
531            .iter()
532            .zip(extras.iter())
533            .zip(diff_clones.into_iter())
534        {
535            let params = diff.into_param_map(extra);
536            let stmt = match meshdb_cypher::parse(&trigger.query) {
537                Ok(s) => s,
538                Err(e) => {
539                    tracing::warn!(
540                        trigger = %trigger.name,
541                        phase = phase,
542                        error = %e,
543                        "trigger Cypher failed to parse — skipping"
544                    );
545                    continue;
546                }
547            };
548            let plan = match meshdb_cypher::plan(&stmt) {
549                Ok(p) => p,
550                Err(e) => {
551                    tracing::warn!(
552                        trigger = %trigger.name,
553                        phase = phase,
554                        error = %e,
555                        "trigger Cypher failed to plan — skipping"
556                    );
557                    continue;
558                }
559            };
560            if let Err(e) = crate::ops::execute_with_reader_and_procs(
561                &plan, reader, writer, &params, procedures,
562            ) {
563                tracing::warn!(
564                    trigger = %trigger.name,
565                    phase = phase,
566                    error = %e,
567                    "trigger body returned an error — skipping"
568                );
569            }
570        }
571    });
572}
573
574/// Fire every registered after-phase trigger. Backwards-compat
575/// shim for callers that haven't migrated to
576/// [`fire_phase_triggers`] yet.
577pub fn fire_after_triggers(
578    registry: &TriggerRegistry,
579    diff: TriggerDiff,
580    reader: &dyn GraphReader,
581    writer: &dyn GraphWriter,
582    procedures: &ProcedureRegistry,
583) {
584    fire_phase_triggers(registry, "after", diff, reader, writer, procedures);
585}
586
/// Current wall-clock time as milliseconds since the UNIX
/// epoch. Used to stamp `installed_at_ms` on a fresh trigger
/// spec.
///
/// Returns `0` when the system clock reads earlier than the
/// epoch. A millisecond count too large for `i64` saturates at
/// `i64::MAX` instead of wrapping through a truncating cast.
pub fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}
595
596// ---------------- Procedure call entry points ----------------
597
598/// `apoc.trigger.install(databaseName, name, statement, selector, config)`.
599/// `databaseName` is accepted but ignored — Mesh is
600/// single-database today. `selector` and `config` are inspected
601/// for a `phase` (defaults to "after") and `params` (defaults to
602/// empty); other keys are ignored for forward-compat.
603///
604/// Emits the install through the writer (as a
605/// [`GraphCommand::InstallTrigger`](meshdb_cluster::GraphCommand::InstallTrigger))
606/// so the commit path replicates it across the cluster. The
607/// in-memory registry refreshes on each peer when the storage
608/// CF lands its updated entry.
609pub fn install_call(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
610    if args.len() < 3 {
611        return Err(Error::Procedure(
612            "apoc.trigger.install: expects (databaseName, name, statement[, selector[, config]])"
613                .into(),
614        ));
615    }
616    let _db = expect_string(&args[0], "first argument (databaseName)")?;
617    let name = expect_string(&args[1], "second argument (name)")?;
618    let query = expect_string(&args[2], "third argument (statement)")?;
619    let phase = if args.len() > 3 {
620        match &args[3] {
621            Value::Property(Property::Map(m)) => match m.get("phase") {
622                Some(Property::String(s)) => s.clone(),
623                Some(Property::Null) | None => "after".to_string(),
624                Some(other) => {
625                    return Err(Error::Procedure(format!(
626                        "apoc.trigger.install: selector.phase must be a string, got {other:?}"
627                    )));
628                }
629            },
630            Value::Null | Value::Property(Property::Null) => "after".to_string(),
631            other => {
632                return Err(Error::Procedure(format!(
633                    "apoc.trigger.install: selector must be a map or null, got {other:?}"
634                )));
635            }
636        }
637    } else {
638        "after".to_string()
639    };
640    if !matches!(
641        phase.as_str(),
642        "before" | "after" | "afterAsync" | "rollback"
643    ) {
644        return Err(Error::Procedure(format!(
645            "apoc.trigger.install: phase must be one of \
646             'before' / 'after' / 'afterAsync' / 'rollback', got {phase:?}"
647        )));
648    }
649    let extra_params: HashMap<String, Property> = if args.len() > 4 {
650        match &args[4] {
651            Value::Property(Property::Map(m)) => match m.get("params") {
652                Some(Property::Map(p)) => p.clone(),
653                Some(Property::Null) | None => HashMap::new(),
654                Some(other) => {
655                    return Err(Error::Procedure(format!(
656                        "apoc.trigger.install: config.params must be a map, got {other:?}"
657                    )));
658                }
659            },
660            Value::Null | Value::Property(Property::Null) => HashMap::new(),
661            other => {
662                return Err(Error::Procedure(format!(
663                    "apoc.trigger.install: config must be a map or null, got {other:?}"
664                )));
665            }
666        }
667    } else {
668        HashMap::new()
669    };
670    // Validate the query parses + plans before persisting so
671    // `install` rejects garbage at registration time rather than
672    // silently failing on every commit afterwards.
673    let stmt = meshdb_cypher::parse(&query)
674        .map_err(|e| Error::Procedure(format!("apoc.trigger.install: Cypher parse error: {e}")))?;
675    meshdb_cypher::plan(&stmt)
676        .map_err(|e| Error::Procedure(format!("apoc.trigger.install: Cypher plan error: {e}")))?;
677    let spec = TriggerSpec {
678        name: name.clone(),
679        query: query.clone(),
680        phase,
681        extra_params,
682        installed_at_ms: now_ms(),
683        paused: false,
684    };
685    let blob = serde_json::to_vec(&spec)
686        .map_err(|e| Error::Procedure(format!("apoc.trigger.install: encoding spec: {e}")))?;
687    writer.install_trigger(&name, &blob)?;
688    let mut row: ProcRow = HashMap::new();
689    row.insert("name".into(), Value::Property(Property::String(name)));
690    row.insert("query".into(), Value::Property(Property::String(query)));
691    row.insert("installed".into(), Value::Property(Property::Bool(true)));
692    // V2 drops the synchronous `previous` column: the actual
693    // apply happens through the cluster commit path, so we
694    // can't synchronously report whether a prior trigger
695    // existed under the same name. Callers that need the
696    // before/after pair can `apoc.trigger.list` first.
697    row.insert("previous".into(), Value::Null);
698    Ok(vec![row])
699}
700
701/// `apoc.trigger.start(databaseName, name)` — un-pause a
702/// previously stopped trigger. Looks up the existing spec
703/// through the registry, sets `paused = false`, and re-emits
704/// the spec via the writer so the cluster commit replicates
705/// the unpause. Errors if the trigger doesn't exist.
706pub fn start_call(
707    registry: &TriggerRegistry,
708    writer: &dyn GraphWriter,
709    args: &[Value],
710) -> Result<Vec<ProcRow>> {
711    set_paused(registry, writer, args, false, "apoc.trigger.start")
712}
713
714/// `apoc.trigger.stop(databaseName, name)` — pause a registered
715/// trigger. Mirrors `start_call`; sets `paused = true`.
716pub fn stop_call(
717    registry: &TriggerRegistry,
718    writer: &dyn GraphWriter,
719    args: &[Value],
720) -> Result<Vec<ProcRow>> {
721    set_paused(registry, writer, args, true, "apoc.trigger.stop")
722}
723
724/// Shared body for start_call / stop_call.
725fn set_paused(
726    registry: &TriggerRegistry,
727    writer: &dyn GraphWriter,
728    args: &[Value],
729    paused: bool,
730    proc_name: &'static str,
731) -> Result<Vec<ProcRow>> {
732    if args.len() < 2 {
733        return Err(Error::Procedure(format!(
734            "{proc_name}: expects (databaseName, name)"
735        )));
736    }
737    let _db = expect_string(&args[0], "first argument (databaseName)")?;
738    let name = expect_string(&args[1], "second argument (name)")?;
739    let mut spec = registry
740        .list()
741        .into_iter()
742        .find(|s| s.name == name)
743        .ok_or_else(|| Error::Procedure(format!("{proc_name}: no trigger named '{name}'")))?;
744    spec.paused = paused;
745    let blob = serde_json::to_vec(&spec)
746        .map_err(|e| Error::Procedure(format!("{proc_name}: encoding spec: {e}")))?;
747    writer.install_trigger(&name, &blob)?;
748    let mut row: ProcRow = HashMap::new();
749    row.insert("name".into(), Value::Property(Property::String(name)));
750    row.insert("paused".into(), Value::Property(Property::Bool(paused)));
751    Ok(vec![row])
752}
753
754/// `apoc.trigger.drop(databaseName, name)`. Same write-path
755/// shape as `install_call` — the drop rides through the cluster
756/// commit so every peer's storage drops the entry. The yielded
757/// `removed` column is always `true` in V2 because the actual
758/// apply happens on the leader/applier path; the install/drop
759/// procedure itself only buffers the command.
760pub fn drop_call(writer: &dyn GraphWriter, args: &[Value]) -> Result<Vec<ProcRow>> {
761    if args.len() < 2 {
762        return Err(Error::Procedure(
763            "apoc.trigger.drop: expects (databaseName, name)".into(),
764        ));
765    }
766    let _db = expect_string(&args[0], "first argument (databaseName)")?;
767    let name = expect_string(&args[1], "second argument (name)")?;
768    writer.drop_trigger(&name)?;
769    let mut row: ProcRow = HashMap::new();
770    row.insert("name".into(), Value::Property(Property::String(name)));
771    row.insert("removed".into(), Value::Property(Property::Bool(true)));
772    Ok(vec![row])
773}
774
775/// `apoc.trigger.list()`.
776pub fn list_call(registry: &TriggerRegistry) -> Result<Vec<ProcRow>> {
777    Ok(registry
778        .list()
779        .into_iter()
780        .map(|spec| {
781            let mut row: ProcRow = HashMap::new();
782            row.insert("name".into(), Value::Property(Property::String(spec.name)));
783            row.insert(
784                "query".into(),
785                Value::Property(Property::String(spec.query)),
786            );
787            row.insert(
788                "phase".into(),
789                Value::Property(Property::String(spec.phase)),
790            );
791            row.insert(
792                "installed_at".into(),
793                Value::Property(Property::Int64(spec.installed_at_ms)),
794            );
795            row.insert(
796                "paused".into(),
797                Value::Property(Property::Bool(spec.paused)),
798            );
799            row
800        })
801        .collect())
802}
803
804fn expect_string(v: &Value, position: &str) -> Result<String> {
805    match v {
806        Value::Property(Property::String(s)) => Ok(s.clone()),
807        Value::Null | Value::Property(Property::Null) => Err(Error::Procedure(format!(
808            "apoc.trigger.*: {position} must be a string, got null"
809        ))),
810        other => Err(Error::Procedure(format!(
811            "apoc.trigger.*: {position} must be a string, got {other:?}"
812        ))),
813    }
814}