Skip to main content

uni_plugin/traits/
trigger.rs

1//! Fine-grained triggers — APOC `apoc.trigger.*` analogue.
2
3use std::sync::Arc;
4
5use datafusion::arrow::record_batch::RecordBatch;
6use smol_str::SmolStr;
7
8use crate::errors::FnError;
9
10/// A fine-grained mutation trigger.
11pub trait TriggerPlugin: Send + Sync {
12    /// Subscription describing which events this trigger receives.
13    fn subscription(&self) -> &TriggerSubscription;
14
15    /// Fire the trigger with a batch of matching mutation events.
16    ///
17    /// # Threading policy
18    ///
19    /// `fire` is synchronous; the host wraps it differently depending
20    /// on the subscription's [`FireMode`]:
21    ///
22    /// - [`FireMode::Synchronous`] — invoked inline on the transaction
23    ///   commit path, on a `tokio::task::spawn_blocking` worker
24    ///   thread. Returning [`TriggerOutcome::Reject`] aborts the
25    ///   transaction; long-running work blocks the committer, so keep
26    ///   the body tight.
27    /// - [`FireMode::Async`] — fires off a separate `spawn_blocking`
28    ///   task after the transaction commits; cannot reject (the
29    ///   transaction has already landed). Failures are logged but do
30    ///   not roll back.
31    /// - [`FireMode::EventualConsistency`] — batched via the
32    ///   `BackgroundJobProvider` machinery; the same blocking-worker
33    ///   contract from [`crate::traits::background::BackgroundJobProvider::execute`] applies.
34    ///
35    /// In every mode the body must not call `block_on` against the
36    /// host runtime; panics are caught at the dispatcher boundary.
37    ///
38    /// See `docs/PLUGIN_THREADING.md` for the long-form rationale.
39    ///
40    /// # Errors
41    ///
42    /// Returns [`FnError`] if the fire cannot complete. For `Synchronous`
43    /// triggers this aborts the surrounding transaction.
44    fn fire(
45        &self,
46        ctx: TriggerContext<'_>,
47        events: &MutationBatch,
48    ) -> Result<TriggerOutcome, FnError>;
49
50    /// Re-fire after a [`TriggerOutcome::Defer`] previously returned.
51    ///
52    /// The host's deferral queue invokes this with the original
53    /// `payload` once the `delay` has elapsed. The default
54    /// implementation delegates back to [`Self::fire`] with the
55    /// original [`MutationBatch`] — existing trigger plugins keep
56    /// working without changes. Plugins that need access to the
57    /// `payload` (e.g., to resume a long-running aggregation) override
58    /// this method.
59    ///
60    /// Returning [`TriggerOutcome::Defer`] from `on_deferred` re-queues
61    /// the item with `attempt + 1`, capped at the host's
62    /// `DEFER_MAX_ATTEMPTS`.
63    ///
64    /// # Errors
65    ///
66    /// Returns [`FnError`] when the deferred fire cannot complete.
67    /// The error is logged at warn and the item is dropped.
68    fn on_deferred(
69        &self,
70        ctx: TriggerContext<'_>,
71        events: &MutationBatch,
72        _payload: &str,
73    ) -> Result<TriggerOutcome, FnError> {
74        self.fire(ctx, events)
75    }
76}
77
78/// Selectors describing the events this trigger subscribes to.
79#[derive(Clone, Debug)]
80pub struct TriggerSubscription {
81    /// Phase in the mutation lifecycle.
82    pub phase: TriggerPhase,
83    /// Event-kind bitmask (`NodeCreate | NodeUpdate | EdgeDelete | ...`).
84    pub events: TriggerEventMask,
85    /// Optional label allow-list; `None` means all labels.
86    pub labels: Option<Vec<SmolStr>>,
87    /// Optional edge-type allow-list.
88    pub edge_types: Option<Vec<SmolStr>>,
89    /// Optional property allow-list — for `*Update` events, restrict to
90    /// updates touching these properties.
91    pub properties: Option<Vec<SmolStr>>,
92    /// Cypher boolean expression evaluated per event (parsed by host).
93    pub predicate_source: Option<String>,
94    /// Firing mode (Sync / Async / Eventual).
95    pub fire_mode: FireMode,
96    /// Markdown docs.
97    pub docs: String,
98}
99
/// Point in the mutation lifecycle at which a trigger is invoked.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerPhase {
    /// Runs before the mutation is applied. May reject.
    BeforeMutation,
    /// Runs after the mutation is applied, within the same transaction.
    AfterMutation,
    /// Runs before the transaction commits. May reject.
    BeforeCommit,
    /// Runs after the transaction has committed. Cannot reject.
    AfterCommit,
}
113
/// Set of event kinds, stored as a `u32` bitmask.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TriggerEventMask(pub u32);

impl TriggerEventMask {
    /// A node was created (bit 0).
    pub const NODE_CREATE: Self = Self(0x001);
    /// A node was updated (bit 1).
    pub const NODE_UPDATE: Self = Self(0x002);
    /// A node was deleted (bit 2).
    pub const NODE_DELETE: Self = Self(0x004);
    /// An edge was created (bit 3).
    pub const EDGE_CREATE: Self = Self(0x008);
    /// An edge was updated (bit 4).
    pub const EDGE_UPDATE: Self = Self(0x010);
    /// An edge was deleted (bit 5).
    pub const EDGE_DELETE: Self = Self(0x020);
    /// A property changed (bit 6). Already covered by the Node/Edge
    /// update bits; kept as an independent bit for finer-grained
    /// matching.
    pub const PROPERTY_CHANGE: Self = Self(0x040);
    /// A label was added (bit 7).
    pub const LABEL_ADDED: Self = Self(0x080);
    /// A label was removed (bit 8).
    pub const LABEL_REMOVED: Self = Self(0x100);

    /// Returns the mask holding every bit set in `self` or in `other`.
    #[must_use]
    pub const fn union(self, other: Self) -> Self {
        let merged = self.0 | other.0;
        Self(merged)
    }

    /// Returns `true` when every bit set in `other` is also set in
    /// `self`, i.e. `self` is a superset of `other`.
    #[must_use]
    pub const fn contains(self, other: Self) -> bool {
        // Superset test: no bit of `other` may be absent from `self`.
        (other.0 & !self.0) == 0
    }
}
151
/// How the host fires a trigger.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FireMode {
    /// Fired synchronously: blocks the mutation and may reject it.
    Synchronous,
    /// Fired after commit. Cannot reject.
    Async,
    /// Eventually consistent: batched via `BackgroundJobProvider`.
    EventualConsistency,
}
163
164/// Outcome returned by a trigger.
165#[derive(Debug)]
166#[non_exhaustive]
167pub enum TriggerOutcome {
168    /// Continue normally.
169    Continue,
170    /// Reject the surrounding mutation / transaction (valid only in
171    /// `Before*` phases).
172    Reject {
173        /// Human-readable rejection reason.
174        reason: String,
175    },
176    /// Defer this trigger's firing (e.g., for batched aggregation).
177    Defer {
178        /// Deferral metadata understood by the trigger implementation.
179        until: TriggerDeferral,
180    },
181}
182
/// Deferral marker carried by [`TriggerOutcome::Defer`].
///
/// Holds an implementation-defined `payload` together with an optional
/// `delay` (FU-5). With `delay` set to `None` the deferred item
/// re-fires on the next scheduler tick (legacy "any moment now"
/// semantics). With `Some(d)` the host's deferral queue waits at least
/// `d` before invoking the trigger again.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TriggerDeferral {
    /// Implementation-defined payload, opaque to the host. It is
    /// persisted across `Uni` restarts when the host's durable defer
    /// queue is enabled.
    pub payload: String,
    /// Minimum time to wait before re-firing. `None` means "as soon as
    /// the next tick fires" (~50–100 ms).
    pub delay: Option<std::time::Duration>,
}

impl TriggerDeferral {
    /// Build a deferral that carries `payload` and no delay.
    ///
    /// Suitable when the trigger only wants to be re-queued for the
    /// next tick — e.g., when an external prerequisite resource might
    /// become available at any moment.
    #[must_use]
    pub fn from_payload(payload: impl Into<String>) -> Self {
        let payload = payload.into();
        Self {
            payload,
            delay: None,
        }
    }

    /// Build a deferral that carries `payload` and an explicit `delay`.
    ///
    /// The host's deferral queue waits at least `delay` before
    /// re-invoking [`TriggerPlugin::on_deferred`] (or, when the host
    /// has not adopted the `on_deferred` callback, [`TriggerPlugin::fire`]
    /// with the original [`MutationBatch`]).
    #[must_use]
    pub fn after(payload: impl Into<String>, delay: std::time::Duration) -> Self {
        let payload = payload.into();
        let delay = Some(delay);
        Self { payload, delay }
    }
}
230
/// Context handed to a trigger on each fire.
#[non_exhaustive]
#[derive(Debug)]
pub struct TriggerContext<'a> {
    /// Identifier of the session.
    pub session_id: &'a str,
    /// Identifier of the transaction.
    pub tx_id: u64,
}

impl<'a> TriggerContext<'a> {
    /// Build a fresh context from a session id and a transaction id.
    ///
    /// Because the struct is `#[non_exhaustive]`, external callers
    /// cannot use struct-literal syntax, so this constructor is the
    /// supported path. Future fields ship via `with_*` builder methods
    /// to preserve API compatibility.
    #[must_use]
    pub fn new(session_id: &'a str, tx_id: u64) -> Self {
        TriggerContext { session_id, tx_id }
    }
}
251
252/// Batch of mutation events delivered to a trigger.
253///
254/// The batch's `RecordBatch` schema is host-defined and stable:
255/// `event_kind | vid_or_eid | label | property | old_value | new_value | …`.
256#[derive(Clone, Debug)]
257pub struct MutationBatch {
258    /// The events as a typed columnar batch.
259    pub events: Arc<RecordBatch>,
260}