uni_plugin_host/triggers.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Host-side dispatch for `TriggerPlugin` registrations (M5f).
5//!
6//! Bridges `PluginRegistry::triggers()` into the transaction commit
7//! path. The dispatcher builds a per-phase routing table once per
8//! commit, drains mutation events from the transaction's private L0
9//! buffer into a stable Arrow `RecordBatch`, applies subscription
10//! selectors (event-kind mask + label / edge-type / property filter),
11//! and invokes each matching trigger at the appropriate phase.
12//!
13//! Phase ordering inside a single commit:
14//! 1. `BeforeMutation` then `BeforeCommit` — fired before the writer
15//! lock is taken. `Synchronous` reject aborts the transaction.
16//! 2. WAL flush + L1 merge run.
17//! 3. `AfterMutation` then `AfterCommit` — fired after publish. `Async`
18//! fire-mode triggers are spawned onto the tokio runtime so the
19//! writer's hot path stays untouched.
20//!
21//! Behavior contract:
22//! - `predicate_source` is compiled at router build (per-commit) via
23//! `uni_cypher::parse_expression` → AST property-ref rewrite →
24//! `cypher_expr_to_df` → DataFusion `PhysicalExpr`, and evaluated
25//! against the per-row event batch in `filter_for`. Predicates may
26//! reference the event-row columns (`event_kind`, `vid_or_eid`,
27//! `label`, `property`, `old_value`, `new_value`) as well as
28//! per-entity properties: `n.foo` reads the new (post-mutation)
29//! property value, `old.foo` reads the pre-image. Referenced
30//! property keys are tracked in `RouteEntry::properties_referenced`
31//! so [`MutationEvents::from_l0_with_probe`] materializes exactly
32//! those keys into the per-row property bags — predicate-gated
33//! cost, no work for property-free predicates.
34//! - `TriggerOutcome::Defer` enqueues the trigger fire into the
35//! per-`Uni` [`DeferralQueue`] (in-memory, ticked at 50ms by the
36//! background task spawned in `Uni::build`). Items re-fire on the
37//! next tick; re-deferring is capped at `DEFER_MAX_ATTEMPTS`.
38//! Restart-durable persistence lives with the M11 CDC scheduler.
39//! - `NODE_CREATE` / `NODE_UPDATE` / `NODE_DELETE` (and the edge
40//! analogs) are distinguished via a committed-state probe
41//! ([`PreExistingProbe`]) passed to
42//! [`MutationEvents::from_l0_with_probe`]. The probe covers (a) the
43//! current L0 buffer + pending-flush L0s via
44//! [`PreExistingProbe::from_l0_chain`] (sync, no I/O) and (b) the
45//! L1 storage layer via [`PreExistingProbe::extend_with_l1`] (async,
46//! batched `_vid IN (…)` scan per label, chunked at 1024 VIDs).
47//! Callers that construct [`MutationEvents`] without a probe
48//! ([`MutationEvents::from_l0`]) fall back to emitting `NODE_UPDATE`
49//! / `EDGE_UPDATE` for every non-tombstoned write.
50//! - `old_value` is populated from the L0-chain probe for vertices
51//! and edges visible there, and from the L1 probe (which now
52//! projects every property column on the candidate label) for
53//! vertices that were drained out of L0 by a previous flush. Edge
54//! pre-images are captured in the L0 chain via `edge_properties`.
55
56use std::collections::{BTreeMap, HashMap, HashSet};
57use std::sync::Arc;
58use std::time::{Duration, Instant as StdInstant};
59
60use arrow_array::{BooleanArray, Int64Array, LargeBinaryArray, RecordBatch, UInt8Array};
61use arrow_schema::{DataType, Field, Schema, SchemaRef};
62use datafusion::physical_plan::PhysicalExpr;
63use tokio::runtime::Handle;
64use tracing::warn;
65use uni_common::cypher_value_codec;
66use uni_common::{Properties, UniError, Value};
67use uni_plugin::PluginRegistry;
68use uni_plugin::traits::trigger::{
69 FireMode, MutationBatch, TriggerContext, TriggerEventMask, TriggerOutcome, TriggerPhase,
70 TriggerPlugin, TriggerSubscription,
71};
72use uni_store::runtime::L0Manager;
73use uni_store::runtime::l0::L0Buffer;
74
/// How many `TriggerPhase` variants the router buckets routes by —
/// `BeforeMutation`, `AfterMutation`, `BeforeCommit`, and `AfterCommit`.
/// `phase_index` maps every phase onto a slot in `0..PHASE_COUNT`.
const PHASE_COUNT: usize = 4;
78
79/// Canonical Arrow schema for the per-row event batch handed to each
80/// `TriggerPlugin::fire` call. Kept in one place so `filter_for` and
81/// the `predicate_source` compiler agree on column names + types.
82///
83/// Also used by the CDC delivery path (M11 FU-4) so subscribers
84/// receive events in the same shape triggers do.
85pub fn event_row_schema() -> SchemaRef {
86 Arc::new(Schema::new(vec![
87 Field::new("event_kind", DataType::UInt8, false),
88 Field::new("vid_or_eid", DataType::Int64, false),
89 Field::new("label", DataType::Utf8, false),
90 Field::new("property", DataType::Utf8, false),
91 Field::new("old_value", DataType::LargeBinary, true),
92 Field::new("new_value", DataType::LargeBinary, true),
93 // Per-row property bags carrying a CypherValue-encoded
94 // `Value::Map` of the (selected) post-mutation and pre-image
95 // property values. The Cypher predicate compiler rewrites
96 // `n.foo` / `old.foo` references against these columns, which
97 // the existing `index(map, key)` UDF handles via the
98 // CypherValue codec — no bespoke map-access path required.
99 Field::new("properties_new", DataType::LargeBinary, true),
100 Field::new("properties_old", DataType::LargeBinary, true),
101 ]))
102}
103
104/// Compile a Cypher boolean expression (`predicate_source`) into a
105/// DataFusion `PhysicalExpr` that evaluates against [`event_row_schema`],
106/// together with the set of node/edge property keys the predicate
107/// references (used downstream to predicate-gate property-bag
108/// materialization).
109///
110/// Pipeline: `uni_cypher::parse_expression` → in-place AST rewrite of
111/// `n.foo` / `old.foo` into `properties_new.foo` / `properties_old.foo`
112/// → `cypher_expr_to_df` (whose property-access translator emits
113/// `index(col, "foo")` for non-graph-entity bases — the existing
114/// `index` UDF then performs map lookup on the CypherValue-encoded
115/// `LargeBinary` bag) → DataFusion `TypeCoercion` →
116/// `create_physical_expr`. Same pattern as `apply_having_filter` in
117/// `crates/uni-query/src/query/df_graph/locy_fixpoint.rs:2734-2810`,
118/// just narrowed to a single expression against a fixed schema.
119///
120/// # Errors
121///
122/// Returns an error string if the predicate fails to parse, references
123/// columns not present in the event-row schema (event-row columns or
124/// `n.<prop>` / `old.<prop>` property references), or fails type
125/// coercion.
126fn compile_predicate(source: &str) -> Result<(Arc<dyn PhysicalExpr>, HashSet<String>), String> {
127 use datafusion::common::DFSchema;
128 use datafusion::logical_expr::LogicalPlanBuilder;
129 use datafusion::optimizer::AnalyzerRule;
130 use datafusion::optimizer::analyzer::type_coercion::TypeCoercion;
131 use datafusion::physical_expr::create_physical_expr;
132 use datafusion::prelude::SessionContext;
133
134 let mut cypher_expr =
135 uni_cypher::parse_expression(source).map_err(|e| format!("parse: {e}"))?;
136 let mut props_referenced: HashSet<String> = HashSet::new();
137 rewrite_property_refs(&mut cypher_expr, &mut props_referenced);
138 let df_expr_raw = uni_query::query::df_expr::cypher_expr_to_df(&cypher_expr, None)
139 .map_err(|e| format!("translate: {e}"))?;
140
141 let schema = event_row_schema();
142 let df_schema = DFSchema::try_from(schema.as_ref().clone())
143 .map_err(|e| format!("schema-conversion: {e}"))?;
144
145 let ctx = SessionContext::new();
146 // Register Cypher UDFs (`index`, `_cypher_gt`, ...) so (a) UDF
147 // resolution below can swap placeholder `DummyUdf` nodes (which
148 // declare `return_type = Null`) for the real impls (which declare
149 // `LargeBinary` etc.), and (b) the resulting physical-expr can
150 // invoke them at evaluation time.
151 uni_query::query::df_udfs::register_cypher_udfs(&ctx)
152 .map_err(|e| format!("udf-register: {e}"))?;
153 let state = ctx.state();
154 let config = state.config_options().clone();
155 let props = state.execution_props();
156
157 // Resolve UDFs first so the type-system sees the *real* return
158 // types (e.g. `index` → LargeBinary) when the Cypher coercion pass
159 // below decides whether `LargeBinary > Int64` needs to be rewritten
160 // to `_cypher_gt`. Without this, `apply_type_coercion` sees Null and
161 // routes through the bogus cast-to-Int64 path.
162 let df_expr_resolved = resolve_dummy_udfs(df_expr_raw, &state)
163 .map_err(|e| format!("resolve-udfs (pre-coerce): {e}"))?;
164
165 // Apply Cypher-aware type coercion: rewrites `LargeBinary <op>
166 // <native>` (e.g. `index(properties_new, "balance") > 100`) into
167 // `_cypher_gt(left, right)` so the property-bag access path works
168 // for native operands.
169 let df_expr = uni_query::query::df_expr::apply_type_coercion(&df_expr_resolved, &df_schema)
170 .map_err(|e| format!("cypher-coercion: {e}"))?;
171
172 // Wrap in a Filter plan so TypeCoercion can align literals against
173 // the event-row column types (e.g. `event_kind = 1` coerces `1`
174 // from Int64 literal to UInt8 to match the column).
175 let empty = datafusion::logical_expr::LogicalPlan::EmptyRelation(
176 datafusion::logical_expr::EmptyRelation {
177 produce_one_row: false,
178 schema: Arc::new(df_schema.clone()),
179 },
180 );
181 let filter_plan = LogicalPlanBuilder::from(empty)
182 .filter(df_expr.clone())
183 .map_err(|e| format!("filter-plan: {e}"))?
184 .build()
185 .map_err(|e| format!("plan-build: {e}"))?;
186 let coerced_expr = match TypeCoercion::new().analyze(filter_plan, &config) {
187 Ok(datafusion::logical_expr::LogicalPlan::Filter(f)) => f.predicate,
188 _ => df_expr,
189 };
190
191 // Resolve placeholder `DummyUdf` scalar-function nodes (produced by
192 // `cypher_expr_to_df` / `apply_type_coercion`) into the real UDF
193 // impls registered on the SessionContext. Mirrors
194 // `QueryExecutor::resolve_udfs` (`df_planner.rs:5168`) — without
195 // this pass, `index` and `_cypher_gt` evaluation fails at runtime
196 // with "UDF '<name>' is not registered".
197 let resolved_expr =
198 resolve_dummy_udfs(coerced_expr, &state).map_err(|e| format!("resolve-udfs: {e}"))?;
199
200 let physical = create_physical_expr(&resolved_expr, &df_schema, props)
201 .map_err(|e| format!("physical-expr: {e}"))?;
202 Ok((physical, props_referenced))
203}
204
205/// Walk `expr` and replace every `ScalarFunction` whose name matches a
206/// UDF registered on `state.scalar_functions()` with the registered
207/// implementation. The Cypher translator (`cypher_expr_to_df`) emits
208/// placeholder `DummyUdf` wrappers carrying only the name; the real
209/// `IndexUdf` / `_cypher_gt` / ... impls live on the SessionContext.
210fn resolve_dummy_udfs(
211 expr: datafusion::logical_expr::Expr,
212 state: &datafusion::execution::SessionState,
213) -> Result<datafusion::logical_expr::Expr, String> {
214 use datafusion::common::tree_node::{Transformed, TreeNode};
215 use datafusion::logical_expr::Expr as DfExpr;
216
217 let result = expr
218 .transform_up(|node| {
219 if let DfExpr::ScalarFunction(ref func) = node {
220 let udf_name = func.func.name();
221 if let Some(registered_udf) = state.scalar_functions().get(udf_name) {
222 return Ok(Transformed::yes(DfExpr::ScalarFunction(
223 datafusion::logical_expr::expr::ScalarFunction {
224 func: registered_udf.clone(),
225 args: func.args.clone(),
226 },
227 )));
228 }
229 }
230 Ok(Transformed::no(node))
231 })
232 .map_err(|e| format!("udf-resolve walk: {e}"))?;
233 Ok(result.data)
234}
235
236/// Walk a parsed Cypher expression and rewrite property references on
237/// the canonical entity aliases (`n` for the post-mutation row,
238/// `old` for the pre-image) so they resolve against the per-row
239/// `properties_new` / `properties_old` columns of [`event_row_schema`].
240///
241/// `n.foo` → `properties_new.foo` (translates downstream to
242/// `index(col("properties_new"), "foo")` via the standard
243/// non-graph-entity property-access path in `cypher_expr_to_df`).
244/// `old.foo` → `properties_old.foo`. All referenced property names
245/// are collected into `referenced` for predicate-gated materialization
246/// in [`MutationEvents::from_l0_with_probe`].
247///
248/// Other Cypher expressions are walked recursively so a predicate like
249/// `n.balance > 100 AND old.status <> n.status` is fully rewritten.
250fn rewrite_property_refs(expr: &mut uni_cypher::ast::Expr, referenced: &mut HashSet<String>) {
251 use uni_cypher::ast::Expr;
252 match expr {
253 Expr::Property(base, prop) => {
254 // First recurse into the base — supports chained access like
255 // `n.address.city` (the inner `n.address` is rewritten to
256 // `properties_new.address`, then `index(...)` chains).
257 rewrite_property_refs(base, referenced);
258 if let Expr::Variable(name) = base.as_ref() {
259 match name.as_str() {
260 "n" => {
261 referenced.insert(prop.clone());
262 **base = Expr::Variable("properties_new".to_owned());
263 }
264 "old" => {
265 referenced.insert(prop.clone());
266 **base = Expr::Variable("properties_old".to_owned());
267 }
268 _ => {}
269 }
270 }
271 }
272 Expr::BinaryOp { left, right, .. } => {
273 rewrite_property_refs(left, referenced);
274 rewrite_property_refs(right, referenced);
275 }
276 Expr::UnaryOp { expr: inner, .. } => rewrite_property_refs(inner, referenced),
277 Expr::FunctionCall { args, .. } => {
278 for a in args {
279 rewrite_property_refs(a, referenced);
280 }
281 }
282 Expr::Case {
283 expr: case_expr,
284 when_then,
285 else_expr,
286 } => {
287 if let Some(e) = case_expr.as_deref_mut() {
288 rewrite_property_refs(e, referenced);
289 }
290 for (w, t) in when_then {
291 rewrite_property_refs(w, referenced);
292 rewrite_property_refs(t, referenced);
293 }
294 if let Some(e) = else_expr.as_deref_mut() {
295 rewrite_property_refs(e, referenced);
296 }
297 }
298 Expr::IsNull(inner) | Expr::IsNotNull(inner) | Expr::IsUnique(inner) => {
299 rewrite_property_refs(inner, referenced);
300 }
301 Expr::In { expr: e, list } => {
302 rewrite_property_refs(e, referenced);
303 rewrite_property_refs(list, referenced);
304 }
305 Expr::List(items) => {
306 for i in items {
307 rewrite_property_refs(i, referenced);
308 }
309 }
310 Expr::Map(pairs) => {
311 for (_, v) in pairs {
312 rewrite_property_refs(v, referenced);
313 }
314 }
315 Expr::ArrayIndex { array, index } => {
316 rewrite_property_refs(array, referenced);
317 rewrite_property_refs(index, referenced);
318 }
319 Expr::ArraySlice { array, start, end } => {
320 rewrite_property_refs(array, referenced);
321 if let Some(s) = start.as_deref_mut() {
322 rewrite_property_refs(s, referenced);
323 }
324 if let Some(e) = end.as_deref_mut() {
325 rewrite_property_refs(e, referenced);
326 }
327 }
328 // Literal / Parameter / Variable / Wildcard / subquery variants
329 // do not carry rewritable property refs at the surface level.
330 _ => {}
331 }
332}
333
334fn phase_index(p: TriggerPhase) -> usize {
335 // `TriggerPhase` is `#[non_exhaustive]` — fall back to BeforeMutation
336 // bucket so a future variant can't silently slot into an existing
337 // route's phase by accident.
338 match p {
339 TriggerPhase::BeforeMutation => 0,
340 TriggerPhase::AfterMutation => 1,
341 TriggerPhase::BeforeCommit => 2,
342 TriggerPhase::AfterCommit => 3,
343 _ => 0,
344 }
345}
346
347/// A single route in the per-phase dispatch table.
348struct RouteEntry {
349 plugin: Arc<dyn TriggerPlugin>,
350 name: String,
351 event_mask: u32,
352 label_filter: Option<Vec<String>>,
353 edge_type_filter: Option<Vec<String>>,
354 property_filter: Option<Vec<String>>,
355 fire_mode: FireMode,
356 /// Compiled `predicate_source` expression, evaluated per-row in
357 /// `filter_for` to drop rows where the predicate is false. `None`
358 /// when the subscription has no predicate. The compile is done
359 /// once per [`TriggerRouter::from_registry`] call.
360 compiled_predicate: Option<Arc<dyn PhysicalExpr>>,
361 /// Property names that the compiled predicate references via
362 /// `n.<prop>` or `old.<prop>`. Used to predicate-gate the
363 /// property-bag materialization in
364 /// [`MutationEvents::from_l0_with_probe`] — when this set is
365 /// empty the event-row pipeline does no per-property work for
366 /// this route.
367 properties_referenced: HashSet<String>,
368}
369
370impl RouteEntry {
371 fn matches(&self, kind: TriggerEventMask, label_or_type: &str) -> bool {
372 if (self.event_mask & kind.0) == 0 {
373 return false;
374 }
375 if let Some(ref labels) = self.label_filter
376 && kind_is_node(kind)
377 && !labels.iter().any(|l| l.as_str() == label_or_type)
378 {
379 return false;
380 }
381 if let Some(ref ets) = self.edge_type_filter
382 && kind_is_edge(kind)
383 && !ets.iter().any(|e| e.as_str() == label_or_type)
384 {
385 return false;
386 }
387 true
388 }
389}
390
391fn kind_is_node(kind: TriggerEventMask) -> bool {
392 let mask = TriggerEventMask::NODE_CREATE
393 .union(TriggerEventMask::NODE_UPDATE)
394 .union(TriggerEventMask::NODE_DELETE)
395 .union(TriggerEventMask::LABEL_ADDED)
396 .union(TriggerEventMask::LABEL_REMOVED);
397 (kind.0 & mask.0) != 0
398}
399
400fn kind_is_edge(kind: TriggerEventMask) -> bool {
401 let mask = TriggerEventMask::EDGE_CREATE
402 .union(TriggerEventMask::EDGE_UPDATE)
403 .union(TriggerEventMask::EDGE_DELETE);
404 (kind.0 & mask.0) != 0
405}
406
407/// Per-commit trigger dispatcher.
408pub struct TriggerRouter {
409 by_phase: [Vec<RouteEntry>; PHASE_COUNT],
410 /// Per-`Uni` deferral queue. `None` for read-only / test setups
411 /// without a queue — `TriggerOutcome::Defer` then falls back to
412 /// the legacy warn-and-collapse behavior.
413 defer_queue: Option<Arc<DeferralQueue>>,
414}
415
416impl TriggerRouter {
417 /// Snapshot the registered triggers into a routing table.
418 ///
419 /// Cheap for predicate-less subscriptions (one `Arc` clone for the
420 /// trigger vector, then one pass to bucket by phase). For
421 /// subscriptions carrying `predicate_source`, compiles the Cypher
422 /// predicate into a DataFusion `PhysicalExpr` once and stashes it
423 /// on the route — sub-millisecond per predicate, amortized against
424 /// commit overhead.
425 ///
426 /// # Errors
427 ///
428 /// Returns [`UniError::TriggerRejected`] (with a descriptive
429 /// `reason`) if any subscription's `predicate_source` fails to
430 /// parse, references unknown columns, or fails type coercion. The
431 /// error surfaces at commit time, not at registration — this is a
432 /// deliberate trade-off to keep `uni-plugin` free of a `uni-cypher`
433 /// dependency.
434 pub fn from_registry(reg: &PluginRegistry) -> Result<Self, UniError> {
435 Self::from_registry_with_queue(reg, None)
436 }
437
438 /// Variant that wires in a per-`Uni` deferral queue so
439 /// `TriggerOutcome::Defer` enqueues for re-firing instead of
440 /// being warned and dropped.
441 ///
442 /// # Errors
443 ///
444 /// Same as [`Self::from_registry`].
445 pub fn from_registry_with_queue(
446 reg: &PluginRegistry,
447 defer_queue: Option<Arc<DeferralQueue>>,
448 ) -> Result<Self, UniError> {
449 let triggers = reg.triggers();
450 let mut by_phase: [Vec<RouteEntry>; PHASE_COUNT] =
451 [Vec::new(), Vec::new(), Vec::new(), Vec::new()];
452 for plugin in triggers.iter() {
453 let sub: &TriggerSubscription = plugin.subscription();
454 let name = subscription_name(sub);
455 let (compiled_predicate, properties_referenced) = match sub.predicate_source.as_deref()
456 {
457 Some(src) => {
458 let (expr, refs) =
459 compile_predicate(src).map_err(|e| UniError::TriggerRejected {
460 trigger: name.clone(),
461 reason: format!(
462 "predicate_source compile failed: {e}. \
463 Supported references: event-row columns \
464 (event_kind, vid_or_eid, label, property, \
465 old_value, new_value) and entity property \
466 references `n.<prop>` (post-mutation) / \
467 `old.<prop>` (pre-image)."
468 ),
469 })?;
470 (Some(expr), refs)
471 }
472 None => (None, HashSet::new()),
473 };
474 let entry = RouteEntry {
475 plugin: Arc::clone(plugin),
476 name,
477 event_mask: sub.events.0,
478 label_filter: sub
479 .labels
480 .as_ref()
481 .map(|v| v.iter().map(|s| s.to_string()).collect()),
482 edge_type_filter: sub
483 .edge_types
484 .as_ref()
485 .map(|v| v.iter().map(|s| s.to_string()).collect()),
486 property_filter: sub
487 .properties
488 .as_ref()
489 .map(|v| v.iter().map(|s| s.to_string()).collect()),
490 fire_mode: sub.fire_mode,
491 compiled_predicate,
492 properties_referenced,
493 };
494 by_phase[phase_index(sub.phase)].push(entry);
495 }
496 Ok(Self {
497 by_phase,
498 defer_queue,
499 })
500 }
501
502 /// True if no triggers are registered at any phase.
503 #[must_use]
504 pub fn is_empty(&self) -> bool {
505 self.by_phase.iter().all(|v| v.is_empty())
506 }
507
508 /// Union of node/edge property names that any compiled trigger
509 /// predicate references (across all phases). Empty when no
510 /// trigger has a `predicate_source` mentioning `n.<prop>` /
511 /// `old.<prop>`. Drives predicate-gated property-bag
512 /// materialization in [`MutationEvents::from_l0_with_probe`].
513 #[must_use]
514 pub fn properties_referenced(&self) -> HashSet<String> {
515 let mut out: HashSet<String> = HashSet::new();
516 for routes in &self.by_phase {
517 for entry in routes {
518 for p in &entry.properties_referenced {
519 out.insert(p.clone());
520 }
521 }
522 }
523 out
524 }
525
526 /// Fire `BeforeMutation` then `BeforeCommit` phases in order.
527 ///
528 /// Returns `Err(UniError::TriggerRejected)` if a `Synchronous`
529 /// trigger returns `TriggerOutcome::Reject` or `Err`. `Async` /
530 /// `EventualConsistency` triggers are ignored at before-phases
531 /// (they ride on after-phases only — firing async work pre-commit
532 /// would let it observe a transaction that subsequently aborts).
533 ///
534 /// # Errors
535 ///
536 /// `UniError::TriggerRejected` on reject or fire error.
537 pub fn dispatch_before(
538 &self,
539 ctx: TriggerContext<'_>,
540 events: &MutationEvents,
541 ) -> Result<(), UniError> {
542 for &phase in &[TriggerPhase::BeforeMutation, TriggerPhase::BeforeCommit] {
543 let routes = &self.by_phase[phase_index(phase)];
544 for entry in routes {
545 if !matches!(entry.fire_mode, FireMode::Synchronous) {
546 continue;
547 }
548 let Some(batch) = events.filter_for(entry) else {
549 continue;
550 };
551 let mb = MutationBatch {
552 events: Arc::new(batch),
553 };
554 let ctx_ref = TriggerContext::new(ctx.session_id, ctx.tx_id);
555 match entry.plugin.fire(ctx_ref, &mb) {
556 Ok(TriggerOutcome::Continue) => {}
557 Ok(TriggerOutcome::Reject { reason }) => {
558 return Err(UniError::TriggerRejected {
559 trigger: entry.name.to_string(),
560 reason,
561 });
562 }
563 Ok(TriggerOutcome::Defer { until }) => {
564 // Memory-backed in-process deferral. FU-5 adds
565 // an optional `delay` to `TriggerDeferral`;
566 // `None` re-fires on the next queue tick,
567 // `Some(d)` schedules at `now + d`.
568 enqueue_deferral(
569 &self.defer_queue,
570 Arc::clone(&entry.plugin),
571 entry.name.clone(),
572 mb.clone(),
573 ctx.session_id.to_owned(),
574 ctx.tx_id,
575 until,
576 );
577 }
578 Ok(_) => {
579 // `TriggerOutcome` is `#[non_exhaustive]`; an
580 // unrecognised future variant is conservatively
581 // treated as Continue.
582 }
583 Err(e) => {
584 return Err(UniError::TriggerRejected {
585 trigger: entry.name.to_string(),
586 reason: e.to_string(),
587 });
588 }
589 }
590 }
591 }
592 Ok(())
593 }
594
595 /// Fire `AfterMutation` then `AfterCommit` phases. Cannot abort.
596 ///
597 /// `Synchronous` after-phase triggers run inline (panics caught and
598 /// logged). `Async` triggers are spawned on `runtime`.
599 /// `EventualConsistency` triggers are spawned the same as `Async`
600 /// in v1 (a real batched queue lands with M5g).
601 pub fn dispatch_after(
602 &self,
603 ctx: TriggerContext<'_>,
604 events: &MutationEvents,
605 runtime: &Handle,
606 ) {
607 for &phase in &[TriggerPhase::AfterMutation, TriggerPhase::AfterCommit] {
608 let routes = &self.by_phase[phase_index(phase)];
609 for entry in routes {
610 let Some(batch) = events.filter_for(entry) else {
611 continue;
612 };
613 let mb = MutationBatch {
614 events: Arc::new(batch),
615 };
616 match entry.fire_mode {
617 FireMode::Synchronous => {
618 fire_caught(entry, ctx.session_id, ctx.tx_id, &mb, &self.defer_queue);
619 }
620 // `FireMode::Async`, `EventualConsistency`, and any
621 // future variant land on the spawn path. v1 collapses
622 // EventualConsistency onto Async (no batched queue);
623 // M5g adds the real queue.
624 _ => {
625 let plugin = Arc::clone(&entry.plugin);
626 let name = entry.name.clone();
627 let session_id = ctx.session_id.to_owned();
628 let tx_id = ctx.tx_id;
629 let queue = self.defer_queue.clone();
630 runtime.spawn(async move {
631 let mb_inner = mb;
632 let result =
633 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
634 plugin.fire(TriggerContext::new(&session_id, tx_id), &mb_inner)
635 }));
636 handle_fire_outcome(result, &name, "async trigger", |until| {
637 enqueue_deferral(
638 &queue,
639 Arc::clone(&plugin),
640 name.clone(),
641 mb_inner,
642 session_id.clone(),
643 tx_id,
644 until,
645 );
646 });
647 });
648 }
649 }
650 }
651 }
652 }
653}
654
655/// Enqueue a [`TriggerOutcome::Defer`] result into the host's
656/// in-memory [`DeferralQueue`]. When no queue is wired (read-only or
657/// test setups) the item is dropped with a warn — matches the legacy
658/// fallback behavior.
659///
660/// The fire instant honors `until.delay` (FU-5); `None` collapses to
661/// "now" so the item fires on the next tick.
662fn enqueue_deferral(
663 queue: &Option<Arc<DeferralQueue>>,
664 plugin: Arc<dyn TriggerPlugin>,
665 name: String,
666 mb: MutationBatch,
667 session_id: String,
668 tx_id: u64,
669 until: uni_plugin::traits::trigger::TriggerDeferral,
670) {
671 let Some(queue) = queue else {
672 warn!(trigger = %name, "Defer with no queue wired; dropping");
673 return;
674 };
675 let fire_at = StdInstant::now() + until.delay.unwrap_or(Duration::ZERO);
676 queue.push(
677 DeferredItem {
678 plugin,
679 name,
680 batch: mb,
681 session_id,
682 tx_id,
683 attempts: 0,
684 payload: until.payload,
685 },
686 fire_at,
687 );
688}
689
690/// Dispatch the result of a `catch_unwind`-wrapped trigger fire.
691///
692/// All three fire paths (`dispatch_after`'s spawned task, [`fire_caught`], and
693/// [`DeferralQueue::tick`]) share the same four-way ladder:
694/// `Ok(Ok(Defer))` / `Ok(Ok(_))` (Continue/Reject/future) / `Ok(Err)` (the
695/// plugin errored) / `Err` (the plugin panicked). They differ only in the log
696/// `label` and what to do on a `Defer` — captured by `on_defer`.
697fn handle_fire_outcome<E: std::fmt::Display>(
698 outcome: Result<Result<TriggerOutcome, E>, Box<dyn std::any::Any + Send>>,
699 name: &str,
700 label: &str,
701 on_defer: impl FnOnce(uni_plugin::traits::trigger::TriggerDeferral),
702) {
703 match outcome {
704 Ok(Ok(TriggerOutcome::Defer { until })) => on_defer(until),
705 Ok(Ok(_)) => {}
706 Ok(Err(e)) => warn!(trigger = %name, error = %e, "{label} errored"),
707 Err(_) => warn!(trigger = %name, "{label} panicked"),
708 }
709}
710
711fn fire_caught(
712 entry: &RouteEntry,
713 session_id: &str,
714 tx_id: u64,
715 mb: &MutationBatch,
716 defer_queue: &Option<Arc<DeferralQueue>>,
717) {
718 let plugin = Arc::clone(&entry.plugin);
719 let name = entry.name.clone();
720 let mb_clone = mb.clone();
721 let session_id_owned = session_id.to_owned();
722 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
723 plugin.fire(TriggerContext::new(&session_id_owned, tx_id), &mb_clone)
724 }));
725 handle_fire_outcome(result, &name, "after-phase trigger", |until| {
726 enqueue_deferral(
727 defer_queue,
728 plugin,
729 name.clone(),
730 mb_clone,
731 session_id_owned,
732 tx_id,
733 until,
734 );
735 });
736}
737
738fn subscription_name(sub: &TriggerSubscription) -> String {
739 // `TriggerSubscription` carries no explicit name field; use the
740 // first line of the docs as a stable identifier, falling back to
741 // a generic label. Keeps `UniError::TriggerRejected` human-readable
742 // without an ABI bump for a name field on the subscription struct.
743 sub.docs
744 .lines()
745 .next()
746 .map(str::to_owned)
747 .unwrap_or_else(|| "<unnamed trigger>".to_owned())
748}
749
750// ── Mutation event extraction ──────────────────────────────────────
751
752/// In-memory, untyped event log drained from `tx_l0`. Held by value
753/// across the commit boundary and filtered per-route on dispatch.
754pub struct MutationEvents {
755 rows: Vec<MutationRow>,
756}
757
758struct MutationRow {
759 event_kind: TriggerEventMask,
760 vid_or_eid: i64,
761 /// For NODE_* events: the affected label (one row per label).
762 /// For EDGE_* events: the edge type.
763 label_or_type: String,
764 /// Pre-image properties when known (probe was supplied and the
765 /// vertex/edge existed before this tx); `None` otherwise. The
766 /// `BeforeCommit` dispatch path serializes this into the
767 /// `old_value` Arrow column.
768 old_value: Option<Vec<u8>>,
769 /// Post-mutation property map filtered to the predicate-referenced
770 /// keys; serialized into the `properties_new` LargeBinary column.
771 /// `None` when no trigger references any property.
772 new_properties: Option<Properties>,
773 /// Pre-image property map filtered to the predicate-referenced
774 /// keys; serialized into the `properties_old` LargeBinary column.
775 /// `None` when no trigger references any property or the entity
776 /// did not pre-exist.
777 old_properties: Option<Properties>,
778}
779
780/// Snapshot of the committed graph state used to (a) distinguish
781/// CREATE from UPDATE in [`MutationEvents::from_l0_with_probe`] and
782/// (b) populate `old_value` for vertex and edge mutation events.
783///
784/// Built once per commit. The cheap [`Self::from_l0_chain`] scans the
785/// writer's `L0Manager` (current L0 + pending-flush L0s) — no I/O,
786/// runs before the writer write lock is acquired. The richer
787/// [`Self::extend_with_l1`] adds an async L1 storage probe for VIDs
788/// not found in the L0 chain — closes the gap where a vertex flushed
789/// to L1 in a previous commit would otherwise be misclassified as
790/// `NODE_CREATE` on its next mutation. The L1 probe also projects
791/// every property column on the target label so the resulting
792/// `old_value` carries the same pre-image fidelity as the L0-chain
793/// path. Edge pre-images are captured via the L0 chain's
794/// `edge_properties` snapshot.
795#[derive(Default)]
796pub struct PreExistingProbe {
797 /// VIDs known to exist in committed state (with their pre-image
798 /// properties, when captured — populated by L0 probe; empty
799 /// `Properties` map when added by L1 existence probe).
800 vertices: HashMap<uni_common::Vid, Properties>,
801 /// EIDs known to exist in committed state (with their pre-image
802 /// properties, when captured — populated by L0 probe). The map
803 /// uses `Properties::default()` for entries added through an
804 /// existence-only path.
805 edges: HashMap<uni_common::Eid, Properties>,
806}
807
808impl PreExistingProbe {
809 /// Build a probe by scanning the current L0 + pending-flush L0s
810 /// for vertices/edges referenced in `tx_l0`. Properties are cloned
811 /// from the committed L0 chain.
812 ///
813 /// Only mutations actually present in `tx_l0` are probed — keeping
814 /// the work proportional to the commit's mutation count rather
815 /// than to the total graph size.
816 #[must_use]
817 pub fn from_l0_chain(l0_manager: &L0Manager, tx_l0: &L0Buffer) -> Self {
818 let mut vertices: HashMap<uni_common::Vid, Properties> = HashMap::new();
819 let mut edges: HashMap<uni_common::Eid, Properties> = HashMap::new();
820
821 let candidate_vids: Vec<uni_common::Vid> = tx_l0
822 .vertex_properties
823 .keys()
824 .copied()
825 .chain(tx_l0.vertex_tombstones.iter().copied())
826 .collect();
827 let candidate_eids: Vec<uni_common::Eid> = tx_l0
828 .edge_endpoints
829 .keys()
830 .copied()
831 .chain(tx_l0.tombstones.keys().copied())
832 .collect();
833
834 let mut probe_buffer = |buf: &L0Buffer| {
835 for vid in &candidate_vids {
836 if vertices.contains_key(vid) {
837 continue;
838 }
839 if buf.vertex_tombstones.contains(vid) {
840 continue;
841 }
842 if let Some(props) = buf.vertex_properties.get(vid) {
843 vertices.insert(*vid, props.clone());
844 }
845 }
846 for eid in &candidate_eids {
847 if edges.contains_key(eid) {
848 continue;
849 }
850 if buf.tombstones.contains_key(eid) {
851 continue;
852 }
853 if buf.edge_endpoints.contains_key(eid) {
854 let props = buf.edge_properties.get(eid).cloned().unwrap_or_default();
855 edges.insert(*eid, props);
856 }
857 }
858 };
859
860 {
861 let current = l0_manager.get_current();
862 let g = current.read();
863 probe_buffer(&g);
864 }
865 for pending in l0_manager.get_pending_flush() {
866 let g = pending.read();
867 probe_buffer(&g);
868 }
869
870 Self { vertices, edges }
871 }
872
873 /// Snapshot the (vid, label) pairs that should be probed against
874 /// L1 storage — VIDs in `tx_l0` not already marked pre-existing
875 /// by the L0 chain. Sync — must run under the `tx_l0` read lock.
876 /// Returned vector is sized by chunked-IN-list quota, ready to
877 /// hand to [`Self::extend_with_l1`] outside the lock.
878 #[must_use]
879 pub fn pending_l1_candidates(&self, tx_l0: &L0Buffer) -> Vec<(uni_common::Vid, String)> {
880 let mut out: Vec<(uni_common::Vid, String)> = Vec::new();
881 for vid in tx_l0
882 .vertex_properties
883 .keys()
884 .chain(tx_l0.vertex_tombstones.iter())
885 {
886 if self.vertices.contains_key(vid) {
887 continue;
888 }
889 let label = tx_l0
890 .vertex_labels
891 .get(vid)
892 .and_then(|labels| labels.first())
893 .cloned();
894 if let Some(label) = label {
895 out.push((*vid, label));
896 }
897 }
898 out
899 }
900
901 /// Extend an existing probe with an L1 storage scan for the
902 /// supplied `(vid, label)` candidates (typically the output of
903 /// [`Self::pending_l1_candidates`]). Async — runs outside the
904 /// tx_l0 read lock.
905 ///
906 /// Groups candidates by label, chunks each group into 1024-VID
907 /// batches, and issues one `scan_vertex_table` per chunk with a
908 /// `_vid IN (…)` filter — bounded I/O proportional to the
909 /// commit's mutation count, not the graph size. For every
910 /// returned VID, every non-vid column is converted via
911 /// [`uni_store::storage::arrow_convert::arrow_to_value`] and
912 /// stashed as the pre-image `Properties` map; this populates the
913 /// `old_value` column on `NODE_UPDATE` / `NODE_DELETE` events
914 /// emitted by [`MutationEvents::from_l0_with_probe`] for vertices
915 /// that were only visible after the last L0 flush.
916 ///
917 /// # Errors
918 ///
919 /// Per-chunk scan errors are logged and ignored — the L0 probe
920 /// already captured the high-fidelity subset, so a failed L1
921 /// probe degrades to "L1 vertices are misclassified as CREATE"
922 /// rather than failing the commit.
923 pub async fn extend_with_l1(
924 &mut self,
925 candidates: Vec<(uni_common::Vid, String)>,
926 storage: &uni_store::storage::manager::StorageManager,
927 ) {
928 use arrow_array::Array;
929 use std::collections::HashMap as StdHashMap;
930 use uni_store::storage::arrow_convert::arrow_to_value;
931 const CHUNK_SIZE: usize = 1024;
932
933 let mut by_label: StdHashMap<String, Vec<uni_common::Vid>> = StdHashMap::new();
934 for (vid, label) in candidates {
935 by_label.entry(label).or_default().push(vid);
936 }
937
938 for (label, vids) in by_label {
939 // Discover the table's full column set once per label so
940 // we can request every property (not just `_vid`).
941 let table_name = uni_store::backend::table_names::vertex_table_name(&label);
942 let column_names: Vec<String> =
943 match storage.backend().get_table_schema(&table_name).await {
944 Ok(Some(schema)) => schema.fields().iter().map(|f| f.name().clone()).collect(),
945 Ok(None) => {
946 // Table absent: nothing to probe.
947 continue;
948 }
949 Err(e) => {
950 warn!(label = %label, error = %e, "L1 pre-image probe: \
951 schema lookup failed; vids fall back to CREATE");
952 continue;
953 }
954 };
955 // Always include `_vid`; the column-filter inside
956 // `scan_vertex_table` is permissive about missing columns,
957 // so passing every name from the schema is safe.
958 let col_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect();
959
960 for chunk in vids.chunks(CHUNK_SIZE) {
961 let in_list = chunk
962 .iter()
963 .map(|v| v.as_u64().to_string())
964 .collect::<Vec<_>>()
965 .join(",");
966 let filter = format!("_vid IN ({in_list})");
967 let batch = match storage
968 .scan_vertex_table(&label, &col_refs, Some(&filter))
969 .await
970 {
971 Ok(Some(b)) => b,
972 Ok(None) => continue,
973 Err(e) => {
974 warn!(label = %label, error = %e, "L1 pre-image probe failed; \
975 affected vids fall back to NODE_CREATE classification");
976 continue;
977 }
978 };
979 let vid_col = match batch
980 .column_by_name("_vid")
981 .and_then(|c| c.as_any().downcast_ref::<arrow_array::UInt64Array>())
982 {
983 Some(c) => c,
984 None => {
985 warn!(label = %label, "L1 probe returned batch without _vid column");
986 continue;
987 }
988 };
989 // Cache (column_index, column_name) pairs for the
990 // per-row property assembly. Skip storage-internal
991 // columns (`_vid`, `_version`, `_label`) — user
992 // properties are everything else.
993 let schema = batch.schema();
994 let property_cols: Vec<(usize, String)> = schema
995 .fields()
996 .iter()
997 .enumerate()
998 .filter_map(|(idx, f)| {
999 let name = f.name();
1000 if name == "_vid"
1001 || name == "_version"
1002 || name == "_label"
1003 || name == "_labels"
1004 {
1005 None
1006 } else {
1007 Some((idx, name.clone()))
1008 }
1009 })
1010 .collect();
1011
1012 for row in 0..vid_col.len() {
1013 if vid_col.is_null(row) {
1014 continue;
1015 }
1016 let raw = vid_col.value(row);
1017 let vid = uni_common::Vid::from(raw);
1018 let mut props = Properties::new();
1019 for (col_idx, col_name) in &property_cols {
1020 let col = batch.column(*col_idx);
1021 let value = arrow_to_value(col.as_ref(), row, None);
1022 if !matches!(value, uni_common::Value::Null) {
1023 props.insert(col_name.clone(), value);
1024 }
1025 }
1026 // First insert wins: L0-chain entries always come
1027 // first and may already hold richer pre-image data.
1028 self.vertices.entry(vid).or_insert(props);
1029 }
1030 }
1031 }
1032 }
1033
1034 /// True if `vid` was visible in committed state before this tx.
1035 #[must_use]
1036 pub fn vertex_pre_existed(&self, vid: uni_common::Vid) -> bool {
1037 self.vertices.contains_key(&vid)
1038 }
1039
1040 /// True if `eid` was visible in committed state before this tx.
1041 #[must_use]
1042 pub fn edge_pre_existed(&self, eid: uni_common::Eid) -> bool {
1043 self.edges.contains_key(&eid)
1044 }
1045
1046 fn edge_old_bytes(&self, eid: uni_common::Eid) -> Option<Vec<u8>> {
1047 self.edges.get(&eid).map(serialize_properties)
1048 }
1049
1050 fn vertex_old_bytes(&self, vid: uni_common::Vid) -> Option<Vec<u8>> {
1051 self.vertices.get(&vid).map(serialize_properties)
1052 }
1053
1054 /// Borrow the captured pre-image properties for `vid`, when the
1055 /// vertex pre-existed in committed state. Used by
1056 /// [`MutationEvents::from_l0_with_probe`] to populate the
1057 /// `properties_old` event-row column with the subset of keys any
1058 /// trigger predicate references.
1059 #[must_use]
1060 pub fn vertex_properties(&self, vid: uni_common::Vid) -> Option<&Properties> {
1061 self.vertices.get(&vid)
1062 }
1063
1064 /// Borrow the captured pre-image properties for `eid`, when the
1065 /// edge pre-existed in committed state.
1066 #[must_use]
1067 pub fn edge_properties(&self, eid: uni_common::Eid) -> Option<&Properties> {
1068 self.edges.get(&eid)
1069 }
1070}
1071
1072/// Serialize a `Properties` map into a stable byte representation for
1073/// the trigger event row's `old_value` column. Uses JSON for now —
1074/// matches the codec other plugin surfaces use for `CypherValue`
1075/// payloads and keeps the bytes inspectable in trigger plugins
1076/// without pulling a bespoke decoder.
1077fn serialize_properties(props: &Properties) -> Vec<u8> {
1078 serde_json::to_vec(props).unwrap_or_default()
1079}
1080
1081impl MutationEvents {
1082 /// Drain the tx-private L0 buffer into a typed event log without a
1083 /// committed-state probe. Every non-tombstoned write emits an
1084 /// `UPDATE` event; `old_value` is `None`. Equivalent to
1085 /// [`Self::from_l0_with_probe`] with `probe = None`.
1086 #[must_use]
1087 pub fn from_l0(l0: &L0Buffer) -> Self {
1088 Self::from_l0_with_probe(l0, None, &HashSet::new())
1089 }
1090
1091 /// Drain the tx-private L0 buffer into a typed event log.
1092 ///
1093 /// When `probe` is supplied, the probe distinguishes CREATE from
1094 /// UPDATE per-VID/EID and supplies the pre-image bytes used to
1095 /// populate `old_value` for `BeforeCommit` triggers. When `probe`
1096 /// is `None`, every write emits `UPDATE` and `old_value` stays
1097 /// `None` (legacy behavior — kept for callers that don't yet
1098 /// build a probe).
1099 ///
1100 /// Multi-label vertices emit one row per label so a label-filtered
1101 /// trigger fires exactly once per (vid, matching-label) pair.
1102 /// Vertices with no labels emit a single row with an empty label
1103 /// so unfiltered triggers still observe them.
1104 #[must_use]
1105 pub fn from_l0_with_probe(
1106 l0: &L0Buffer,
1107 probe: Option<&PreExistingProbe>,
1108 properties_referenced: &HashSet<String>,
1109 ) -> Self {
1110 let mut rows: Vec<MutationRow> = Vec::with_capacity(l0.mutation_count);
1111 let track_props = !properties_referenced.is_empty();
1112
1113 // Extract the subset of `props` whose keys appear in
1114 // `properties_referenced`. Returns `None` when nothing is
1115 // tracked or no referenced key is present, keeping the column
1116 // null for property-free triggers.
1117 let filtered = |props: &Properties| -> Option<Properties> {
1118 if !track_props {
1119 return None;
1120 }
1121 let mut out: Properties = Properties::new();
1122 for k in properties_referenced {
1123 if let Some(v) = props.get(k) {
1124 out.insert(k.clone(), v.clone());
1125 }
1126 }
1127 // Always emit the (possibly empty) bag when properties are
1128 // tracked so the predicate sees a Map rather than NULL
1129 // (which would short-circuit `index` to NULL and risk
1130 // type-coercion surprises in `>` / `<>` comparisons).
1131 Some(out)
1132 };
1133
1134 // Vertex writes — CREATE if the probe says the vid didn't
1135 // pre-exist, UPDATE otherwise. Legacy callers with no probe
1136 // get UPDATE for every write.
1137 for (vid, props) in &l0.vertex_properties {
1138 if l0.vertex_tombstones.contains(vid) {
1139 continue;
1140 }
1141 let id = vid_to_i64(*vid);
1142 let labels = l0.vertex_labels.get(vid);
1143 let (kind, old, old_props_map) = match probe {
1144 Some(p) if p.vertex_pre_existed(*vid) => (
1145 TriggerEventMask::NODE_UPDATE,
1146 p.vertex_old_bytes(*vid),
1147 p.vertex_properties(*vid).and_then(&filtered),
1148 ),
1149 Some(_) => (TriggerEventMask::NODE_CREATE, None, None),
1150 None => (TriggerEventMask::NODE_UPDATE, None, None),
1151 };
1152 let new_props_map = filtered(props);
1153 match labels {
1154 Some(ls) if !ls.is_empty() => {
1155 for l in ls {
1156 rows.push(MutationRow {
1157 event_kind: kind,
1158 vid_or_eid: id,
1159 label_or_type: l.clone(),
1160 old_value: old.clone(),
1161 new_properties: new_props_map.clone(),
1162 old_properties: old_props_map.clone(),
1163 });
1164 }
1165 }
1166 _ => {
1167 rows.push(MutationRow {
1168 event_kind: kind,
1169 vid_or_eid: id,
1170 label_or_type: String::new(),
1171 old_value: old,
1172 new_properties: new_props_map,
1173 old_properties: old_props_map,
1174 });
1175 }
1176 }
1177 }
1178
1179 // Vertex deletes. `old_value` is the pre-tx property image when
1180 // the probe captured it (the row is about to disappear).
1181 for vid in &l0.vertex_tombstones {
1182 let id = vid_to_i64(*vid);
1183 let labels = l0.vertex_labels.get(vid);
1184 let old = probe.and_then(|p| p.vertex_old_bytes(*vid));
1185 let old_props_map = probe
1186 .and_then(|p| p.vertex_properties(*vid))
1187 .and_then(&filtered);
1188 match labels {
1189 Some(ls) if !ls.is_empty() => {
1190 for l in ls {
1191 rows.push(MutationRow {
1192 event_kind: TriggerEventMask::NODE_DELETE,
1193 vid_or_eid: id,
1194 label_or_type: l.clone(),
1195 old_value: old.clone(),
1196 new_properties: None,
1197 old_properties: old_props_map.clone(),
1198 });
1199 }
1200 }
1201 _ => {
1202 rows.push(MutationRow {
1203 event_kind: TriggerEventMask::NODE_DELETE,
1204 vid_or_eid: id,
1205 label_or_type: String::new(),
1206 old_value: old,
1207 new_properties: None,
1208 old_properties: old_props_map,
1209 });
1210 }
1211 }
1212 }
1213
1214 // Edge writes — CREATE if not pre-existing, else UPDATE.
1215 // `old_value` carries the pre-image edge properties for UPDATE
1216 // and is `None` for CREATE.
1217 for eid in l0.edge_endpoints.keys() {
1218 if l0.tombstones.contains_key(eid) {
1219 continue;
1220 }
1221 let etype = l0.edge_types.get(eid).cloned().unwrap_or_default();
1222 let (kind, old, old_props_map) = match probe {
1223 Some(p) if p.edge_pre_existed(*eid) => (
1224 TriggerEventMask::EDGE_UPDATE,
1225 p.edge_old_bytes(*eid),
1226 p.edge_properties(*eid).and_then(&filtered),
1227 ),
1228 Some(_) => (TriggerEventMask::EDGE_CREATE, None, None),
1229 None => (TriggerEventMask::EDGE_UPDATE, None, None),
1230 };
1231 let new_props_map = l0.edge_properties.get(eid).and_then(&filtered);
1232 rows.push(MutationRow {
1233 event_kind: kind,
1234 vid_or_eid: eid_to_i64(*eid),
1235 label_or_type: etype,
1236 old_value: old,
1237 new_properties: new_props_map,
1238 old_properties: old_props_map,
1239 });
1240 }
1241
1242 // Edge deletes. `old_value` is the pre-tx property image when
1243 // the probe captured it.
1244 for (eid, ts) in &l0.tombstones {
1245 let etype = l0
1246 .edge_types
1247 .get(eid)
1248 .cloned()
1249 .unwrap_or_else(|| format!("type:{}", ts.edge_type));
1250 let old = probe.and_then(|p| p.edge_old_bytes(*eid));
1251 let old_props_map = probe
1252 .and_then(|p| p.edge_properties(*eid))
1253 .and_then(&filtered);
1254 rows.push(MutationRow {
1255 event_kind: TriggerEventMask::EDGE_DELETE,
1256 vid_or_eid: eid_to_i64(*eid),
1257 label_or_type: etype,
1258 old_value: old,
1259 new_properties: None,
1260 old_properties: old_props_map,
1261 });
1262 }
1263
1264 Self { rows }
1265 }
1266
1267 /// Project every captured row into the canonical [`event_row_schema`]
1268 /// `RecordBatch`, with no per-route filtering and no predicate.
1269 ///
1270 /// Used by the CDC delivery path to hand subscribers the full
1271 /// stream of mutations for a committed transaction (M11 FU-4). The
1272 /// per-trigger filtered shape is built by `Self::filter_for`.
1273 ///
1274 /// Returns `None` when there are zero rows (lets callers skip
1275 /// constructing an empty `CdcBatch`).
1276 #[must_use]
1277 pub fn materialize_all(&self) -> Option<RecordBatch> {
1278 if self.rows.is_empty() {
1279 return None;
1280 }
1281 let mut cols = EventRowColumns::with_capacity(self.rows.len());
1282 for row in &self.rows {
1283 cols.push_row(row);
1284 }
1285 cols.into_batch()
1286 }
1287
1288 /// Filter rows matching `entry`'s subscription selectors and
1289 /// project them into the §4.18 RecordBatch shape. Returns `None`
1290 /// if no rows match (caller skips the `fire` call).
1291 fn filter_for(&self, entry: &RouteEntry) -> Option<RecordBatch> {
1292 // property_filter is satisfied vacuously here — per-property
1293 // event-row population (one row per (vid, property) write) is
1294 // not the chosen surface; predicate authors instead reference
1295 // `n.<prop>` directly and the property-bag column resolves it
1296 // through `index`.
1297 let _ = &entry.property_filter;
1298 let mut cols = EventRowColumns::default();
1299 for row in &self.rows {
1300 if entry.matches(row.event_kind, &row.label_or_type) {
1301 cols.push_row(row);
1302 }
1303 }
1304 let batch = cols.into_batch()?;
1305
1306 // Apply the compiled `predicate_source` boolean mask if any.
1307 // Evaluation failures degrade safely to "no rows match" — the
1308 // predicate was already validated at router build, so failures
1309 // here imply an Arrow/DataFusion bug we'd rather skip than
1310 // surface as a commit error.
1311 let batch = match &entry.compiled_predicate {
1312 Some(predicate) => apply_predicate(predicate, batch)?,
1313 None => batch,
1314 };
1315
1316 if batch.num_rows() == 0 {
1317 return None;
1318 }
1319 Some(batch)
1320 }
1321}
1322
1323/// Column-oriented builder for the canonical event-row [`RecordBatch`]
1324/// produced by [`MutationEvents::materialize_all`] and
1325/// [`MutationEvents::filter_for`]. Keeps the per-column allocation +
1326/// per-row push logic in one place so the two callers stay in lockstep.
1327#[derive(Default)]
1328struct EventRowColumns {
1329 kinds: Vec<u8>,
1330 ids: Vec<i64>,
1331 labels: Vec<String>,
1332 properties: Vec<String>,
1333 olds: Vec<Option<Vec<u8>>>,
1334 news: Vec<Option<Vec<u8>>>,
1335 props_new: Vec<Option<Vec<u8>>>,
1336 props_old: Vec<Option<Vec<u8>>>,
1337}
1338
1339impl EventRowColumns {
1340 fn with_capacity(cap: usize) -> Self {
1341 Self {
1342 kinds: Vec::with_capacity(cap),
1343 ids: Vec::with_capacity(cap),
1344 labels: Vec::with_capacity(cap),
1345 properties: Vec::with_capacity(cap),
1346 olds: Vec::with_capacity(cap),
1347 news: Vec::with_capacity(cap),
1348 props_new: Vec::with_capacity(cap),
1349 props_old: Vec::with_capacity(cap),
1350 }
1351 }
1352
1353 fn push_row(&mut self, row: &MutationRow) {
1354 self.kinds.push(mask_to_discriminant(row.event_kind));
1355 self.ids.push(row.vid_or_eid);
1356 self.labels.push(row.label_or_type.clone());
1357 self.properties.push(String::new());
1358 self.olds.push(row.old_value.clone());
1359 self.news.push(None);
1360 self.props_new.push(
1361 row.new_properties
1362 .as_ref()
1363 .map(|m| cypher_value_codec::encode(&Value::Map(m.clone()))),
1364 );
1365 self.props_old.push(
1366 row.old_properties
1367 .as_ref()
1368 .map(|m| cypher_value_codec::encode(&Value::Map(m.clone()))),
1369 );
1370 }
1371
1372 /// Materialize the columns into a `RecordBatch`. Returns `None`
1373 /// when zero rows were collected (callers skip the empty case).
1374 fn into_batch(self) -> Option<RecordBatch> {
1375 if self.kinds.is_empty() {
1376 return None;
1377 }
1378 let kind_arr: Arc<dyn arrow_array::Array> = Arc::new(UInt8Array::from(self.kinds));
1379 let id_arr: Arc<dyn arrow_array::Array> = Arc::new(Int64Array::from(self.ids));
1380 let label_arr: Arc<dyn arrow_array::Array> =
1381 Arc::new(arrow_array::StringArray::from(self.labels));
1382 let prop_arr: Arc<dyn arrow_array::Array> =
1383 Arc::new(arrow_array::StringArray::from(self.properties));
1384 let olds_iter: Vec<Option<&[u8]>> = self.olds.iter().map(|o| o.as_deref()).collect();
1385 let news_iter: Vec<Option<&[u8]>> = self.news.iter().map(|o| o.as_deref()).collect();
1386 let old_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(olds_iter));
1387 let new_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(news_iter));
1388 let pnew_iter: Vec<Option<&[u8]>> = self.props_new.iter().map(|o| o.as_deref()).collect();
1389 let pold_iter: Vec<Option<&[u8]>> = self.props_old.iter().map(|o| o.as_deref()).collect();
1390 let pnew_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(pnew_iter));
1391 let pold_arr: Arc<dyn arrow_array::Array> = Arc::new(LargeBinaryArray::from(pold_iter));
1392
1393 RecordBatch::try_new(
1394 event_row_schema(),
1395 vec![
1396 kind_arr, id_arr, label_arr, prop_arr, old_arr, new_arr, pnew_arr, pold_arr,
1397 ],
1398 )
1399 .ok()
1400 }
1401}
1402
1403/// Run a compiled trigger predicate against the candidate batch,
1404/// returning `Some(filtered_batch)` when at least one row passes and
1405/// `None` when the predicate eliminates every row or the evaluation
1406/// fails (logged at warn level, treated as "no match" to avoid
1407/// silently firing on rows the predicate would have rejected).
1408fn apply_predicate(predicate: &Arc<dyn PhysicalExpr>, batch: RecordBatch) -> Option<RecordBatch> {
1409 use datafusion::arrow::compute::filter_record_batch;
1410 use datafusion::logical_expr::ColumnarValue;
1411
1412 let value = match predicate.evaluate(&batch) {
1413 Ok(v) => v,
1414 Err(e) => {
1415 warn!(error = %e, "trigger predicate evaluation failed; dropping batch");
1416 return None;
1417 }
1418 };
1419 let array = match value {
1420 ColumnarValue::Array(a) => a,
1421 ColumnarValue::Scalar(s) => match s.to_array_of_size(batch.num_rows()) {
1422 Ok(a) => a,
1423 Err(e) => {
1424 warn!(error = %e, "trigger predicate scalar→array failed");
1425 return None;
1426 }
1427 },
1428 };
1429 let bool_arr = match array.as_any().downcast_ref::<BooleanArray>() {
1430 Some(b) => b,
1431 None => {
1432 warn!("trigger predicate must yield Boolean; dropping batch");
1433 return None;
1434 }
1435 };
1436 filter_record_batch(&batch, bool_arr).ok()
1437}
1438
1439fn mask_to_discriminant(m: TriggerEventMask) -> u8 {
1440 // 1-based bit position of the lowest set bit (e.g. `0b001 → 1`,
1441 // `0b100 → 3`); falls back to 0 when no bit is set. Emitted rows
1442 // always carry exactly one bit, so the lowest set bit is *the* bit.
1443 if m.0 == 0 {
1444 return 0;
1445 }
1446 m.0.trailing_zeros() as u8 + 1
1447}
1448
1449fn vid_to_i64(vid: uni_common::Vid) -> i64 {
1450 // Vid is a newtype around a u64; reinterpret-cast preserves bits.
1451 vid.as_u64() as i64
1452}
1453
1454fn eid_to_i64(eid: uni_common::Eid) -> i64 {
1455 eid.as_u64() as i64
1456}
1457
1458// ── M11 deferral queue (memory-backed v1) ──────────────────────────
1459
1460/// Maximum number of times a `TriggerOutcome::Defer` will be re-queued
1461/// before the queue gives up and drops the item with a warning. Caps
1462/// the worst case for a pathological plugin that always returns
1463/// `Defer` from cascading.
1464const DEFER_MAX_ATTEMPTS: u32 = 10;
1465
1466struct DeferredItem {
1467 plugin: Arc<dyn TriggerPlugin>,
1468 name: String,
1469 batch: MutationBatch,
1470 session_id: String,
1471 tx_id: u64,
1472 attempts: u32,
1473 /// `TriggerDeferral::payload` passed back to
1474 /// [`TriggerPlugin::on_deferred`] when this item fires (FU-5).
1475 payload: String,
1476}
1477
1478/// In-memory deferral queue for `TriggerOutcome::Defer`.
1479///
1480/// Items are keyed by their scheduled fire instant in a `BTreeMap`,
1481/// so `drain_due` pops the next-due slot in O(log n). The queue is
1482/// drained by a per-`Uni` background tick task spawned at DB build
1483/// time; firing happens on the tokio runtime.
1484///
1485/// **v1 limitations** (in-memory only):
1486/// - Restart drops queued items. A persistent queue (system-table or
1487/// WAL extension) is `TODO(M11-persist)`.
1488/// - No transactional guarantee that a deferred item eventually fires
1489/// — if the process exits before the scheduled instant, the item is
1490/// lost.
1491/// - Per-item retry is capped at `DEFER_MAX_ATTEMPTS` to prevent
1492/// runaway re-deferral loops.
1493#[derive(Default)]
1494pub struct DeferralQueue {
1495 inner: parking_lot::Mutex<BTreeMap<StdInstant, Vec<DeferredItem>>>,
1496 /// Optional JSON-sidecar persistence (FU-5). When set, every
1497 /// `push` mirrors the queue state to disk and every `drain_due`
1498 /// rewrites the sidecar so a crash-restart can re-load the queue
1499 /// state. The persistence sink resolves [`TriggerPlugin`]s by qname
1500 /// from the host's [`uni_plugin::PluginRegistry`] at load time.
1501 sidecar: parking_lot::Mutex<Option<DeferralSidecar>>,
1502}
1503
1504impl std::fmt::Debug for DeferralQueue {
1505 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1506 let len: usize = self.inner.lock().values().map(|v| v.len()).sum();
1507 f.debug_struct("DeferralQueue").field("size", &len).finish()
1508 }
1509}
1510
1511impl DeferralQueue {
1512 /// Build a fresh empty queue.
1513 #[must_use]
1514 pub fn new() -> Arc<Self> {
1515 Arc::new(Self::default())
1516 }
1517
1518 /// Build a queue with JSON-sidecar persistence rooted at
1519 /// `<data_path>/_system/deferred_triggers.json`.
1520 ///
1521 /// On startup the queue's `load` method walks the sidecar and
1522 /// re-binds each row to its `TriggerPlugin` by qname via the
1523 /// supplied [`uni_plugin::PluginRegistry`]. Items whose plugin
1524 /// can no longer be resolved are dropped with a warn.
1525 ///
1526 /// Persists on every `push` and after every `drain_due` (FU-5).
1527 /// I/O failures degrade to debug logs — in-memory queue state
1528 /// remains authoritative for the running process.
1529 #[must_use]
1530 pub fn with_persistence(data_path: std::path::PathBuf) -> Arc<Self> {
1531 let queue = Arc::new(Self::default());
1532 *queue.sidecar.lock() = Some(DeferralSidecar::new(data_path));
1533 queue
1534 }
1535
1536 /// Borrow the sidecar path, if persistence is enabled.
1537 #[must_use]
1538 pub fn sidecar_path(&self) -> Option<std::path::PathBuf> {
1539 self.sidecar.lock().as_ref().map(|s| s.path().to_path_buf())
1540 }
1541
1542 /// Replay persisted items from the sidecar, re-binding each row's
1543 /// trigger qname against the registry. Should be called once
1544 /// after `Uni::build` finishes wiring triggers but before the
1545 /// queue tick task starts. Idempotent.
1546 ///
1547 /// Returns the number of items reloaded.
1548 pub fn load_from_sidecar(
1549 self: &Arc<Self>,
1550 registry: &Arc<uni_plugin::PluginRegistry>,
1551 ) -> usize {
1552 let Some(sidecar) = self.sidecar.lock().clone() else {
1553 return 0;
1554 };
1555 let now_wall = std::time::SystemTime::now();
1556 let now_mono = StdInstant::now();
1557 let rows = match sidecar.read_all() {
1558 Ok(rows) => rows,
1559 Err(e) => {
1560 tracing::debug!(error = %e, "DeferralQueue: sidecar read failed");
1561 return 0;
1562 }
1563 };
1564 let mut restored = 0usize;
1565 for row in rows {
1566 let Some(entry) = registry
1567 .triggers()
1568 .iter()
1569 .find(|t| subscription_name(t.subscription()) == row.name)
1570 .cloned()
1571 else {
1572 tracing::warn!(
1573 trigger = %row.name,
1574 "DeferralQueue: dropping persisted item; trigger no longer registered"
1575 );
1576 continue;
1577 };
1578 // Re-decode the persisted MutationBatch from Arrow IPC.
1579 let batch = match arrow_ipc_decode(&row.batch_ipc) {
1580 Ok(b) => b,
1581 Err(e) => {
1582 tracing::warn!(error = %e, "DeferralQueue: drop persisted item; IPC decode failed");
1583 continue;
1584 }
1585 };
1586 // Translate the persisted wall-clock fire_at to a monotonic
1587 // Instant relative to current time. Past-due fire-ats
1588 // collapse to "now" so they fire on the next tick.
1589 let fire_at_wall = std::time::UNIX_EPOCH + Duration::from_millis(row.fire_at_epoch_ms);
1590 let mono_delta = fire_at_wall
1591 .duration_since(now_wall)
1592 .unwrap_or(Duration::ZERO);
1593 let fire_at_mono = now_mono + mono_delta;
1594 let item = DeferredItem {
1595 plugin: entry,
1596 name: row.name,
1597 batch: MutationBatch {
1598 events: Arc::new(batch),
1599 },
1600 session_id: row.session_id,
1601 tx_id: row.tx_id,
1602 attempts: row.attempts,
1603 payload: row.payload,
1604 };
1605 self.inner
1606 .lock()
1607 .entry(fire_at_mono)
1608 .or_default()
1609 .push(item);
1610 restored += 1;
1611 }
1612 restored
1613 }
1614
1615 /// Persist the current queue state to the sidecar (no-op when
1616 /// persistence is disabled). I/O errors degrade to debug log.
1617 fn persist_locked(
1618 &self,
1619 guard: &parking_lot::MutexGuard<'_, BTreeMap<StdInstant, Vec<DeferredItem>>>,
1620 ) {
1621 let Some(sidecar) = self.sidecar.lock().clone() else {
1622 return;
1623 };
1624 let now_wall = std::time::SystemTime::now();
1625 let now_mono = StdInstant::now();
1626 let mut rows: Vec<PersistedDeferral> = Vec::new();
1627 for (fire_at_mono, items) in guard.iter() {
1628 for item in items {
1629 // Convert the monotonic Instant back to wall-clock by
1630 // measuring the delta against `now` and offsetting
1631 // `now_wall`. Past-due items get a fire_at slightly
1632 // before `now_wall` so they fire immediately on
1633 // restart.
1634 let fire_at_wall = if *fire_at_mono <= now_mono {
1635 now_wall
1636 } else {
1637 now_wall + fire_at_mono.duration_since(now_mono)
1638 };
1639 let fire_at_epoch_ms = fire_at_wall
1640 .duration_since(std::time::UNIX_EPOCH)
1641 .map(|d| d.as_millis() as u64)
1642 .unwrap_or(0);
1643 let batch_ipc = match arrow_ipc_encode(&item.batch.events) {
1644 Ok(b) => b,
1645 Err(e) => {
1646 tracing::debug!(error = %e, "DeferralQueue: IPC encode failed; skipping row");
1647 continue;
1648 }
1649 };
1650 rows.push(PersistedDeferral {
1651 name: item.name.clone(),
1652 session_id: item.session_id.clone(),
1653 tx_id: item.tx_id,
1654 attempts: item.attempts,
1655 payload: item.payload.clone(),
1656 batch_ipc,
1657 fire_at_epoch_ms,
1658 });
1659 }
1660 }
1661 if let Err(e) = sidecar.write_all(&rows) {
1662 tracing::debug!(error = %e, "DeferralQueue: sidecar write failed");
1663 }
1664 }
1665
1666 fn push(&self, item: DeferredItem, fire_at: StdInstant) {
1667 let mut guard = self.inner.lock();
1668 guard.entry(fire_at).or_default().push(item);
1669 self.persist_locked(&guard);
1670 }
1671
1672 /// Pop every item whose scheduled fire instant is `<= now`.
1673 fn drain_due(&self, now: StdInstant) -> Vec<DeferredItem> {
1674 let mut guard = self.inner.lock();
1675 let mut due = Vec::new();
1676 // BTreeMap::split_off gives us [now+ε..) so we keep that half
1677 // and the front half is everything ≤ now.
1678 let mut to_keep = guard.split_off(&(now + Duration::from_nanos(1)));
1679 std::mem::swap(&mut *guard, &mut to_keep);
1680 for (_, mut items) in to_keep {
1681 due.append(&mut items);
1682 }
1683 // FU-5: persist the remaining queue state after each drain so
1684 // a restart sees only the still-pending items.
1685 self.persist_locked(&guard);
1686 due
1687 }
1688
1689 /// Approximate pending count — for diagnostics / tests.
1690 #[must_use]
1691 pub fn pending(&self) -> usize {
1692 self.inner.lock().values().map(|v| v.len()).sum()
1693 }
1694
1695 /// Tick the queue once: drain due items, fire each. Items that
1696 /// re-defer are re-enqueued until `DEFER_MAX_ATTEMPTS`. Async
1697 /// because plugin `fire` may block the runtime; we re-enter the
1698 /// tokio executor between items via `spawn_blocking` -- but since
1699 /// most triggers are CPU-light, the inline call here is fine for
1700 /// v1.
1701 pub fn tick(self: &Arc<Self>) {
1702 let due = self.drain_due(StdInstant::now());
1703 for mut item in due {
1704 // FU-5: invoke the dedicated `on_deferred` callback so
1705 // trigger plugins can receive the original `payload`.
1706 // The default impl on the trait delegates back to `fire`,
1707 // so existing trigger plugins keep working unchanged.
1708 let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
1709 item.plugin.on_deferred(
1710 TriggerContext::new(&item.session_id, item.tx_id),
1711 &item.batch,
1712 &item.payload,
1713 )
1714 }));
1715 let name = item.name.clone();
1716 handle_fire_outcome(outcome, &name, "deferred trigger", |until| {
1717 item.attempts += 1;
1718 if item.attempts >= DEFER_MAX_ATTEMPTS {
1719 warn!(
1720 trigger = %item.name,
1721 attempts = item.attempts,
1722 "deferred trigger exceeded DEFER_MAX_ATTEMPTS; dropping"
1723 );
1724 return;
1725 }
1726 // FU-5: honor the new `delay` field when re-deferring.
1727 // `None` falls back to "next tick" — matches the legacy
1728 // semantics. The trigger may have updated the payload on
1729 // re-defer; propagate the new one.
1730 let fire_at = StdInstant::now() + until.delay.unwrap_or(Duration::ZERO);
1731 item.payload = until.payload;
1732 self.push(item, fire_at);
1733 });
1734 }
1735 }
1736}
1737
1738// ── Helpers used by `Transaction::commit` ──────────────────────────
1739
/// Fold a string transaction id (the commit path stores tx ids as
/// `String`) into the `u64` carried by `TriggerContext`.
///
/// The hash is computed with std's `DefaultHasher`, so equal inputs map
/// to equal outputs within a single build.
///
/// NOTE(review): std does not specify the `DefaultHasher` algorithm
/// across Rust releases. Treat the value as process/build-local, not as
/// an on-disk-stable identifier.
#[must_use]
pub fn tx_id_to_u64(tx_id: &str) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(tx_id, &mut state);
    state.finish()
}
1750
1751// ── FU-5: persisted deferral sidecar ──────────────────────────────
1752
/// On-disk row in `<data_path>/_system/deferred_triggers.json`.
///
/// The sidecar stores a `Vec` of these rows (see [`DeferralSidecar`]).
///
/// `batch_ipc` is the trigger's [`MutationBatch`] encoded as Arrow
/// IPC stream bytes — preserves schema + values across restarts. The
/// `name` is the trigger's `subscription_name`, which the host's
/// re-resolution path uses to find the registered `TriggerPlugin`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct PersistedDeferral {
    /// Trigger subscription name, used to look the plugin up again.
    name: String,
    /// Session id of the originating transaction. Presumably restored
    /// into the re-fired `TriggerContext` — TODO confirm against the
    /// load path.
    session_id: String,
    /// Transaction id in the `u64` form `TriggerContext` carries
    /// (presumably produced by `tx_id_to_u64`; verify).
    tx_id: u64,
    /// Re-defer attempts recorded so far. Presumably still counted
    /// against `DEFER_MAX_ATTEMPTS` after a reload — verify.
    attempts: u32,
    /// Trigger-supplied payload handed back to `on_deferred` on re-fire
    /// (assumed to mirror the in-memory item's `payload`).
    payload: String,
    /// Arrow IPC stream bytes for the [`MutationBatch::events`]
    /// `RecordBatch`.
    ///
    /// NOTE(review): JSON has no byte type, so with a JSON sidecar
    /// `serde_bytes` still emits an array of integers. The field is
    /// several times larger on disk than the raw IPC bytes.
    #[serde(with = "serde_bytes")]
    batch_ipc: Vec<u8>,
    /// Wall-clock fire instant, milliseconds since UNIX epoch.
    fire_at_epoch_ms: u64,
}
1773
/// Atomic JSON-sidecar persistence handle for the deferral queue.
///
/// Thin wrapper over `uni_sidecar::VecSidecar` that fixes the row type
/// to [`PersistedDeferral`]. Its methods flatten sidecar errors into
/// `String`s.
#[derive(Clone, Debug)]
struct DeferralSidecar {
    /// Underlying vector-of-rows sidecar file.
    sidecar: uni_sidecar::VecSidecar<PersistedDeferral>,
}
1779
1780impl DeferralSidecar {
1781 /// Construct rooted at `<data_path>/_system/deferred_triggers.json`.
1782 fn new(data_path: std::path::PathBuf) -> Self {
1783 Self {
1784 sidecar: uni_sidecar::VecSidecar::new(data_path, "deferred_triggers.json"),
1785 }
1786 }
1787
1788 /// Borrow the resolved sidecar path (for diagnostics).
1789 fn path(&self) -> &std::path::Path {
1790 self.sidecar.path()
1791 }
1792
1793 fn read_all(&self) -> Result<Vec<PersistedDeferral>, String> {
1794 self.sidecar.load().map_err(|e| e.to_string())
1795 }
1796
1797 fn write_all(&self, rows: &[PersistedDeferral]) -> Result<(), String> {
1798 self.sidecar.store(rows).map_err(|e| e.to_string())
1799 }
1800}
1801
1802/// Encode a `RecordBatch` as Arrow IPC stream bytes (FU-5).
1803fn arrow_ipc_encode(batch: &arrow_array::RecordBatch) -> Result<Vec<u8>, String> {
1804 let schema = batch.schema();
1805 let mut buf: Vec<u8> = Vec::with_capacity(2048);
1806 {
1807 let mut w = arrow_ipc::writer::StreamWriter::try_new(&mut buf, schema.as_ref())
1808 .map_err(|e| format!("ipc writer: {e}"))?;
1809 w.write(batch).map_err(|e| format!("ipc write: {e}"))?;
1810 w.finish().map_err(|e| format!("ipc finish: {e}"))?;
1811 }
1812 Ok(buf)
1813}
1814
1815/// Decode Arrow IPC stream bytes into a single `RecordBatch` (FU-5).
1816fn arrow_ipc_decode(bytes: &[u8]) -> Result<arrow_array::RecordBatch, String> {
1817 let reader = arrow_ipc::reader::StreamReader::try_new(bytes, None)
1818 .map_err(|e| format!("ipc reader: {e}"))?;
1819 let batches: Vec<arrow_array::RecordBatch> = reader
1820 .collect::<Result<Vec<_>, _>>()
1821 .map_err(|e| format!("ipc collect: {e}"))?;
1822 batches
1823 .into_iter()
1824 .next()
1825 .ok_or_else(|| "ipc decode: empty stream".to_owned())
1826}
1827
1828// ── Tests ──────────────────────────────────────────────────────────
1829
#[cfg(test)]
mod tests {
    use super::*;
    use uni_plugin::traits::trigger::TriggerEventMask;

    /// Every mask → discriminant pair is pinned explicitly so an
    /// accidental renumbering fails loudly.
    #[test]
    fn mask_discriminants_are_stable() {
        let table = [
            (TriggerEventMask::NODE_CREATE, 1),
            (TriggerEventMask::NODE_UPDATE, 2),
            (TriggerEventMask::NODE_DELETE, 3),
            (TriggerEventMask::EDGE_CREATE, 4),
            (TriggerEventMask::EDGE_UPDATE, 5),
            (TriggerEventMask::EDGE_DELETE, 6),
        ];
        for (mask, expected) in table {
            assert_eq!(mask_to_discriminant(mask), expected);
        }
    }

    /// A router with no entries in any phase and no deferral queue
    /// reports itself as empty.
    #[test]
    fn empty_router_is_empty() {
        let router = TriggerRouter {
            by_phase: Default::default(),
            defer_queue: None,
        };
        assert!(router.is_empty());
    }

    /// Equal tx ids hash equally; distinct ids hash differently.
    #[test]
    fn tx_id_to_u64_is_deterministic() {
        let first = tx_id_to_u64("tx-1");
        assert_eq!(first, tx_id_to_u64("tx-1"));
        assert_ne!(first, tx_id_to_u64("tx-2"));
    }
}