uni_plugin/traits/trigger.rs
1//! Fine-grained triggers — APOC `apoc.trigger.*` analogue.
2
3use std::sync::Arc;
4
5use datafusion::arrow::record_batch::RecordBatch;
6use smol_str::SmolStr;
7
8use crate::errors::FnError;
9
10/// A fine-grained mutation trigger.
11pub trait TriggerPlugin: Send + Sync {
12 /// Subscription describing which events this trigger receives.
13 fn subscription(&self) -> &TriggerSubscription;
14
15 /// Fire the trigger with a batch of matching mutation events.
16 ///
17 /// # Threading policy
18 ///
19 /// `fire` is synchronous; the host wraps it differently depending
20 /// on the subscription's [`FireMode`]:
21 ///
22 /// - [`FireMode::Synchronous`] — invoked inline on the transaction
23 /// commit path, on a `tokio::task::spawn_blocking` worker
24 /// thread. Returning [`TriggerOutcome::Reject`] aborts the
25 /// transaction; long-running work blocks the committer, so keep
26 /// the body tight.
27 /// - [`FireMode::Async`] — fires off a separate `spawn_blocking`
28 /// task after the transaction commits; cannot reject (the
29 /// transaction has already landed). Failures are logged but do
30 /// not roll back.
31 /// - [`FireMode::EventualConsistency`] — batched via the
32 /// `BackgroundJobProvider` machinery; the same blocking-worker
33 /// contract from [`crate::traits::background::BackgroundJobProvider::execute`] applies.
34 ///
35 /// In every mode the body must not call `block_on` against the
36 /// host runtime; panics are caught at the dispatcher boundary.
37 ///
38 /// See `docs/PLUGIN_THREADING.md` for the long-form rationale.
39 ///
40 /// # Errors
41 ///
42 /// Returns [`FnError`] if the fire cannot complete. For `Synchronous`
43 /// triggers this aborts the surrounding transaction.
44 fn fire(
45 &self,
46 ctx: TriggerContext<'_>,
47 events: &MutationBatch,
48 ) -> Result<TriggerOutcome, FnError>;
49
50 /// Re-fire after a [`TriggerOutcome::Defer`] previously returned.
51 ///
52 /// The host's deferral queue invokes this with the original
53 /// `payload` once the `delay` has elapsed. The default
54 /// implementation delegates back to [`Self::fire`] with the
55 /// original [`MutationBatch`] — existing trigger plugins keep
56 /// working without changes. Plugins that need access to the
57 /// `payload` (e.g., to resume a long-running aggregation) override
58 /// this method.
59 ///
60 /// Returning [`TriggerOutcome::Defer`] from `on_deferred` re-queues
61 /// the item with `attempt + 1`, capped at the host's
62 /// `DEFER_MAX_ATTEMPTS`.
63 ///
64 /// # Errors
65 ///
66 /// Returns [`FnError`] when the deferred fire cannot complete.
67 /// The error is logged at warn and the item is dropped.
68 fn on_deferred(
69 &self,
70 ctx: TriggerContext<'_>,
71 events: &MutationBatch,
72 _payload: &str,
73 ) -> Result<TriggerOutcome, FnError> {
74 self.fire(ctx, events)
75 }
76}
77
78/// Selectors describing the events this trigger subscribes to.
79#[derive(Clone, Debug)]
80pub struct TriggerSubscription {
81 /// Phase in the mutation lifecycle.
82 pub phase: TriggerPhase,
83 /// Event-kind bitmask (`NodeCreate | NodeUpdate | EdgeDelete | ...`).
84 pub events: TriggerEventMask,
85 /// Optional label allow-list; `None` means all labels.
86 pub labels: Option<Vec<SmolStr>>,
87 /// Optional edge-type allow-list.
88 pub edge_types: Option<Vec<SmolStr>>,
89 /// Optional property allow-list — for `*Update` events, restrict to
90 /// updates touching these properties.
91 pub properties: Option<Vec<SmolStr>>,
92 /// Cypher boolean expression evaluated per event (parsed by host).
93 pub predicate_source: Option<String>,
94 /// Firing mode (Sync / Async / Eventual).
95 pub fire_mode: FireMode,
96 /// Markdown docs.
97 pub docs: String,
98}
99
/// The point in the mutation lifecycle that a trigger subscribes to.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerPhase {
    /// Runs before the mutation is applied; the trigger may reject it.
    BeforeMutation,
    /// Runs after the mutation is applied, inside the same transaction.
    AfterMutation,
    /// Runs before the transaction commits; the trigger may reject it.
    BeforeCommit,
    /// Runs after the transaction has committed; rejection is not possible.
    AfterCommit,
}
113
/// Bitmask of event kinds.
///
/// Each associated constant occupies one distinct bit of the inner `u32`.
/// Masks can be combined with [`TriggerEventMask::union`] (usable in
/// `const` contexts) or with the `|` operator.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct TriggerEventMask(pub u32);

impl TriggerEventMask {
    /// Node creation.
    pub const NODE_CREATE: Self = Self(1 << 0);
    /// Node update.
    pub const NODE_UPDATE: Self = Self(1 << 1);
    /// Node deletion.
    pub const NODE_DELETE: Self = Self(1 << 2);
    /// Edge creation.
    pub const EDGE_CREATE: Self = Self(1 << 3);
    /// Edge update.
    pub const EDGE_UPDATE: Self = Self(1 << 4);
    /// Edge deletion.
    pub const EDGE_DELETE: Self = Self(1 << 5);
    /// Property change (covered by Node/Edge Update — independent bit for
    /// finer-grained matching).
    pub const PROPERTY_CHANGE: Self = Self(1 << 6);
    /// Label added.
    pub const LABEL_ADDED: Self = Self(1 << 7);
    /// Label removed.
    pub const LABEL_REMOVED: Self = Self(1 << 8);

    /// Combine two masks.
    ///
    /// The result has every bit that is set in `self` or in `other`.
    #[must_use]
    pub const fn union(self, other: Self) -> Self {
        Self(self.0 | other.0)
    }

    /// Check whether this mask is a superset of `other`.
    ///
    /// Returns `true` when every bit set in `other` is also set in `self`.
    /// An empty `other` (no bits set) is therefore contained in any mask.
    #[must_use]
    pub const fn contains(self, other: Self) -> bool {
        (self.0 & other.0) == other.0
    }
}

/// Enables the `A | B` spelling for combining masks.
///
/// Equivalent to [`TriggerEventMask::union`]; prefer `union` where a
/// `const` expression is required, since operator traits are not callable
/// in `const` contexts.
impl std::ops::BitOr for TriggerEventMask {
    type Output = Self;

    fn bitor(self, rhs: Self) -> Self::Output {
        self.union(rhs)
    }
}
151
/// How a trigger is fired relative to the mutation it observes.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FireMode {
    /// Fired synchronously: blocks the mutation and may reject it.
    Synchronous,
    /// Fired after the commit; cannot reject.
    Async,
    /// Eventually consistent: batched via `BackgroundJobProvider`.
    EventualConsistency,
}
163
164/// Outcome returned by a trigger.
165#[derive(Debug)]
166#[non_exhaustive]
167pub enum TriggerOutcome {
168 /// Continue normally.
169 Continue,
170 /// Reject the surrounding mutation / transaction (valid only in
171 /// `Before*` phases).
172 Reject {
173 /// Human-readable rejection reason.
174 reason: String,
175 },
176 /// Defer this trigger's firing (e.g., for batched aggregation).
177 Defer {
178 /// Deferral metadata understood by the trigger implementation.
179 until: TriggerDeferral,
180 },
181}
182
/// Deferral marker carried by [`TriggerOutcome::Defer`].
///
/// Holds an implementation-defined `payload` together with an optional
/// `delay` (FU-5). With `delay == None` the deferred item re-fires on the
/// next scheduler tick (the legacy "any moment now" semantics); with
/// `Some(d)` the host's deferral queue waits at least `d` before invoking
/// the trigger again.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TriggerDeferral {
    /// Implementation-defined payload, opaque to the host. It is persisted
    /// across `Uni` restarts when the host's durable defer queue is
    /// enabled.
    pub payload: String,
    /// Minimum time to wait before re-firing. `None` means "as soon as the
    /// next tick fires" (~50–100 ms).
    pub delay: Option<std::time::Duration>,
}

impl TriggerDeferral {
    /// Builds a deferral that carries `payload` and no delay.
    ///
    /// Suitable when the trigger just wants to be re-queued for the next
    /// tick — for instance when an external prerequisite resource might
    /// become available at any moment.
    #[must_use]
    pub fn from_payload(payload: impl Into<String>) -> Self {
        let payload = payload.into();
        Self {
            payload,
            delay: None,
        }
    }

    /// Builds a deferral that carries `payload` and an explicit `delay`.
    ///
    /// The host's deferral queue waits at least `delay` before it
    /// re-invokes [`TriggerPlugin::on_deferred`] (or, when the host has
    /// not adopted the `on_deferred` callback, [`TriggerPlugin::fire`]
    /// with the original [`MutationBatch`]).
    #[must_use]
    pub fn after(payload: impl Into<String>, delay: std::time::Duration) -> Self {
        let payload = payload.into();
        let delay = Some(delay);
        Self { payload, delay }
    }
}
230
/// Context handed to a trigger for a single fire.
#[non_exhaustive]
#[derive(Debug)]
pub struct TriggerContext<'a> {
    /// Identifier of the session.
    pub session_id: &'a str,
    /// Identifier of the transaction.
    pub tx_id: u64,
}

impl<'a> TriggerContext<'a> {
    /// Builds a fresh context from a session id and a transaction id.
    ///
    /// Because the struct is `#[non_exhaustive]`, external callers cannot
    /// use struct-literal syntax; this constructor is the supported way to
    /// create one. Future fields are meant to ship as `with_*` builder
    /// methods so the API stays compatible.
    #[must_use]
    pub fn new(session_id: &'a str, tx_id: u64) -> Self {
        TriggerContext { session_id, tx_id }
    }
}
251
252/// Batch of mutation events delivered to a trigger.
253///
254/// The batch's `RecordBatch` schema is host-defined and stable:
255/// `event_kind | vid_or_eid | label | property | old_value | new_value | …`.
256#[derive(Clone, Debug)]
257pub struct MutationBatch {
258 /// The events as a typed columnar batch.
259 pub events: Arc<RecordBatch>,
260}