Skip to main content

uni_plugin_host/
triggers.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Host-side dispatch for `TriggerPlugin` registrations (M5f).
5//!
6//! Bridges `PluginRegistry::triggers()` into the transaction commit
7//! path. The dispatcher builds a per-phase routing table once per
8//! commit, drains mutation events from the transaction's private L0
9//! buffer into a stable Arrow `RecordBatch`, applies subscription
10//! selectors (event-kind mask + label / edge-type / property filter),
11//! and invokes each matching trigger at the appropriate phase.
12//!
13//! Phase ordering inside a single commit:
14//! 1. `BeforeMutation` then `BeforeCommit` — fired before the writer
15//!    lock is taken. `Synchronous` reject aborts the transaction.
16//! 2. WAL flush + L1 merge run.
17//! 3. `AfterMutation` then `AfterCommit` — fired after publish. `Async`
18//!    fire-mode triggers are spawned onto the tokio runtime so the
19//!    writer's hot path stays untouched.
20//!
21//! Behavior contract:
22//! - `predicate_source` is compiled at router build (per-commit) via
23//!   `uni_cypher::parse_expression` → AST property-ref rewrite →
24//!   `cypher_expr_to_df` → DataFusion `PhysicalExpr`, and evaluated
25//!   against the per-row event batch in `filter_for`. Predicates may
26//!   reference the event-row columns (`event_kind`, `vid_or_eid`,
27//!   `label`, `property`, `old_value`, `new_value`) as well as
28//!   per-entity properties: `n.foo` reads the new (post-mutation)
29//!   property value, `old.foo` reads the pre-image. Referenced
30//!   property keys are tracked in `RouteEntry::properties_referenced`
31//!   so [`MutationEvents::from_l0_with_probe`] materializes exactly
32//!   those keys into the per-row property bags — predicate-gated
33//!   cost, no work for property-free predicates.
34//! - `TriggerOutcome::Defer` enqueues the trigger fire into the
35//!   per-`Uni` [`DeferralQueue`] (in-memory, ticked at 50ms by the
36//!   background task spawned in `Uni::build`). Items re-fire on the
37//!   next tick; re-deferring is capped at `DEFER_MAX_ATTEMPTS`.
38//!   Restart-durable persistence lives with the M11 CDC scheduler.
39//! - `NODE_CREATE` / `NODE_UPDATE` / `NODE_DELETE` (and the edge
40//!   analogs) are distinguished via a committed-state probe
41//!   ([`PreExistingProbe`]) passed to
42//!   [`MutationEvents::from_l0_with_probe`]. The probe covers (a) the
43//!   current L0 buffer + pending-flush L0s via
44//!   [`PreExistingProbe::from_l0_chain`] (sync, no I/O) and (b) the
45//!   L1 storage layer via [`PreExistingProbe::extend_with_l1`] (async,
46//!   batched `_vid IN (…)` scan per label, chunked at 1024 VIDs).
47//!   Callers that construct [`MutationEvents`] without a probe
48//!   ([`MutationEvents::from_l0`]) fall back to emitting `NODE_UPDATE`
49//!   / `EDGE_UPDATE` for every non-tombstoned write.
50//! - `old_value` is populated from the L0-chain probe for vertices
51//!   and edges visible there, and from the L1 probe (which now
52//!   projects every property column on the candidate label) for
53//!   vertices that were drained out of L0 by a previous flush. Edge
54//!   pre-images are captured in the L0 chain via `edge_properties`.
55
56use std::collections::{BTreeMap, HashMap, HashSet};
57use std::sync::Arc;
58use std::time::{Duration, Instant as StdInstant};
59
60use arrow_array::{BooleanArray, Int64Array, LargeBinaryArray, RecordBatch, UInt8Array};
61use arrow_schema::{DataType, Field, Schema, SchemaRef};
62use datafusion::physical_plan::PhysicalExpr;
63use tokio::runtime::Handle;
64use tracing::warn;
65use uni_common::cypher_value_codec;
66use uni_common::{Properties, UniError, Value};
67use uni_plugin::PluginRegistry;
68use uni_plugin::traits::trigger::{
69    FireMode, MutationBatch, TriggerContext, TriggerEventMask, TriggerOutcome, TriggerPhase,
70    TriggerPlugin, TriggerSubscription,
71};
72use uni_store::runtime::L0Manager;
73use uni_store::runtime::l0::L0Buffer;
74
/// How many buckets the per-phase routing table has — one for each
/// `TriggerPhase` variant handled here (`BeforeMutation`,
/// `AfterMutation`, `BeforeCommit`, `AfterCommit`).
const PHASE_COUNT: usize = 4;
78
79/// Canonical Arrow schema for the per-row event batch handed to each
80/// `TriggerPlugin::fire` call. Kept in one place so `filter_for` and
81/// the `predicate_source` compiler agree on column names + types.
82///
83/// Also used by the CDC delivery path (M11 FU-4) so subscribers
84/// receive events in the same shape triggers do.
85pub fn event_row_schema() -> SchemaRef {
86    Arc::new(Schema::new(vec![
87        Field::new("event_kind", DataType::UInt8, false),
88        Field::new("vid_or_eid", DataType::Int64, false),
89        Field::new("label", DataType::Utf8, false),
90        Field::new("property", DataType::Utf8, false),
91        Field::new("old_value", DataType::LargeBinary, true),
92        Field::new("new_value", DataType::LargeBinary, true),
93        // Per-row property bags carrying a CypherValue-encoded
94        // `Value::Map` of the (selected) post-mutation and pre-image
95        // property values. The Cypher predicate compiler rewrites
96        // `n.foo` / `old.foo` references against these columns, which
97        // the existing `index(map, key)` UDF handles via the
98        // CypherValue codec — no bespoke map-access path required.
99        Field::new("properties_new", DataType::LargeBinary, true),
100        Field::new("properties_old", DataType::LargeBinary, true),
101    ]))
102}
103
104/// Compile a Cypher boolean expression (`predicate_source`) into a
105/// DataFusion `PhysicalExpr` that evaluates against [`event_row_schema`],
106/// together with the set of node/edge property keys the predicate
107/// references (used downstream to predicate-gate property-bag
108/// materialization).
109///
110/// Pipeline: `uni_cypher::parse_expression` → in-place AST rewrite of
111/// `n.foo` / `old.foo` into `properties_new.foo` / `properties_old.foo`
112/// → `cypher_expr_to_df` (whose property-access translator emits
113/// `index(col, "foo")` for non-graph-entity bases — the existing
114/// `index` UDF then performs map lookup on the CypherValue-encoded
115/// `LargeBinary` bag) → DataFusion `TypeCoercion` →
116/// `create_physical_expr`. Same pattern as `apply_having_filter` in
117/// `crates/uni-query/src/query/df_graph/locy_fixpoint.rs:2734-2810`,
118/// just narrowed to a single expression against a fixed schema.
119///
120/// # Errors
121///
122/// Returns an error string if the predicate fails to parse, references
123/// columns not present in the event-row schema (event-row columns or
124/// `n.<prop>` / `old.<prop>` property references), or fails type
125/// coercion.
126fn compile_predicate(source: &str) -> Result<(Arc<dyn PhysicalExpr>, HashSet<String>), String> {
127    use datafusion::common::DFSchema;
128    use datafusion::logical_expr::LogicalPlanBuilder;
129    use datafusion::optimizer::AnalyzerRule;
130    use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
131    use datafusion::physical_expr::create_physical_expr;
132    use datafusion::prelude::SessionContext;
133
134    let mut cypher_expr =
135        uni_cypher::parse_expression(source).map_err(|e| format!("parse: {e}"))?;
136    let mut props_referenced: HashSet<String> = HashSet::new();
137    rewrite_property_refs(&mut cypher_expr, &mut props_referenced);
138    let df_expr_raw = uni_query::query::df_expr::cypher_expr_to_df(&cypher_expr, None)
139        .map_err(|e| format!("translate: {e}"))?;
140
141    let schema = event_row_schema();
142    let df_schema = DFSchema::try_from(schema.as_ref().clone())
143        .map_err(|e| format!("schema-conversion: {e}"))?;
144
145    let ctx = SessionContext::new();
146    // Register Cypher UDFs (`index`, `_cypher_gt`, ...) so (a) UDF
147    // resolution below can swap placeholder `DummyUdf` nodes (which
148    // declare `return_type = Null`) for the real impls (which declare
149    // `LargeBinary` etc.), and (b) the resulting physical-expr can
150    // invoke them at evaluation time.
151    uni_query::query::df_udfs::register_cypher_udfs(&ctx)
152        .map_err(|e| format!("udf-register: {e}"))?;
153    let state = ctx.state();
154    let config = state.config_options().clone();
155    let props = state.execution_props();
156
157    // Resolve UDFs first so the type-system sees the *real* return
158    // types (e.g. `index` → LargeBinary) when the Cypher coercion pass
159    // below decides whether `LargeBinary > Int64` needs to be rewritten
160    // to `_cypher_gt`. Without this, `apply_type_coercion` sees Null and
161    // routes through the bogus cast-to-Int64 path.
162    let df_expr_resolved = resolve_dummy_udfs(df_expr_raw, &state)
163        .map_err(|e| format!("resolve-udfs (pre-coerce): {e}"))?;
164
165    // Apply Cypher-aware type coercion: rewrites `LargeBinary <op>
166    // <native>` (e.g. `index(properties_new, "balance") > 100`) into
167    // `_cypher_gt(left, right)` so the property-bag access path works
168    // for native operands.
169    let df_expr = uni_query::query::df_expr::apply_type_coercion(&df_expr_resolved, &df_schema)
170        .map_err(|e| format!("cypher-coercion: {e}"))?;
171
172    // Wrap in a Filter plan so TypeCoercion can align literals against
173    // the event-row column types (e.g. `event_kind = 1` coerces `1`
174    // from Int64 literal to UInt8 to match the column).
175    let empty = datafusion::logical_expr::LogicalPlan::EmptyRelation(
176        datafusion::logical_expr::EmptyRelation {
177            produce_one_row: false,
178            schema: Arc::new(df_schema.clone()),
179        },
180    );
181    let filter_plan = LogicalPlanBuilder::from(empty)
182        .filter(df_expr.clone())
183        .map_err(|e| format!("filter-plan: {e}"))?
184        .build()
185        .map_err(|e| format!("plan-build: {e}"))?;
186    let coerced_expr = match TypeCoercion::new().analyze(filter_plan, &config) {
187        Ok(datafusion::logical_expr::LogicalPlan::Filter(f)) => f.predicate,
188        _ => df_expr,
189    };
190
191    // Resolve placeholder `DummyUdf` scalar-function nodes (produced by
192    // `cypher_expr_to_df` / `apply_type_coercion`) into the real UDF
193    // impls registered on the SessionContext. Mirrors
194    // `QueryExecutor::resolve_udfs` (`df_planner.rs:5168`) — without
195    // this pass, `index` and `_cypher_gt` evaluation fails at runtime
196    // with "UDF '<name>' is not registered".
197    let resolved_expr =
198        resolve_dummy_udfs(coerced_expr, &state).map_err(|e| format!("resolve-udfs: {e}"))?;
199
200    let physical = create_physical_expr(&resolved_expr, &df_schema, props)
201        .map_err(|e| format!("physical-expr: {e}"))?;
202    Ok((physical, props_referenced))
203}
204
205/// Walk `expr` and replace every `ScalarFunction` whose name matches a
206/// UDF registered on `state.scalar_functions()` with the registered
207/// implementation. The Cypher translator (`cypher_expr_to_df`) emits
208/// placeholder `DummyUdf` wrappers carrying only the name; the real
209/// `IndexUdf` / `_cypher_gt` / ... impls live on the SessionContext.
210fn resolve_dummy_udfs(
211    expr: datafusion::logical_expr::Expr,
212    state: &datafusion::execution::SessionState,
213) -> Result<datafusion::logical_expr::Expr, String> {
214    use datafusion::common::tree_node::{Transformed, TreeNode};
215    use datafusion::logical_expr::Expr as DfExpr;
216
217    let result = expr
218        .transform_up(|node| {
219            if let DfExpr::ScalarFunction(ref func) = node {
220                let udf_name = func.func.name();
221                if let Some(registered_udf) = state.scalar_functions().get(udf_name) {
222                    return Ok(Transformed::yes(DfExpr::ScalarFunction(
223                        datafusion::logical_expr::expr::ScalarFunction {
224                            func: registered_udf.clone(),
225                            args: func.args.clone(),
226                        },
227                    )));
228                }
229            }
230            Ok(Transformed::no(node))
231        })
232        .map_err(|e| format!("udf-resolve walk: {e}"))?;
233    Ok(result.data)
234}
235
236/// Walk a parsed Cypher expression and rewrite property references on
237/// the canonical entity aliases (`n` for the post-mutation row,
238/// `old` for the pre-image) so they resolve against the per-row
239/// `properties_new` / `properties_old` columns of [`event_row_schema`].
240///
241/// `n.foo` → `properties_new.foo` (translates downstream to
242/// `index(col("properties_new"), "foo")` via the standard
243/// non-graph-entity property-access path in `cypher_expr_to_df`).
244/// `old.foo` → `properties_old.foo`. All referenced property names
245/// are collected into `referenced` for predicate-gated materialization
246/// in [`MutationEvents::from_l0_with_probe`].
247///
248/// Other Cypher expressions are walked recursively so a predicate like
249/// `n.balance > 100 AND old.status <> n.status` is fully rewritten.
250fn rewrite_property_refs(expr: &mut uni_cypher::ast::Expr, referenced: &mut HashSet<String>) {
251    use uni_cypher::ast::Expr;
252    match expr {
253        Expr::Property(base, prop) => {
254            // First recurse into the base — supports chained access like
255            // `n.address.city` (the inner `n.address` is rewritten to
256            // `properties_new.address`, then `index(...)` chains).
257            rewrite_property_refs(base, referenced);
258            if let Expr::Variable(name) = base.as_ref() {
259                match name.as_str() {
260                    "n" => {
261                        referenced.insert(prop.clone());
262                        **base = Expr::Variable("properties_new".to_owned());
263                    }
264                    "old" => {
265                        referenced.insert(prop.clone());
266                        **base = Expr::Variable("properties_old".to_owned());
267                    }
268                    _ => {}
269                }
270            }
271        }
272        Expr::BinaryOp { left, right, .. } => {
273            rewrite_property_refs(left, referenced);
274            rewrite_property_refs(right, referenced);
275        }
276        Expr::UnaryOp { expr: inner, .. } => rewrite_property_refs(inner, referenced),
277        Expr::FunctionCall { args, .. } => {
278            for a in args {
279                rewrite_property_refs(a, referenced);
280            }
281        }
282        Expr::Case {
283            expr: case_expr,
284            when_then,
285            else_expr,
286        } => {
287            if let Some(e) = case_expr.as_deref_mut() {
288                rewrite_property_refs(e, referenced);
289            }
290            for (w, t) in when_then {
291                rewrite_property_refs(w, referenced);
292                rewrite_property_refs(t, referenced);
293            }
294            if let Some(e) = else_expr.as_deref_mut() {
295                rewrite_property_refs(e, referenced);
296            }
297        }
298        Expr::IsNull(inner) | Expr::IsNotNull(inner) | Expr::IsUnique(inner) => {
299            rewrite_property_refs(inner, referenced);
300        }
301        Expr::In { expr: e, list } => {
302            rewrite_property_refs(e, referenced);
303            rewrite_property_refs(list, referenced);
304        }
305        Expr::List(items) => {
306            for i in items {
307                rewrite_property_refs(i, referenced);
308            }
309        }
310        Expr::Map(pairs) => {
311            for (_, v) in pairs {
312                rewrite_property_refs(v, referenced);
313            }
314        }
315        Expr::ArrayIndex { array, index } => {
316            rewrite_property_refs(array, referenced);
317            rewrite_property_refs(index, referenced);
318        }
319        Expr::ArraySlice { array, start, end } => {
320            rewrite_property_refs(array, referenced);
321            if let Some(s) = start.as_deref_mut() {
322                rewrite_property_refs(s, referenced);
323            }
324            if let Some(e) = end.as_deref_mut() {
325                rewrite_property_refs(e, referenced);
326            }
327        }
328        // Literal / Parameter / Variable / Wildcard / subquery variants
329        // do not carry rewritable property refs at the surface level.
330        _ => {}
331    }
332}
333
334fn phase_index(p: TriggerPhase) -> usize {
335    // `TriggerPhase` is `#[non_exhaustive]` — fall back to BeforeMutation
336    // bucket so a future variant can't silently slot into an existing
337    // route's phase by accident.
338    match p {
339        TriggerPhase::BeforeMutation => 0,
340        TriggerPhase::AfterMutation => 1,
341        TriggerPhase::BeforeCommit => 2,
342        TriggerPhase::AfterCommit => 3,
343        _ => 0,
344    }
345}
346
347/// A single route in the per-phase dispatch table.
348struct RouteEntry {
349    plugin: Arc<dyn TriggerPlugin>,
350    name: String,
351    event_mask: u32,
352    label_filter: Option<Vec<String>>,
353    edge_type_filter: Option<Vec<String>>,
354    property_filter: Option<Vec<String>>,
355    fire_mode: FireMode,
356    /// Compiled `predicate_source` expression, evaluated per-row in
357    /// `filter_for` to drop rows where the predicate is false. `None`
358    /// when the subscription has no predicate. The compile is done
359    /// once per [`TriggerRouter::from_registry`] call.
360    compiled_predicate: Option<Arc<dyn PhysicalExpr>>,
361    /// Property names that the compiled predicate references via
362    /// `n.<prop>` or `old.<prop>`. Used to predicate-gate the
363    /// property-bag materialization in
364    /// [`MutationEvents::from_l0_with_probe`] — when this set is
365    /// empty the event-row pipeline does no per-property work for
366    /// this route.
367    properties_referenced: HashSet<String>,
368}
369
370impl RouteEntry {
371    fn matches(&self, kind: TriggerEventMask, label_or_type: &str) -> bool {
372        if (self.event_mask & kind.0) == 0 {
373            return false;
374        }
375        if let Some(ref labels) = self.label_filter
376            && kind_is_node(kind)
377            && !labels.iter().any(|l| l.as_str() == label_or_type)
378        {
379            return false;
380        }
381        if let Some(ref ets) = self.edge_type_filter
382            && kind_is_edge(kind)
383            && !ets.iter().any(|e| e.as_str() == label_or_type)
384        {
385            return false;
386        }
387        true
388    }
389}
390
391fn kind_is_node(kind: TriggerEventMask) -> bool {
392    let mask = TriggerEventMask::NODE_CREATE
393        .union(TriggerEventMask::NODE_UPDATE)
394        .union(TriggerEventMask::NODE_DELETE)
395        .union(TriggerEventMask::LABEL_ADDED)
396        .union(TriggerEventMask::LABEL_REMOVED);
397    (kind.0 & mask.0) != 0
398}
399
400fn kind_is_edge(kind: TriggerEventMask) -> bool {
401    let mask = TriggerEventMask::EDGE_CREATE
402        .union(TriggerEventMask::EDGE_UPDATE)
403        .union(TriggerEventMask::EDGE_DELETE);
404    (kind.0 & mask.0) != 0
405}
406
407/// Per-commit trigger dispatcher.
408pub struct TriggerRouter {
409    by_phase: [Vec<RouteEntry>; PHASE_COUNT],
410    /// Per-`Uni` deferral queue. `None` for read-only / test setups
411    /// without a queue — `TriggerOutcome::Defer` then falls back to
412    /// the legacy warn-and-collapse behavior.
413    defer_queue: Option<Arc<DeferralQueue>>,
414}
415
416impl TriggerRouter {
417    /// Snapshot the registered triggers into a routing table.
418    ///
419    /// Cheap for predicate-less subscriptions (one `Arc` clone for the
420    /// trigger vector, then one pass to bucket by phase). For
421    /// subscriptions carrying `predicate_source`, compiles the Cypher
422    /// predicate into a DataFusion `PhysicalExpr` once and stashes it
423    /// on the route — sub-millisecond per predicate, amortized against
424    /// commit overhead.
425    ///
426    /// # Errors
427    ///
428    /// Returns [`UniError::TriggerRejected`] (with a descriptive
429    /// `reason`) if any subscription's `predicate_source` fails to
430    /// parse, references unknown columns, or fails type coercion. The
431    /// error surfaces at commit time, not at registration — this is a
432    /// deliberate trade-off to keep `uni-plugin` free of a `uni-cypher`
433    /// dependency.
434    pub fn from_registry(reg: &PluginRegistry) -> Result<Self, UniError> {
435        Self::from_registry_with_queue(reg, None)
436    }
437
438    /// Variant that wires in a per-`Uni` deferral queue so
439    /// `TriggerOutcome::Defer` enqueues for re-firing instead of
440    /// being warned and dropped.
441    ///
442    /// # Errors
443    ///
444    /// Same as [`Self::from_registry`].
445    pub fn from_registry_with_queue(
446        reg: &PluginRegistry,
447        defer_queue: Option<Arc<DeferralQueue>>,
448    ) -> Result<Self, UniError> {
449        let triggers = reg.triggers();
450        let mut by_phase: [Vec<RouteEntry>; PHASE_COUNT] =
451            [Vec::new(), Vec::new(), Vec::new(), Vec::new()];
452        for plugin in triggers.iter() {
453            let sub: &TriggerSubscription = plugin.subscription();
454            let name = subscription_name(sub);
455            let (compiled_predicate, properties_referenced) = match sub.predicate_source.as_deref()
456            {
457                Some(src) => {
458                    let (expr, refs) =
459                        compile_predicate(src).map_err(|e| UniError::TriggerRejected {
460                            trigger: name.clone(),
461                            reason: format!(
462                                "predicate_source compile failed: {e}. \
463                                 Supported references: event-row columns \
464                                 (event_kind, vid_or_eid, label, property, \
465                                 old_value, new_value) and entity property \
466                                 references `n.<prop>` (post-mutation) / \
467                                 `old.<prop>` (pre-image)."
468                            ),
469                        })?;
470                    (Some(expr), refs)
471                }
472                None => (None, HashSet::new()),
473            };
474            let entry = RouteEntry {
475                plugin: Arc::clone(plugin),
476                name,
477                event_mask: sub.events.0,
478                label_filter: sub
479                    .labels
480                    .as_ref()
481                    .map(|v| v.iter().map(|s| s.to_string()).collect()),
482                edge_type_filter: sub
483                    .edge_types
484                    .as_ref()
485                    .map(|v| v.iter().map(|s| s.to_string()).collect()),
486                property_filter: sub
487                    .properties
488                    .as_ref()
489                    .map(|v| v.iter().map(|s| s.to_string()).collect()),
490                fire_mode: sub.fire_mode,
491                compiled_predicate,
492                properties_referenced,
493            };
494            by_phase[phase_index(sub.phase)].push(entry);
495        }
496        Ok(Self {
497            by_phase,
498            defer_queue,
499        })
500    }
501
502    /// True if no triggers are registered at any phase.
503    #[must_use]
504    pub fn is_empty(&self) -> bool {
505        self.by_phase.iter().all(|v| v.is_empty())
506    }
507
508    /// Union of node/edge property names that any compiled trigger
509    /// predicate references (across all phases). Empty when no
510    /// trigger has a `predicate_source` mentioning `n.<prop>` /
511    /// `old.<prop>`. Drives predicate-gated property-bag
512    /// materialization in [`MutationEvents::from_l0_with_probe`].
513    #[must_use]
514    pub fn properties_referenced(&self) -> HashSet<String> {
515        let mut out: HashSet<String> = HashSet::new();
516        for routes in &self.by_phase {
517            for entry in routes {
518                for p in &entry.properties_referenced {
519                    out.insert(p.clone());
520                }
521            }
522        }
523        out
524    }
525
526    /// Fire `BeforeMutation` then `BeforeCommit` phases in order.
527    ///
528    /// Returns `Err(UniError::TriggerRejected)` if a `Synchronous`
529    /// trigger returns `TriggerOutcome::Reject` or `Err`. `Async` /
530    /// `EventualConsistency` triggers are ignored at before-phases
531    /// (they ride on after-phases only — firing async work pre-commit
532    /// would let it observe a transaction that subsequently aborts).
533    ///
534    /// # Errors
535    ///
536    /// `UniError::TriggerRejected` on reject or fire error.
537    pub fn dispatch_before(
538        &self,
539        ctx: TriggerContext<'_>,
540        events: &MutationEvents,
541    ) -> Result<(), UniError> {
542        for &phase in &[TriggerPhase::BeforeMutation, TriggerPhase::BeforeCommit] {
543            let routes = &self.by_phase[phase_index(phase)];
544            for entry in routes {
545                if !matches!(entry.fire_mode, FireMode::Synchronous) {
546                    continue;
547                }
548                let Some(batch) = events.filter_for(entry) else {
549                    continue;
550                };
551                let mb = MutationBatch {
552                    events: Arc::new(batch),
553                };
554                let ctx_ref = TriggerContext::new(ctx.session_id, ctx.tx_id);
555                match entry.plugin.fire(ctx_ref, &mb) {
556                    Ok(TriggerOutcome::Continue) => {}
557                    Ok(TriggerOutcome::Reject { reason }) => {
558                        return Err(UniError::TriggerRejected {
559                            trigger: entry.name.to_string(),
560                            reason,
561                        });
562                    }
563                    Ok(TriggerOutcome::Defer { until }) => {
564                        // Memory-backed in-process deferral. FU-5 adds
565                        // an optional `delay` to `TriggerDeferral`;
566                        // `None` re-fires on the next queue tick,
567                        // `Some(d)` schedules at `now + d`.
568                        enqueue_deferral(
569                            &self.defer_queue,
570                            Arc::clone(&entry.plugin),
571                            entry.name.clone(),
572                            mb.clone(),
573                            ctx.session_id.to_owned(),
574                            ctx.tx_id,
575                            until,
576                        );
577                    }
578                    Ok(_) => {
579                        // `TriggerOutcome` is `#[non_exhaustive]`; an
580                        // unrecognised future variant is conservatively
581                        // treated as Continue.
582                    }
583                    Err(e) => {
584                        return Err(UniError::TriggerRejected {
585                            trigger: entry.name.to_string(),
586                            reason: e.to_string(),
587                        });
588                    }
589                }
590            }
591        }
592        Ok(())
593    }
594
595    /// Fire `AfterMutation` then `AfterCommit` phases. Cannot abort.
596    ///
597    /// `Synchronous` after-phase triggers run inline (panics caught and
598    /// logged). `Async` triggers are spawned on `runtime`.
599    /// `EventualConsistency` triggers are spawned the same as `Async`
600    /// in v1 (a real batched queue lands with M5g).
601    pub fn dispatch_after(
602        &self,
603        ctx: TriggerContext<'_>,
604        events: &MutationEvents,
605        runtime: &Handle,
606    ) {
607        for &phase in &[TriggerPhase::AfterMutation, TriggerPhase::AfterCommit] {
608            let routes = &self.by_phase[phase_index(phase)];
609            for entry in routes {
610                let Some(batch) = events.filter_for(entry) else {
611                    continue;
612                };
613                let mb = MutationBatch {
614                    events: Arc::new(batch),
615                };
616                match entry.fire_mode {
617                    FireMode::Synchronous => {
618                        fire_caught(entry, ctx.session_id, ctx.tx_id, &mb, &self.defer_queue);
619                    }
620                    // `FireMode::Async`, `EventualConsistency`, and any
621                    // future variant land on the spawn path. v1 collapses
622                    // EventualConsistency onto Async (no batched queue);
623                    // M5g adds the real queue.
624                    _ => {
625                        let plugin = Arc::clone(&entry.plugin);
626                        let name = entry.name.clone();
627                        let session_id = ctx.session_id.to_owned();
628                        let tx_id = ctx.tx_id;
629                        let queue = self.defer_queue.clone();
630                        runtime.spawn(async move {
631                            let mb_inner = mb;
632                            let result =
633                                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
634                                    plugin.fire(TriggerContext::new(&session_id, tx_id), &mb_inner)
635                                }));
636                            handle_fire_outcome(result, &name, "async trigger", |until| {
637                                enqueue_deferral(
638                                    &queue,
639                                    Arc::clone(&plugin),
640                                    name.clone(),
641                                    mb_inner,
642                                    session_id.clone(),
643                                    tx_id,
644                                    until,
645                                );
646                            });
647                        });
648                    }
649                }
650            }
651        }
652    }
653}
654
655/// Enqueue a [`TriggerOutcome::Defer`] result into the host's
656/// in-memory [`DeferralQueue`]. When no queue is wired (read-only or
657/// test setups) the item is dropped with a warn — matches the legacy
658/// fallback behavior.
659///
660/// The fire instant honors `until.delay` (FU-5); `None` collapses to
661/// "now" so the item fires on the next tick.
662fn enqueue_deferral(
663    queue: &Option<Arc<DeferralQueue>>,
664    plugin: Arc<dyn TriggerPlugin>,
665    name: String,
666    mb: MutationBatch,
667    session_id: String,
668    tx_id: u64,
669    until: uni_plugin::traits::trigger::TriggerDeferral,
670) {
671    let Some(queue) = queue else {
672        warn!(trigger = %name, "Defer with no queue wired; dropping");
673        return;
674    };
675    let fire_at = StdInstant::now() + until.delay.unwrap_or(Duration::ZERO);
676    queue.push(
677        DeferredItem {
678            plugin,
679            name,
680            batch: mb,
681            session_id,
682            tx_id,
683            attempts: 0,
684            payload: until.payload,
685        },
686        fire_at,
687    );
688}
689
690/// Dispatch the result of a `catch_unwind`-wrapped trigger fire.
691///
692/// All three fire paths (`dispatch_after`'s spawned task, [`fire_caught`], and
693/// [`DeferralQueue::tick`]) share the same four-way ladder:
694/// `Ok(Ok(Defer))` / `Ok(Ok(_))` (Continue/Reject/future) / `Ok(Err)` (the
695/// plugin errored) / `Err` (the plugin panicked). They differ only in the log
696/// `label` and what to do on a `Defer` — captured by `on_defer`.
697fn handle_fire_outcome<E: std::fmt::Display>(
698    outcome: Result<Result<TriggerOutcome, E>, Box<dyn std::any::Any + Send>>,
699    name: &str,
700    label: &str,
701    on_defer: impl FnOnce(uni_plugin::traits::trigger::TriggerDeferral),
702) {
703    match outcome {
704        Ok(Ok(TriggerOutcome::Defer { until })) => on_defer(until),
705        Ok(Ok(_)) => {}
706        Ok(Err(e)) => warn!(trigger = %name, error = %e, "{label} errored"),
707        Err(_) => warn!(trigger = %name, "{label} panicked"),
708    }
709}
710
711fn fire_caught(
712    entry: &RouteEntry,
713    session_id: &str,
714    tx_id: u64,
715    mb: &MutationBatch,
716    defer_queue: &Option<Arc<DeferralQueue>>,
717) {
718    let plugin = Arc::clone(&entry.plugin);
719    let name = entry.name.clone();
720    let mb_clone = mb.clone();
721    let session_id_owned = session_id.to_owned();
722    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
723        plugin.fire(TriggerContext::new(&session_id_owned, tx_id), &mb_clone)
724    }));
725    handle_fire_outcome(result, &name, "after-phase trigger", |until| {
726        enqueue_deferral(
727            defer_queue,
728            plugin,
729            name.clone(),
730            mb_clone,
731            session_id_owned,
732            tx_id,
733            until,
734        );
735    });
736}
737
738fn subscription_name(sub: &TriggerSubscription) -> String {
739    // `TriggerSubscription` carries no explicit name field; use the
740    // first line of the docs as a stable identifier, falling back to
741    // a generic label. Keeps `UniError::TriggerRejected` human-readable
742    // without an ABI bump for a name field on the subscription struct.
743    sub.docs
744        .lines()
745        .next()
746        .map(str::to_owned)
747        .unwrap_or_else(|| "<unnamed trigger>".to_owned())
748}
749
750// ── Mutation event extraction ──────────────────────────────────────
751
/// In-memory, untyped event log drained from `tx_l0`. Held by value
/// across the commit boundary and filtered per-route on dispatch.
///
/// Built by [`MutationEvents::from_l0`] or
/// [`MutationEvents::from_l0_with_probe`]; projected into Arrow by
/// `materialize_all` (every row) and `filter_for` (per-route subset).
pub struct MutationEvents {
    /// Captured events in emission order: vertex writes, vertex
    /// deletes, edge writes, then edge deletes.
    rows: Vec<MutationRow>,
}
757
/// One mutation event captured from the transaction's L0 buffer.
/// A multi-label vertex produces one row per label.
struct MutationRow {
    /// Kind of mutation (`NODE_*` / `EDGE_*` create, update or
    /// delete). Rows built by `from_l0_with_probe` carry a single
    /// event kind each.
    event_kind: TriggerEventMask,
    /// Vertex id or edge id, bit-reinterpreted from its `u64` form
    /// (see `vid_to_i64` / `eid_to_i64`).
    vid_or_eid: i64,
    /// For NODE_* events: the affected label (one row per label).
    /// For EDGE_* events: the edge type.
    label_or_type: String,
    /// Pre-image properties when known (probe was supplied and the
    /// vertex/edge existed before this tx); `None` otherwise. The
    /// `BeforeCommit` dispatch path serializes this into the
    /// `old_value` Arrow column.
    old_value: Option<Vec<u8>>,
    /// Post-mutation property map filtered to the predicate-referenced
    /// keys; serialized into the `properties_new` LargeBinary column.
    /// `None` when no trigger references any property.
    new_properties: Option<Properties>,
    /// Pre-image property map filtered to the predicate-referenced
    /// keys; serialized into the `properties_old` LargeBinary column.
    /// `None` when no trigger references any property or the entity
    /// did not pre-exist.
    old_properties: Option<Properties>,
}
779
/// Snapshot of the committed graph state used to (a) distinguish
/// CREATE from UPDATE in [`MutationEvents::from_l0_with_probe`] and
/// (b) populate `old_value` for vertex and edge mutation events.
///
/// Built once per commit. The cheap [`Self::from_l0_chain`] scans the
/// writer's `L0Manager` (current L0 + pending-flush L0s) — no I/O,
/// runs before the writer write lock is acquired. The richer
/// [`Self::extend_with_l1`] adds an async L1 storage probe for VIDs
/// not found in the L0 chain — closes the gap where a vertex flushed
/// to L1 in a previous commit would otherwise be misclassified as
/// `NODE_CREATE` on its next mutation. The L1 probe also projects
/// every property column on the target label so the resulting
/// `old_value` carries the same pre-image fidelity as the L0-chain
/// path. Edge pre-images are captured via the L0 chain's
/// `edge_properties` snapshot.
#[derive(Default)]
pub struct PreExistingProbe {
    /// VIDs known to exist in committed state, mapped to their
    /// pre-image properties. Entries from the L0 probe clone the
    /// committed property map; entries from the L1 probe hold every
    /// non-null, non-internal column of the stored row. An entry
    /// inserted first is never overwritten by a later probe.
    vertices: HashMap<uni_common::Vid, Properties>,
    /// EIDs known to exist in committed state (with their pre-image
    /// properties, when captured — populated by L0 probe). The map
    /// uses `Properties::default()` for entries whose committed buffer
    /// holds endpoints but no property entry.
    edges: HashMap<uni_common::Eid, Properties>,
}
807
808impl PreExistingProbe {
809    /// Build a probe by scanning the current L0 + pending-flush L0s
810    /// for vertices/edges referenced in `tx_l0`. Properties are cloned
811    /// from the committed L0 chain.
812    ///
813    /// Only mutations actually present in `tx_l0` are probed — keeping
814    /// the work proportional to the commit's mutation count rather
815    /// than to the total graph size.
816    #[must_use]
817    pub fn from_l0_chain(l0_manager: &L0Manager, tx_l0: &L0Buffer) -> Self {
818        let mut vertices: HashMap<uni_common::Vid, Properties> = HashMap::new();
819        let mut edges: HashMap<uni_common::Eid, Properties> = HashMap::new();
820
821        let candidate_vids: Vec<uni_common::Vid> = tx_l0
822            .vertex_properties
823            .keys()
824            .copied()
825            .chain(tx_l0.vertex_tombstones.iter().copied())
826            .collect();
827        let candidate_eids: Vec<uni_common::Eid> = tx_l0
828            .edge_endpoints
829            .keys()
830            .copied()
831            .chain(tx_l0.tombstones.keys().copied())
832            .collect();
833
834        let mut probe_buffer = |buf: &L0Buffer| {
835            for vid in &candidate_vids {
836                if vertices.contains_key(vid) {
837                    continue;
838                }
839                if buf.vertex_tombstones.contains(vid) {
840                    continue;
841                }
842                if let Some(props) = buf.vertex_properties.get(vid) {
843                    vertices.insert(*vid, props.clone());
844                }
845            }
846            for eid in &candidate_eids {
847                if edges.contains_key(eid) {
848                    continue;
849                }
850                if buf.tombstones.contains_key(eid) {
851                    continue;
852                }
853                if buf.edge_endpoints.contains_key(eid) {
854                    let props = buf.edge_properties.get(eid).cloned().unwrap_or_default();
855                    edges.insert(*eid, props);
856                }
857            }
858        };
859
860        {
861            let current = l0_manager.get_current();
862            let g = current.read();
863            probe_buffer(&g);
864        }
865        for pending in l0_manager.get_pending_flush() {
866            let g = pending.read();
867            probe_buffer(&g);
868        }
869
870        Self { vertices, edges }
871    }
872
873    /// Snapshot the (vid, label) pairs that should be probed against
874    /// L1 storage — VIDs in `tx_l0` not already marked pre-existing
875    /// by the L0 chain. Sync — must run under the `tx_l0` read lock.
876    /// Returned vector is sized by chunked-IN-list quota, ready to
877    /// hand to [`Self::extend_with_l1`] outside the lock.
878    #[must_use]
879    pub fn pending_l1_candidates(&self, tx_l0: &L0Buffer) -> Vec<(uni_common::Vid, String)> {
880        let mut out: Vec<(uni_common::Vid, String)> = Vec::new();
881        for vid in tx_l0
882            .vertex_properties
883            .keys()
884            .chain(tx_l0.vertex_tombstones.iter())
885        {
886            if self.vertices.contains_key(vid) {
887                continue;
888            }
889            let label = tx_l0
890                .vertex_labels
891                .get(vid)
892                .and_then(|labels| labels.first())
893                .cloned();
894            if let Some(label) = label {
895                out.push((*vid, label));
896            }
897        }
898        out
899    }
900
901    /// Extend an existing probe with an L1 storage scan for the
902    /// supplied `(vid, label)` candidates (typically the output of
903    /// [`Self::pending_l1_candidates`]). Async — runs outside the
904    /// tx_l0 read lock.
905    ///
906    /// Groups candidates by label, chunks each group into 1024-VID
907    /// batches, and issues one `scan_vertex_table` per chunk with a
908    /// `_vid IN (…)` filter — bounded I/O proportional to the
909    /// commit's mutation count, not the graph size. For every
910    /// returned VID, every non-vid column is converted via
911    /// [`uni_store::storage::arrow_convert::arrow_to_value`] and
912    /// stashed as the pre-image `Properties` map; this populates the
913    /// `old_value` column on `NODE_UPDATE` / `NODE_DELETE` events
914    /// emitted by [`MutationEvents::from_l0_with_probe`] for vertices
915    /// that were only visible after the last L0 flush.
916    ///
917    /// # Errors
918    ///
919    /// Per-chunk scan errors are logged and ignored — the L0 probe
920    /// already captured the high-fidelity subset, so a failed L1
921    /// probe degrades to "L1 vertices are misclassified as CREATE"
922    /// rather than failing the commit.
923    pub async fn extend_with_l1(
924        &mut self,
925        candidates: Vec<(uni_common::Vid, String)>,
926        storage: &uni_store::storage::manager::StorageManager,
927    ) {
928        use arrow_array::Array;
929        use std::collections::HashMap as StdHashMap;
930        use uni_store::storage::arrow_convert::arrow_to_value;
931        const CHUNK_SIZE: usize = 1024;
932
933        let mut by_label: StdHashMap<String, Vec<uni_common::Vid>> = StdHashMap::new();
934        for (vid, label) in candidates {
935            by_label.entry(label).or_default().push(vid);
936        }
937
938        for (label, vids) in by_label {
939            // Discover the table's full column set once per label so
940            // we can request every property (not just `_vid`).
941            let table_name = uni_store::backend::table_names::vertex_table_name(&label);
942            let column_names: Vec<String> =
943                match storage.backend().get_table_schema(&table_name).await {
944                    Ok(Some(schema)) => schema.fields().iter().map(|f| f.name().clone()).collect(),
945                    Ok(None) => {
946                        // Table absent: nothing to probe.
947                        continue;
948                    }
949                    Err(e) => {
950                        warn!(label = %label, error = %e, "L1 pre-image probe: \
951                          schema lookup failed; vids fall back to CREATE");
952                        continue;
953                    }
954                };
955            // Always include `_vid`; the column-filter inside
956            // `scan_vertex_table` is permissive about missing columns,
957            // so passing every name from the schema is safe.
958            let col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
959
960            for chunk in vids.chunks(CHUNK_SIZE) {
961                let in_list = chunk
962                    .iter()
963                    .map(|v| v.as_u64().to_string())
964                    .collect::<Vec<_>>()
965                    .join(",");
966                let filter = format!("_vid IN ({in_list})");
967                let batch = match storage
968                    .scan_vertex_table(&label, &col_refs, Some(&filter))
969                    .await
970                {
971                    Ok(Some(b)) => b,
972                    Ok(None) => continue,
973                    Err(e) => {
974                        warn!(label = %label, error = %e, "L1 pre-image probe failed; \
975                              affected vids fall back to NODE_CREATE classification");
976                        continue;
977                    }
978                };
979                let vid_col = match batch
980                    .column_by_name("_vid")
981                    .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
982                {
983                    Some(c) => c,
984                    None => {
985                        warn!(label = %label, "L1 probe returned batch without _vid column");
986                        continue;
987                    }
988                };
989                // Cache (column_index, column_name) pairs for the
990                // per-row property assembly. Skip storage-internal
991                // columns (`_vid`, `_version`, `_label`) — user
992                // properties are everything else.
993                let schema = batch.schema();
994                let property_cols: Vec<(usize, String)> = schema
995                    .fields()
996                    .iter()
997                    .enumerate()
998                    .filter_map(|(idx, f)| {
999                        let name = f.name();
1000                        if name == "_vid"
1001                            || name == "_version"
1002                            || name == "_label"
1003                            || name == "_labels"
1004                        {
1005                            None
1006                        } else {
1007                            Some((idx, name.clone()))
1008                        }
1009                    })
1010                    .collect();
1011
1012                for row in 0..vid_col.len() {
1013                    if vid_col.is_null(row) {
1014                        continue;
1015                    }
1016                    let raw = vid_col.value(row);
1017                    let vid = uni_common::Vid::from(raw);
1018                    let mut props = Properties::new();
1019                    for (col_idx, col_name) in &property_cols {
1020                        let col = batch.column(*col_idx);
1021                        let value = arrow_to_value(col.as_ref(), row, None);
1022                        if !matches!(value, uni_common::Value::Null) {
1023                            props.insert(col_name.clone(), value);
1024                        }
1025                    }
1026                    // First insert wins: L0-chain entries always come
1027                    // first and may already hold richer pre-image data.
1028                    self.vertices.entry(vid).or_insert(props);
1029                }
1030            }
1031        }
1032    }
1033
1034    /// True if `vid` was visible in committed state before this tx.
1035    #[must_use]
1036    pub fn vertex_pre_existed(&self, vid: uni_common::Vid) -> bool {
1037        self.vertices.contains_key(&vid)
1038    }
1039
1040    /// True if `eid` was visible in committed state before this tx.
1041    #[must_use]
1042    pub fn edge_pre_existed(&self, eid: uni_common::Eid) -> bool {
1043        self.edges.contains_key(&eid)
1044    }
1045
1046    fn edge_old_bytes(&self, eid: uni_common::Eid) -> Option<Vec<u8>> {
1047        self.edges.get(&eid).map(serialize_properties)
1048    }
1049
1050    fn vertex_old_bytes(&self, vid: uni_common::Vid) -> Option<Vec<u8>> {
1051        self.vertices.get(&vid).map(serialize_properties)
1052    }
1053
    /// Borrow the captured pre-image properties for `vid`, when the
    /// vertex pre-existed in committed state. Used by
    /// [`MutationEvents::from_l0_with_probe`] to populate the
    /// `properties_old` event-row column with the subset of keys any
    /// trigger predicate references.
    ///
    /// Returns `None` when the probe holds no entry for `vid`.
    #[must_use]
    pub fn vertex_properties(&self, vid: uni_common::Vid) -> Option<&Properties> {
        self.vertices.get(&vid)
    }
1063
    /// Borrow the captured pre-image properties for `eid`, when the
    /// edge pre-existed in committed state.
    ///
    /// Returns `None` when the probe holds no entry for `eid`. An edge
    /// that existed without a property entry yields an empty map, not
    /// `None`.
    #[must_use]
    pub fn edge_properties(&self, eid: uni_common::Eid) -> Option<&Properties> {
        self.edges.get(&eid)
    }
1070}
1071
1072/// Serialize a `Properties` map into a stable byte representation for
1073/// the trigger event row's `old_value` column. Uses JSON for now —
1074/// matches the codec other plugin surfaces use for `CypherValue`
1075/// payloads and keeps the bytes inspectable in trigger plugins
1076/// without pulling a bespoke decoder.
1077fn serialize_properties(props: &Properties) -> Vec<u8> {
1078    serde_json::to_vec(props).unwrap_or_default()
1079}
1080
1081impl MutationEvents {
1082    /// Drain the tx-private L0 buffer into a typed event log without a
1083    /// committed-state probe. Every non-tombstoned write emits an
1084    /// `UPDATE` event; `old_value` is `None`. Equivalent to
1085    /// [`Self::from_l0_with_probe`] with `probe = None`.
1086    #[must_use]
1087    pub fn from_l0(l0: &L0Buffer) -> Self {
1088        Self::from_l0_with_probe(l0, None, &HashSet::new())
1089    }
1090
1091    /// Drain the tx-private L0 buffer into a typed event log.
1092    ///
1093    /// When `probe` is supplied, the probe distinguishes CREATE from
1094    /// UPDATE per-VID/EID and supplies the pre-image bytes used to
1095    /// populate `old_value` for `BeforeCommit` triggers. When `probe`
1096    /// is `None`, every write emits `UPDATE` and `old_value` stays
1097    /// `None` (legacy behavior — kept for callers that don't yet
1098    /// build a probe).
1099    ///
1100    /// Multi-label vertices emit one row per label so a label-filtered
1101    /// trigger fires exactly once per (vid, matching-label) pair.
1102    /// Vertices with no labels emit a single row with an empty label
1103    /// so unfiltered triggers still observe them.
1104    #[must_use]
1105    pub fn from_l0_with_probe(
1106        l0: &L0Buffer,
1107        probe: Option<&PreExistingProbe>,
1108        properties_referenced: &HashSet<String>,
1109    ) -> Self {
1110        let mut rows: Vec<MutationRow> = Vec::with_capacity(l0.mutation_count);
1111        let track_props = !properties_referenced.is_empty();
1112
1113        // Extract the subset of `props` whose keys appear in
1114        // `properties_referenced`. Returns `None` when nothing is
1115        // tracked or no referenced key is present, keeping the column
1116        // null for property-free triggers.
1117        let filtered = |props: &Properties| -> Option<Properties> {
1118            if !track_props {
1119                return None;
1120            }
1121            let mut out: Properties = Properties::new();
1122            for k in properties_referenced {
1123                if let Some(v) = props.get(k) {
1124                    out.insert(k.clone(), v.clone());
1125                }
1126            }
1127            // Always emit the (possibly empty) bag when properties are
1128            // tracked so the predicate sees a Map rather than NULL
1129            // (which would short-circuit `index` to NULL and risk
1130            // type-coercion surprises in `>` / `<>` comparisons).
1131            Some(out)
1132        };
1133
1134        // Vertex writes — CREATE if the probe says the vid didn't
1135        // pre-exist, UPDATE otherwise. Legacy callers with no probe
1136        // get UPDATE for every write.
1137        for (vid, props) in &l0.vertex_properties {
1138            if l0.vertex_tombstones.contains(vid) {
1139                continue;
1140            }
1141            let id = vid_to_i64(*vid);
1142            let labels = l0.vertex_labels.get(vid);
1143            let (kind, old, old_props_map) = match probe {
1144                Some(p) if p.vertex_pre_existed(*vid) => (
1145                    TriggerEventMask::NODE_UPDATE,
1146                    p.vertex_old_bytes(*vid),
1147                    p.vertex_properties(*vid).and_then(&filtered),
1148                ),
1149                Some(_) => (TriggerEventMask::NODE_CREATE, None, None),
1150                None => (TriggerEventMask::NODE_UPDATE, None, None),
1151            };
1152            let new_props_map = filtered(props);
1153            match labels {
1154                Some(ls) if !ls.is_empty() => {
1155                    for l in ls {
1156                        rows.push(MutationRow {
1157                            event_kind: kind,
1158                            vid_or_eid: id,
1159                            label_or_type: l.clone(),
1160                            old_value: old.clone(),
1161                            new_properties: new_props_map.clone(),
1162                            old_properties: old_props_map.clone(),
1163                        });
1164                    }
1165                }
1166                _ => {
1167                    rows.push(MutationRow {
1168                        event_kind: kind,
1169                        vid_or_eid: id,
1170                        label_or_type: String::new(),
1171                        old_value: old,
1172                        new_properties: new_props_map,
1173                        old_properties: old_props_map,
1174                    });
1175                }
1176            }
1177        }
1178
1179        // Vertex deletes. `old_value` is the pre-tx property image when
1180        // the probe captured it (the row is about to disappear).
1181        for vid in &l0.vertex_tombstones {
1182            let id = vid_to_i64(*vid);
1183            let labels = l0.vertex_labels.get(vid);
1184            let old = probe.and_then(|p| p.vertex_old_bytes(*vid));
1185            let old_props_map = probe
1186                .and_then(|p| p.vertex_properties(*vid))
1187                .and_then(&filtered);
1188            match labels {
1189                Some(ls) if !ls.is_empty() => {
1190                    for l in ls {
1191                        rows.push(MutationRow {
1192                            event_kind: TriggerEventMask::NODE_DELETE,
1193                            vid_or_eid: id,
1194                            label_or_type: l.clone(),
1195                            old_value: old.clone(),
1196                            new_properties: None,
1197                            old_properties: old_props_map.clone(),
1198                        });
1199                    }
1200                }
1201                _ => {
1202                    rows.push(MutationRow {
1203                        event_kind: TriggerEventMask::NODE_DELETE,
1204                        vid_or_eid: id,
1205                        label_or_type: String::new(),
1206                        old_value: old,
1207                        new_properties: None,
1208                        old_properties: old_props_map,
1209                    });
1210                }
1211            }
1212        }
1213
1214        // Edge writes — CREATE if not pre-existing, else UPDATE.
1215        // `old_value` carries the pre-image edge properties for UPDATE
1216        // and is `None` for CREATE.
1217        for eid in l0.edge_endpoints.keys() {
1218            if l0.tombstones.contains_key(eid) {
1219                continue;
1220            }
1221            let etype = l0.edge_types.get(eid).cloned().unwrap_or_default();
1222            let (kind, old, old_props_map) = match probe {
1223                Some(p) if p.edge_pre_existed(*eid) => (
1224                    TriggerEventMask::EDGE_UPDATE,
1225                    p.edge_old_bytes(*eid),
1226                    p.edge_properties(*eid).and_then(&filtered),
1227                ),
1228                Some(_) => (TriggerEventMask::EDGE_CREATE, None, None),
1229                None => (TriggerEventMask::EDGE_UPDATE, None, None),
1230            };
1231            let new_props_map = l0.edge_properties.get(eid).and_then(&filtered);
1232            rows.push(MutationRow {
1233                event_kind: kind,
1234                vid_or_eid: eid_to_i64(*eid),
1235                label_or_type: etype,
1236                old_value: old,
1237                new_properties: new_props_map,
1238                old_properties: old_props_map,
1239            });
1240        }
1241
1242        // Edge deletes. `old_value` is the pre-tx property image when
1243        // the probe captured it.
1244        for (eid, ts) in &l0.tombstones {
1245            let etype = l0
1246                .edge_types
1247                .get(eid)
1248                .cloned()
1249                .unwrap_or_else(|| format!("type:{}", ts.edge_type));
1250            let old = probe.and_then(|p| p.edge_old_bytes(*eid));
1251            let old_props_map = probe
1252                .and_then(|p| p.edge_properties(*eid))
1253                .and_then(&filtered);
1254            rows.push(MutationRow {
1255                event_kind: TriggerEventMask::EDGE_DELETE,
1256                vid_or_eid: eid_to_i64(*eid),
1257                label_or_type: etype,
1258                old_value: old,
1259                new_properties: None,
1260                old_properties: old_props_map,
1261            });
1262        }
1263
1264        Self { rows }
1265    }
1266
1267    /// Project every captured row into the canonical [`event_row_schema`]
1268    /// `RecordBatch`, with no per-route filtering and no predicate.
1269    ///
1270    /// Used by the CDC delivery path to hand subscribers the full
1271    /// stream of mutations for a committed transaction (M11 FU-4). The
1272    /// per-trigger filtered shape is built by `Self::filter_for`.
1273    ///
1274    /// Returns `None` when there are zero rows (lets callers skip
1275    /// constructing an empty `CdcBatch`).
1276    #[must_use]
1277    pub fn materialize_all(&self) -> Option<RecordBatch> {
1278        if self.rows.is_empty() {
1279            return None;
1280        }
1281        let mut cols = EventRowColumns::with_capacity(self.rows.len());
1282        for row in &self.rows {
1283            cols.push_row(row);
1284        }
1285        cols.into_batch()
1286    }
1287
1288    /// Filter rows matching `entry`'s subscription selectors and
1289    /// project them into the §4.18 RecordBatch shape. Returns `None`
1290    /// if no rows match (caller skips the `fire` call).
1291    fn filter_for(&self, entry: &RouteEntry) -> Option<RecordBatch> {
1292        // property_filter is satisfied vacuously here — per-property
1293        // event-row population (one row per (vid, property) write) is
1294        // not the chosen surface; predicate authors instead reference
1295        // `n.<prop>` directly and the property-bag column resolves it
1296        // through `index`.
1297        let _ = &entry.property_filter;
1298        let mut cols = EventRowColumns::default();
1299        for row in &self.rows {
1300            if entry.matches(row.event_kind, &row.label_or_type) {
1301                cols.push_row(row);
1302            }
1303        }
1304        let batch = cols.into_batch()?;
1305
1306        // Apply the compiled `predicate_source` boolean mask if any.
1307        // Evaluation failures degrade safely to "no rows match" — the
1308        // predicate was already validated at router build, so failures
1309        // here imply an Arrow/DataFusion bug we'd rather skip than
1310        // surface as a commit error.
1311        let batch = match &entry.compiled_predicate {
1312            Some(predicate) => apply_predicate(predicate, batch)?,
1313            None => batch,
1314        };
1315
1316        if batch.num_rows() == 0 {
1317            return None;
1318        }
1319        Some(batch)
1320    }
1321}
1322
/// Column-oriented builder for the canonical event-row [`RecordBatch`]
/// produced by [`MutationEvents::materialize_all`] and
/// [`MutationEvents::filter_for`]. Keeps the per-column allocation +
/// per-row push logic in one place so the two callers stay in lockstep.
///
/// Every vector holds one entry per pushed row; `into_batch` hands
/// them to `RecordBatch::try_new` in field-declaration order.
#[derive(Default)]
struct EventRowColumns {
    /// Event-kind discriminants (see `mask_to_discriminant`).
    kinds: Vec<u8>,
    /// Vertex / edge ids.
    ids: Vec<i64>,
    /// Label (node events) or edge type (edge events).
    labels: Vec<String>,
    /// Per-property name column; `push_row` always writes an empty
    /// string here.
    properties: Vec<String>,
    /// Serialized pre-image bytes (`MutationRow::old_value`).
    olds: Vec<Option<Vec<u8>>>,
    /// New-value bytes; `push_row` always writes `None` here.
    news: Vec<Option<Vec<u8>>>,
    /// Encoded post-mutation property bag, when tracked.
    props_new: Vec<Option<Vec<u8>>>,
    /// Encoded pre-image property bag, when tracked and known.
    props_old: Vec<Option<Vec<u8>>>,
}
1338
1339impl EventRowColumns {
    /// Create an empty builder with room for `cap` rows reserved in
    /// every column, so pushing up to `cap` rows does not reallocate.
    fn with_capacity(cap: usize) -> Self {
        Self {
            kinds: Vec::with_capacity(cap),
            ids: Vec::with_capacity(cap),
            labels: Vec::with_capacity(cap),
            properties: Vec::with_capacity(cap),
            olds: Vec::with_capacity(cap),
            news: Vec::with_capacity(cap),
            props_new: Vec::with_capacity(cap),
            props_old: Vec::with_capacity(cap),
        }
    }
1352
1353    fn push_row(&mut self, row: &MutationRow) {
1354        self.kinds.push(mask_to_discriminant(row.event_kind));
1355        self.ids.push(row.vid_or_eid);
1356        self.labels.push(row.label_or_type.clone());
1357        self.properties.push(String::new());
1358        self.olds.push(row.old_value.clone());
1359        self.news.push(None);
1360        self.props_new.push(
1361            row.new_properties
1362                .as_ref()
1363                .map(|m| cypher_value_codec::encode(&Value::Map(m.clone()))),
1364        );
1365        self.props_old.push(
1366            row.old_properties
1367                .as_ref()
1368                .map(|m| cypher_value_codec::encode(&Value::Map(m.clone()))),
1369        );
1370    }
1371
1372    /// Materialize the columns into a `RecordBatch`. Returns `None`
1373    /// when zero rows were collected (callers skip the empty case).
1374    fn into_batch(self) -> Option<RecordBatch> {
1375        if self.kinds.is_empty() {
1376            return None;
1377        }
1378        let kind_arr: Arc<dyn arrow_array::Array> = Arc::new(UInt8Array::from(self.kinds));
1379        let id_arr: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(self.ids));
1380        let label_arr: Arc<dyn arrow_array::Array> =
1381            Arc::new(arrow_array::StringArray::from(self.labels));
1382        let prop_arr: Arc<dyn arrow_array::Array> =
1383            Arc::new(arrow_array::StringArray::from(self.properties));
1384        let olds_iter: Vec<Option<&[u8]>> = self.olds.iter().map(|o| o.as_deref()).collect();
1385        let news_iter: Vec<Option<&[u8]>> = self.news.iter().map(|o| o.as_deref()).collect();
1386        let old_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(olds_iter));
1387        let new_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(news_iter));
1388        let pnew_iter: Vec<Option<&[u8]>> = self.props_new.iter().map(|o| o.as_deref()).collect();
1389        let pold_iter: Vec<Option<&[u8]>> = self.props_old.iter().map(|o| o.as_deref()).collect();
1390        let pnew_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(pnew_iter));
1391        let pold_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(pold_iter));
1392
1393        RecordBatch::try_new(
1394            event_row_schema(),
1395            vec![
1396                kind_arr, id_arr, label_arr, prop_arr, old_arr, new_arr, pnew_arr, pold_arr,
1397            ],
1398        )
1399        .ok()
1400    }
1401}
1402
1403/// Run a compiled trigger predicate against the candidate batch,
1404/// returning `Some(filtered_batch)` when at least one row passes and
1405/// `None` when the predicate eliminates every row or the evaluation
1406/// fails (logged at warn level, treated as "no match" to avoid
1407/// silently firing on rows the predicate would have rejected).
1408fn apply_predicate(predicate: &Arc<dyn PhysicalExpr>, batch: RecordBatch) -> Option<RecordBatch> {
1409    use datafusion::arrow::compute::filter_record_batch;
1410    use datafusion::logical_expr::ColumnarValue;
1411
1412    let value = match predicate.evaluate(&batch) {
1413        Ok(v) => v,
1414        Err(e) => {
1415            warn!(error = %e, "trigger predicate evaluation failed; dropping batch");
1416            return None;
1417        }
1418    };
1419    let array = match value {
1420        ColumnarValue::Array(a) => a,
1421        ColumnarValue::Scalar(s) => match s.to_array_of_size(batch.num_rows()) {
1422            Ok(a) => a,
1423            Err(e) => {
1424                warn!(error = %e, "trigger predicate scalar→array failed");
1425                return None;
1426            }
1427        },
1428    };
1429    let bool_arr = match array.as_any().downcast_ref::<BooleanArray>() {
1430        Some(b) => b,
1431        None => {
1432            warn!("trigger predicate must yield Boolean; dropping batch");
1433            return None;
1434        }
1435    };
1436    filter_record_batch(&batch, bool_arr).ok()
1437}
1438
1439fn mask_to_discriminant(m: TriggerEventMask) -> u8 {
1440    // 1-based bit position of the lowest set bit (e.g. `0b001 → 1`,
1441    // `0b100 → 3`); falls back to 0 when no bit is set. Emitted rows
1442    // always carry exactly one bit, so the lowest set bit is *the* bit.
1443    if m.0 == 0 {
1444        return 0;
1445    }
1446    m.0.trailing_zeros() as u8 + 1
1447}
1448
1449fn vid_to_i64(vid: uni_common::Vid) -> i64 {
1450    // Vid is a newtype around a u64; reinterpret-cast preserves bits.
1451    vid.as_u64() as i64
1452}
1453
1454fn eid_to_i64(eid: uni_common::Eid) -> i64 {
1455    eid.as_u64() as i64
1456}
1457
1458// ── M11 deferral queue (memory-backed v1) ──────────────────────────
1459
/// Upper bound on a deferred item's `attempts` counter.
///
/// [`DeferralQueue::tick`] increments the counter each time a fired
/// item asks to be deferred again; once the counter reaches this value
/// the item is dropped with a warning instead of being re-queued. This
/// caps the worst case for a pathological plugin that always returns
/// `Defer`.
const DEFER_MAX_ATTEMPTS: u32 = 10;
1465
/// One pending deferred trigger fire held by [`DeferralQueue`].
struct DeferredItem {
    /// Trigger whose `on_deferred` callback runs when the item comes due.
    plugin: Arc<dyn TriggerPlugin>,
    /// Trigger name: used in log fields and persisted so a reload can
    /// match the row against the `subscription_name` of a registered
    /// trigger.
    name: String,
    /// Mutation events handed back to the plugin when the item fires.
    batch: MutationBatch,
    /// Session id used to rebuild the `TriggerContext` at fire time.
    session_id: String,
    /// Transaction id used to rebuild the `TriggerContext` at fire time.
    tx_id: u64,
    /// Re-defer counter; `tick` bumps it on every re-defer and drops the
    /// item once it reaches `DEFER_MAX_ATTEMPTS`.
    attempts: u32,
    /// `TriggerDeferral::payload` passed back to
    /// [`TriggerPlugin::on_deferred`] when this item fires (FU-5).
    payload: String,
}
1477
/// Deferral queue for `TriggerOutcome::Defer`.
///
/// Items are grouped by their scheduled fire instant in a `BTreeMap`,
/// so `drain_due` can split off every due slot with a single
/// ordered-map operation. The queue is drained by a per-`Uni`
/// background tick task spawned at DB build time; firing happens on
/// the tokio runtime.
///
/// **v1 limitations**:
/// - The in-memory map is authoritative. A queue built with `new`
///   loses its items on restart; a queue built with
///   `with_persistence` mirrors its state to a JSON sidecar that
///   `load_from_sidecar` can replay. A system-table or WAL-backed
///   queue is still `TODO(M11-persist)`.
/// - No transactional guarantee that a deferred item eventually fires:
///   sidecar I/O failures are only logged, so an item can still be
///   lost if the process exits before the scheduled instant.
/// - Per-item retry is capped at `DEFER_MAX_ATTEMPTS` to prevent
///   runaway re-deferral loops.
#[derive(Default)]
pub struct DeferralQueue {
    /// Pending items, keyed by the monotonic instant at which they
    /// become due; items sharing an instant keep insertion order.
    inner: parking_lot::Mutex<BTreeMap<StdInstant, Vec<DeferredItem>>>,
    /// Optional JSON-sidecar persistence (FU-5). When set, every
    /// `push` mirrors the queue state to disk and every `drain_due`
    /// rewrites the sidecar so a crash-restart can re-load the queue
    /// state. The persistence sink resolves [`TriggerPlugin`]s by qname
    /// from the host's [`uni_plugin::PluginRegistry`] at load time.
    sidecar: parking_lot::Mutex<Option<DeferralSidecar>>,
}
1503
1504impl std::fmt::Debug for DeferralQueue {
1505    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1506        let len: usize = self.inner.lock().values().map(|v| v.len()).sum();
1507        f.debug_struct("DeferralQueue").field("size", &len).finish()
1508    }
1509}
1510
1511impl DeferralQueue {
1512    /// Build a fresh empty queue.
1513    #[must_use]
1514    pub fn new() -> Arc<Self> {
1515        Arc::new(Self::default())
1516    }
1517
1518    /// Build a queue with JSON-sidecar persistence rooted at
1519    /// `<data_path>/_system/deferred_triggers.json`.
1520    ///
1521    /// On startup the queue's `load` method walks the sidecar and
1522    /// re-binds each row to its `TriggerPlugin` by qname via the
1523    /// supplied [`uni_plugin::PluginRegistry`]. Items whose plugin
1524    /// can no longer be resolved are dropped with a warn.
1525    ///
1526    /// Persists on every `push` and after every `drain_due` (FU-5).
1527    /// I/O failures degrade to debug logs — in-memory queue state
1528    /// remains authoritative for the running process.
1529    #[must_use]
1530    pub fn with_persistence(data_path: std::path::PathBuf) -> Arc<Self> {
1531        let queue = Arc::new(Self::default());
1532        *queue.sidecar.lock() = Some(DeferralSidecar::new(data_path));
1533        queue
1534    }
1535
1536    /// Borrow the sidecar path, if persistence is enabled.
1537    #[must_use]
1538    pub fn sidecar_path(&self) -> Option<std::path::PathBuf> {
1539        self.sidecar.lock().as_ref().map(|s| s.path().to_path_buf())
1540    }
1541
1542    /// Replay persisted items from the sidecar, re-binding each row's
1543    /// trigger qname against the registry. Should be called once
1544    /// after `Uni::build` finishes wiring triggers but before the
1545    /// queue tick task starts. Idempotent.
1546    ///
1547    /// Returns the number of items reloaded.
1548    pub fn load_from_sidecar(
1549        self: &Arc<Self>,
1550        registry: &Arc<uni_plugin::PluginRegistry>,
1551    ) -> usize {
1552        let Some(sidecar) = self.sidecar.lock().clone() else {
1553            return 0;
1554        };
1555        let now_wall = std::time::SystemTime::now();
1556        let now_mono = StdInstant::now();
1557        let rows = match sidecar.read_all() {
1558            Ok(rows) => rows,
1559            Err(e) => {
1560                tracing::debug!(error = %e, "DeferralQueue: sidecar read failed");
1561                return 0;
1562            }
1563        };
1564        let mut restored = 0usize;
1565        for row in rows {
1566            let Some(entry) = registry
1567                .triggers()
1568                .iter()
1569                .find(|t| subscription_name(t.subscription()) == row.name)
1570                .cloned()
1571            else {
1572                tracing::warn!(
1573                    trigger = %row.name,
1574                    "DeferralQueue: dropping persisted item; trigger no longer registered"
1575                );
1576                continue;
1577            };
1578            // Re-decode the persisted MutationBatch from Arrow IPC.
1579            let batch = match arrow_ipc_decode(&row.batch_ipc) {
1580                Ok(b) => b,
1581                Err(e) => {
1582                    tracing::warn!(error = %e, "DeferralQueue: drop persisted item; IPC decode failed");
1583                    continue;
1584                }
1585            };
1586            // Translate the persisted wall-clock fire_at to a monotonic
1587            // Instant relative to current time. Past-due fire-ats
1588            // collapse to "now" so they fire on the next tick.
1589            let fire_at_wall = std::time::UNIX_EPOCH + Duration::from_millis(row.fire_at_epoch_ms);
1590            let mono_delta = fire_at_wall
1591                .duration_since(now_wall)
1592                .unwrap_or(Duration::ZERO);
1593            let fire_at_mono = now_mono + mono_delta;
1594            let item = DeferredItem {
1595                plugin: entry,
1596                name: row.name,
1597                batch: MutationBatch {
1598                    events: Arc::new(batch),
1599                },
1600                session_id: row.session_id,
1601                tx_id: row.tx_id,
1602                attempts: row.attempts,
1603                payload: row.payload,
1604            };
1605            self.inner
1606                .lock()
1607                .entry(fire_at_mono)
1608                .or_default()
1609                .push(item);
1610            restored += 1;
1611        }
1612        restored
1613    }
1614
1615    /// Persist the current queue state to the sidecar (no-op when
1616    /// persistence is disabled). I/O errors degrade to debug log.
1617    fn persist_locked(
1618        &self,
1619        guard: &parking_lot::MutexGuard<'_, BTreeMap<StdInstant, Vec<DeferredItem>>>,
1620    ) {
1621        let Some(sidecar) = self.sidecar.lock().clone() else {
1622            return;
1623        };
1624        let now_wall = std::time::SystemTime::now();
1625        let now_mono = StdInstant::now();
1626        let mut rows: Vec<PersistedDeferral> = Vec::new();
1627        for (fire_at_mono, items) in guard.iter() {
1628            for item in items {
1629                // Convert the monotonic Instant back to wall-clock by
1630                // measuring the delta against `now` and offsetting
1631                // `now_wall`. Past-due items get a fire_at slightly
1632                // before `now_wall` so they fire immediately on
1633                // restart.
1634                let fire_at_wall = if *fire_at_mono <= now_mono {
1635                    now_wall
1636                } else {
1637                    now_wall + fire_at_mono.duration_since(now_mono)
1638                };
1639                let fire_at_epoch_ms = fire_at_wall
1640                    .duration_since(std::time::UNIX_EPOCH)
1641                    .map(|d| d.as_millis() as u64)
1642                    .unwrap_or(0);
1643                let batch_ipc = match arrow_ipc_encode(&item.batch.events) {
1644                    Ok(b) => b,
1645                    Err(e) => {
1646                        tracing::debug!(error = %e, "DeferralQueue: IPC encode failed; skipping row");
1647                        continue;
1648                    }
1649                };
1650                rows.push(PersistedDeferral {
1651                    name: item.name.clone(),
1652                    session_id: item.session_id.clone(),
1653                    tx_id: item.tx_id,
1654                    attempts: item.attempts,
1655                    payload: item.payload.clone(),
1656                    batch_ipc,
1657                    fire_at_epoch_ms,
1658                });
1659            }
1660        }
1661        if let Err(e) = sidecar.write_all(&rows) {
1662            tracing::debug!(error = %e, "DeferralQueue: sidecar write failed");
1663        }
1664    }
1665
1666    fn push(&self, item: DeferredItem, fire_at: StdInstant) {
1667        let mut guard = self.inner.lock();
1668        guard.entry(fire_at).or_default().push(item);
1669        self.persist_locked(&guard);
1670    }
1671
1672    /// Pop every item whose scheduled fire instant is `<= now`.
1673    fn drain_due(&self, now: StdInstant) -> Vec<DeferredItem> {
1674        let mut guard = self.inner.lock();
1675        let mut due = Vec::new();
1676        // BTreeMap::split_off gives us [now+ε..) so we keep that half
1677        // and the front half is everything ≤ now.
1678        let mut to_keep = guard.split_off(&(now + Duration::from_nanos(1)));
1679        std::mem::swap(&mut *guard, &mut to_keep);
1680        for (_, mut items) in to_keep {
1681            due.append(&mut items);
1682        }
1683        // FU-5: persist the remaining queue state after each drain so
1684        // a restart sees only the still-pending items.
1685        self.persist_locked(&guard);
1686        due
1687    }
1688
1689    /// Approximate pending count — for diagnostics / tests.
1690    #[must_use]
1691    pub fn pending(&self) -> usize {
1692        self.inner.lock().values().map(|v| v.len()).sum()
1693    }
1694
1695    /// Tick the queue once: drain due items, fire each. Items that
1696    /// re-defer are re-enqueued until `DEFER_MAX_ATTEMPTS`. Async
1697    /// because plugin `fire` may block the runtime; we re-enter the
1698    /// tokio executor between items via `spawn_blocking` -- but since
1699    /// most triggers are CPU-light, the inline call here is fine for
1700    /// v1.
1701    pub fn tick(self: &Arc<Self>) {
1702        let due = self.drain_due(StdInstant::now());
1703        for mut item in due {
1704            // FU-5: invoke the dedicated `on_deferred` callback so
1705            // trigger plugins can receive the original `payload`.
1706            // The default impl on the trait delegates back to `fire`,
1707            // so existing trigger plugins keep working unchanged.
1708            let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1709                item.plugin.on_deferred(
1710                    TriggerContext::new(&item.session_id, item.tx_id),
1711                    &item.batch,
1712                    &item.payload,
1713                )
1714            }));
1715            let name = item.name.clone();
1716            handle_fire_outcome(outcome, &name, "deferred trigger", |until| {
1717                item.attempts += 1;
1718                if item.attempts >= DEFER_MAX_ATTEMPTS {
1719                    warn!(
1720                        trigger = %item.name,
1721                        attempts = item.attempts,
1722                        "deferred trigger exceeded DEFER_MAX_ATTEMPTS; dropping"
1723                    );
1724                    return;
1725                }
1726                // FU-5: honor the new `delay` field when re-deferring.
1727                // `None` falls back to "next tick" — matches the legacy
1728                // semantics. The trigger may have updated the payload on
1729                // re-defer; propagate the new one.
1730                let fire_at = StdInstant::now() + until.delay.unwrap_or(Duration::ZERO);
1731                item.payload = until.payload;
1732                self.push(item, fire_at);
1733            });
1734        }
1735    }
1736}
1737
1738// ── Helpers used by `Transaction::commit` ──────────────────────────
1739
/// Convenience: hash a `&str` tx id (the commit path stores tx ids as
/// `String`) down to the `u64` the `TriggerContext` carries.
///
/// The value comes from a freshly constructed `DefaultHasher`, so equal
/// inputs always produce equal outputs within one build of the program.
#[must_use]
pub fn tx_id_to_u64(tx_id: &str) -> u64 {
    use std::hash::{Hash as _, Hasher as _};

    let mut state = std::collections::hash_map::DefaultHasher::new();
    tx_id.hash(&mut state);
    state.finish()
}
1750
1751// ── FU-5: persisted deferral sidecar ──────────────────────────────
1752
/// On-disk row in `<data_path>/_system/deferred_triggers.json`.
///
/// `batch_ipc` is the trigger's [`MutationBatch`] encoded as Arrow
/// IPC stream bytes — preserves schema + values across restarts. The
/// `name` is the trigger's `subscription_name`, which the host's
/// re-resolution path uses to find the registered `TriggerPlugin`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct PersistedDeferral {
    /// Trigger name; matched against `subscription_name` on reload.
    name: String,
    /// Session id restored into the reloaded item.
    session_id: String,
    /// Transaction id restored into the reloaded item.
    tx_id: u64,
    /// Re-defer counter at the time the row was written.
    attempts: u32,
    /// Deferral payload handed back to the trigger when the item fires.
    payload: String,
    /// Arrow IPC stream bytes for the [`MutationBatch::events`]
    /// `RecordBatch`.
    #[serde(with = "serde_bytes")]
    batch_ipc: Vec<u8>,
    /// Wall-clock fire instant, milliseconds since UNIX epoch.
    fire_at_epoch_ms: u64,
}
1773
/// JSON-sidecar persistence handle for the deferral queue.
///
/// Thin wrapper over `uni_sidecar::VecSidecar` that stores the whole
/// queue as a list of [`PersistedDeferral`] rows.
/// NOTE(review): write atomicity is provided (or not) by `VecSidecar`;
/// confirm there before relying on it.
#[derive(Clone, Debug)]
struct DeferralSidecar {
    /// Underlying typed sidecar file handle.
    sidecar: uni_sidecar::VecSidecar<PersistedDeferral>,
}
1779
1780impl DeferralSidecar {
1781    /// Construct rooted at `<data_path>/_system/deferred_triggers.json`.
1782    fn new(data_path: std::path::PathBuf) -> Self {
1783        Self {
1784            sidecar: uni_sidecar::VecSidecar::new(data_path, "deferred_triggers.json"),
1785        }
1786    }
1787
1788    /// Borrow the resolved sidecar path (for diagnostics).
1789    fn path(&self) -> &std::path::Path {
1790        self.sidecar.path()
1791    }
1792
1793    fn read_all(&self) -> Result<Vec<PersistedDeferral>, String> {
1794        self.sidecar.load().map_err(|e| e.to_string())
1795    }
1796
1797    fn write_all(&self, rows: &[PersistedDeferral]) -> Result<(), String> {
1798        self.sidecar.store(rows).map_err(|e| e.to_string())
1799    }
1800}
1801
1802/// Encode a `RecordBatch` as Arrow IPC stream bytes (FU-5).
1803fn arrow_ipc_encode(batch: &arrow_array::RecordBatch) -> Result<Vec<u8>, String> {
1804    let schema = batch.schema();
1805    let mut buf: Vec<u8> = Vec::with_capacity(2048);
1806    {
1807        let mut w = arrow_ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())
1808            .map_err(|e| format!("ipc writer: {e}"))?;
1809        w.write(batch).map_err(|e| format!("ipc write: {e}"))?;
1810        w.finish().map_err(|e| format!("ipc finish: {e}"))?;
1811    }
1812    Ok(buf)
1813}
1814
1815/// Decode Arrow IPC stream bytes into a single `RecordBatch` (FU-5).
1816fn arrow_ipc_decode(bytes: &[u8]) -> Result<arrow_array::RecordBatch, String> {
1817    let reader = arrow_ipc::reader::StreamReader::try_new(bytes, None)
1818        .map_err(|e| format!("ipc reader: {e}"))?;
1819    let batches: Vec<arrow_array::RecordBatch> = reader
1820        .collect::<Result<Vec<_>, _>>()
1821        .map_err(|e| format!("ipc collect: {e}"))?;
1822    batches
1823        .into_iter()
1824        .next()
1825        .ok_or_else(|| "ipc decode: empty stream".to_owned())
1826}
1827
1828// ── Tests ──────────────────────────────────────────────────────────
1829
#[cfg(test)]
mod tests {
    use super::*;
    use uni_plugin::traits::trigger::TriggerEventMask;

    /// Each single-bit mask maps to its 1-based bit position.
    #[test]
    fn mask_discriminants_are_stable() {
        let expected = [
            (TriggerEventMask::NODE_CREATE, 1u8),
            (TriggerEventMask::NODE_UPDATE, 2),
            (TriggerEventMask::NODE_DELETE, 3),
            (TriggerEventMask::EDGE_CREATE, 4),
            (TriggerEventMask::EDGE_UPDATE, 5),
            (TriggerEventMask::EDGE_DELETE, 6),
        ];
        for (mask, discriminant) in expected {
            assert_eq!(mask_to_discriminant(mask), discriminant);
        }
    }

    /// A router with no entries in any phase reports itself as empty.
    #[test]
    fn empty_router_is_empty() {
        let router = TriggerRouter {
            by_phase: std::array::from_fn(|_| Vec::new()),
            defer_queue: None,
        };
        assert!(router.is_empty());
    }

    /// Equal tx ids hash equally; different tx ids hash differently.
    #[test]
    fn tx_id_to_u64_is_deterministic() {
        let first = tx_id_to_u64("tx-1");
        assert_eq!(first, tx_id_to_u64("tx-1"));
        assert_ne!(first, tx_id_to_u64("tx-2"));
    }
}