// dataflow_rs/engine/executor.rs

//! # Evaluation Primitives
//!
//! Free functions and types that support JSONLogic evaluation in the engine.
//! Built-in function execution lives on each config type (`MapConfig::execute`,
//! `ValidationConfig::execute`, …) — this module just provides the shared
//! evaluation machinery they all build on.
//!
//! The bump arena is held in a thread-local cell on each Tokio worker. On every
//! call the arena is rewound with `Bump::reset` (constant-time, retains chunks)
//! before evaluating. Chunks accumulate to fit the workload's high-water mark
//! and persist across calls, so there is no per-task malloc/free churn.
//! Profiling showed that a per-task `Bump::with_capacity` malloc was the
//! dominant cost when arena sizing was tuned for realistic workloads;
//! thread-local reuse amortizes that to zero in steady state.
//!
//! `ArenaContext` (below) extends this further for **mutating** tasks (map):
//! the message context is converted with `to_arena` once per task call into a
//! depth-2 cache, and subsequent writes re-convert only the dirtied subtree —
//! typically `data.MT103` while the heavy `data.input` stays cached.

21use crate::engine::error::Result;
22use bumpalo::Bump;
23use datalogic_rs::{Engine, Logic};
24use datavalue::{DataValue, OwnedDataValue};
25use log::error;
26use std::cell::RefCell;
27use std::sync::Arc;
28
/// Starting capacity, in bytes, of each worker thread's bump arena.
///
/// Chosen so that `to_arena` deep-cloning a realistic ISO-20022-shaped payload
/// fits in the first chunk, which spares the first few calls on a thread from
/// `Bump::new_chunk`. Chunks are retained across calls once allocated, so this
/// value only matters while a thread is warming up.
const ARENA_INITIAL_CAPACITY: usize = 128 << 10; // 128 KiB
34
35thread_local! {
36    /// Per-worker-thread bump arena. `Engine` and `Arc<Logic>` are `Send + Sync`
37    /// and shared across threads; `Bump` is `!Send` so it lives here for
38    /// zero-contention scratch space. Chunks accumulate over the thread's
39    /// lifetime and `reset()` rewinds the pointer without freeing chunks back
40    /// to the OS — steady-state allocator pressure is zero.
41    static EVAL_ARENA: RefCell<Bump> = RefCell::new(Bump::with_capacity(ARENA_INITIAL_CAPACITY));
42}
43
44/// Evaluate `compiled` against `context` using the worker thread's bump
45/// arena, returning the result as an owned `OwnedDataValue`. The arena is
46/// rewound before the call so peak memory is bounded by the single largest
47/// evaluation; chunks persist across calls so steady-state allocation is zero.
48///
49/// Use this for one-shot evals where the context isn't reused across
50/// multiple JSONLogic calls (e.g. a single condition check). For batches of
51/// read-only evals against the same context (validation, log) use
52/// [`with_arena`] and convert the context once via
53/// [`datavalue::OwnedDataValue::to_arena`].
54#[inline]
55pub(crate) fn eval_to_owned(
56    engine: &Engine,
57    compiled: &Logic,
58    context: &OwnedDataValue,
59) -> std::result::Result<OwnedDataValue, datalogic_rs::Error> {
60    EVAL_ARENA.with(|cell| {
61        let mut arena = cell.borrow_mut();
62        arena.reset();
63        let r = engine.evaluate(compiled, context, &arena)?;
64        Ok(r.to_owned())
65    })
66}
67
68/// Run `f` with the worker thread's bump arena rewound. The closure receives
69/// the `Bump` and can amortize work across multiple `engine.evaluate` calls
70/// by converting the input context to `DataValue` once and reusing it. Use
71/// this for batches of read-only evals against the same context (validation,
72/// log) — it skips the per-eval `to_arena` deep-clone that dominates
73/// realistic profile.
74#[inline]
75pub(crate) fn with_arena<R>(f: impl FnOnce(&Bump) -> R) -> R {
76    EVAL_ARENA.with(|cell| {
77        let mut arena = cell.borrow_mut();
78        arena.reset();
79        f(&arena)
80    })
81}
82
83/// Depth‑2 arena cache for a `Message.context` (always an
84/// `OwnedDataValue::Object`).
85///
86/// Built once at the top of a mutating task call, then mutated in place as
87/// the task writes back into `message.context`. Writes at path `a.b.X`
88/// invalidate only the `(a, b)` arena slot — `data.input` stays cached
89/// across the entire map task even while `data.MT103.*` is being written.
90///
91/// **Lifetime model.** All arena allocations come out of the borrowed `Bump`.
92/// `top_keys` / `top_values` / `depth2` are owned `Vec`s so we can mutate
93/// them freely; the `DataValue<'a>` slice handed to `engine.evaluate` is a
94/// fresh `arena.alloc_slice_copy` per call, so it stays valid for that eval
95/// regardless of subsequent mutations.
96pub(crate) struct ArenaContext<'a> {
97    arena: &'a Bump,
98    /// Top-level slot keys, arena-allocated `&'a str`.
99    top_keys: Vec<&'a str>,
100    /// Top-level slot values. When a slot's owned value is an `Object`, the
101    /// corresponding `top_values[i]` is `DataValue::Object(&'a [...])` whose
102    /// slice was minted from `depth2[i]` via `alloc_slice_copy`. When not an
103    /// Object, `depth2[i] = None` and `top_values[i]` is the full arena form.
104    top_values: Vec<DataValue<'a>>,
105    /// Depth‑2 cache, parallel to `top_keys`. `None` for non‑Object top slots.
106    depth2: Vec<Option<Depth2Cache<'a>>>,
107}
108
109struct Depth2Cache<'a> {
110    keys: Vec<&'a str>,
111    values: Vec<DataValue<'a>>,
112}
113
114impl<'a> ArenaContext<'a> {
115    /// Build from an `OwnedDataValue` context (which should be the canonical
116    /// `Object { data, metadata, temp_data }` shape). Deep-walks the owned
117    /// tree exactly once; subsequent reads / mutations are O(touched slot).
118    pub fn from_owned(ctx: &OwnedDataValue, arena: &'a Bump) -> Self {
119        let mut top_keys: Vec<&'a str> = Vec::with_capacity(4);
120        let mut top_values: Vec<DataValue<'a>> = Vec::with_capacity(4);
121        let mut depth2: Vec<Option<Depth2Cache<'a>>> = Vec::with_capacity(4);
122
123        if let OwnedDataValue::Object(pairs) = ctx {
124            for (k, v) in pairs {
125                top_keys.push(arena.alloc_str(k));
126                match v {
127                    OwnedDataValue::Object(children) => {
128                        let mut d2_keys: Vec<&'a str> = Vec::with_capacity(children.len());
129                        let mut d2_vals: Vec<DataValue<'a>> = Vec::with_capacity(children.len());
130                        for (ck, cv) in children {
131                            d2_keys.push(arena.alloc_str(ck));
132                            d2_vals.push(cv.to_arena(arena));
133                        }
134                        let slice = build_object_slice(arena, &d2_keys, &d2_vals);
135                        top_values.push(DataValue::Object(slice));
136                        depth2.push(Some(Depth2Cache {
137                            keys: d2_keys,
138                            values: d2_vals,
139                        }));
140                    }
141                    _ => {
142                        top_values.push(v.to_arena(arena));
143                        depth2.push(None);
144                    }
145                }
146            }
147        }
148
149        Self {
150            arena,
151            top_keys,
152            top_values,
153            depth2,
154        }
155    }
156
157    /// Build an arena `DataValue::Object` for the current state. The returned
158    /// slice is freshly allocated in the arena and stays valid for the caller
159    /// to pass into `engine.evaluate`; later mutations on `self` allocate a
160    /// new slice on the next call.
161    pub fn as_data_value(&self) -> DataValue<'a> {
162        let slice = build_object_slice(self.arena, &self.top_keys, &self.top_values);
163        DataValue::Object(slice)
164    }
165
166    /// Borrow the underlying arena — needed by callers that want to allocate
167    /// or evaluate into the same `Bump` (e.g. `engine.evaluate(...)`).
168    #[inline]
169    pub fn arena(&self) -> &'a Bump {
170        self.arena
171    }
172
173    /// Apply an owned write at `path` (pre-split into `parts`) to *both* the
174    /// underlying `OwnedDataValue` context (via the supplied closure that
175    /// performs the in-place mutation) and the arena cache. Skips the runtime
176    /// `str::split` that shows up in profiles as `CharSearcher::next_match`.
177    pub fn apply_mutation_parts(
178        &mut self,
179        owned_ctx: &mut OwnedDataValue,
180        parts: &[Arc<str>],
181        apply: impl FnOnce(&mut OwnedDataValue),
182    ) {
183        apply(owned_ctx);
184        self.refresh_after_write_parts(owned_ctx, parts);
185    }
186
187    /// Refresh the arena slot(s) for `path` from the current `owned_ctx`,
188    /// without applying any new write. Used when a sync task mutated
189    /// `message.context` directly (e.g. `parse_json` going through legacy
190    /// helpers) and we need the arena to catch up.
191    pub fn refresh_for_path(&mut self, owned_ctx: &OwnedDataValue, path: &str) {
192        self.refresh_after_write(owned_ctx, path);
193    }
194
195    /// Pre-split variant of `refresh_after_write` — same algorithm, no
196    /// per-call `str::split` walk. `parts` retains the original `#` prefix;
197    /// the hash strip is applied here at lookup so the cache key matches
198    /// what `set_nested_value_parts` actually wrote.
199    fn refresh_after_write_parts(&mut self, owned_ctx: &OwnedDataValue, parts: &[Arc<str>]) {
200        let top_raw: &str = match parts.first() {
201            Some(p) if !p.is_empty() => p,
202            _ => {
203                self.rebuild_all_from(owned_ctx);
204                return;
205            }
206        };
207        let top = top_raw.strip_prefix('#').unwrap_or(top_raw);
208        fn strip<'p>(p: &'p Arc<str>) -> &'p str {
209            let s: &'p str = p;
210            s.strip_prefix('#').unwrap_or(s)
211        }
212        let depth2_key: Option<&str> = parts.get(1).map(strip);
213        let depth3_key: Option<&str> = parts.get(2).map(strip);
214        self.refresh_after_write_inner(owned_ctx, top, depth2_key, depth3_key);
215    }
216
217    /// Refresh the arena cache after `owned_ctx` was mutated at `path`.
218    fn refresh_after_write(&mut self, owned_ctx: &OwnedDataValue, path: &str) {
219        let mut parts = path.split('.');
220        let top_raw = match parts.next() {
221            Some(p) if !p.is_empty() => p,
222            _ => {
223                self.rebuild_all_from(owned_ctx);
224                return;
225            }
226        };
227        let top = top_raw.strip_prefix('#').unwrap_or(top_raw);
228        let depth2_key = parts.next().map(|p| p.strip_prefix('#').unwrap_or(p));
229        let depth3_key = parts.next().map(|p| p.strip_prefix('#').unwrap_or(p));
230        self.refresh_after_write_inner(owned_ctx, top, depth2_key, depth3_key);
231    }
232
233    /// Shared body: walk the cache for `top` and optional `depth2_key`,
234    /// rebuilding only the dirtied slot. `depth3_key` is ignored (the
235    /// depth-3 sub-cache was tried but regressed on the realistic workload —
236    /// per-write d3 cache thrashing exceeded the savings).
237    fn refresh_after_write_inner(
238        &mut self,
239        owned_ctx: &OwnedDataValue,
240        top: &str,
241        depth2_key: Option<&str>,
242        _depth3_key: Option<&str>,
243    ) {
244        let OwnedDataValue::Object(owned_pairs) = owned_ctx else {
245            self.rebuild_all_from(owned_ctx);
246            return;
247        };
248
249        let owned_top_val = owned_pairs.iter().find(|(k, _)| k == top).map(|(_, v)| v);
250
251        let top_idx = self.top_keys.iter().position(|k| *k == top);
252
253        match (owned_top_val, top_idx) {
254            (None, Some(idx)) => {
255                // Top slot was removed from owned ctx — remove from cache.
256                self.top_keys.remove(idx);
257                self.top_values.remove(idx);
258                self.depth2.remove(idx);
259            }
260            (Some(new_val), idx_opt) => {
261                let idx = match idx_opt {
262                    Some(i) => i,
263                    None => {
264                        self.top_keys.push(self.arena.alloc_str(top));
265                        self.top_values.push(DataValue::Null);
266                        self.depth2.push(None);
267                        self.top_keys.len() - 1
268                    }
269                };
270
271                match (new_val, depth2_key, &mut self.depth2[idx]) {
272                    // Depth-2 write into an existing Object top slot that already
273                    // has a depth-2 cache → refresh only the child.
274                    (OwnedDataValue::Object(new_children), Some(child_key), Some(d2)) => {
275                        if let Some(new_child) = new_children
276                            .iter()
277                            .find(|(k, _)| k == child_key)
278                            .map(|(_, v)| v)
279                        {
280                            // Replace or insert the single child slot.
281                            let child_arena = new_child.to_arena(self.arena);
282                            if let Some(pos) = d2.keys.iter().position(|k| *k == child_key) {
283                                d2.values[pos] = child_arena;
284                            } else {
285                                d2.keys.push(self.arena.alloc_str(child_key));
286                                d2.values.push(child_arena);
287                            }
288                            // Also reflect deletions of *other* depth-2 keys
289                            // (rare but possible if the write replaced the
290                            // whole top object). Cheap O(n) scan.
291                            if d2.keys.len() != new_children.len() {
292                                // Owned children diverged from our cache —
293                                // rebuild the depth-2 cache from owned.
294                                self.rebuild_top_slot(owned_top_val.unwrap(), idx);
295                                return;
296                            }
297                        } else {
298                            // child_key not found in new owned object — child
299                            // was removed. Drop from cache.
300                            if let Some(pos) = d2.keys.iter().position(|k| *k == child_key) {
301                                d2.keys.remove(pos);
302                                d2.values.remove(pos);
303                            }
304                        }
305                        let slice = build_object_slice(self.arena, &d2.keys, &d2.values);
306                        self.top_values[idx] = DataValue::Object(slice);
307                    }
308                    // Top-level (depth-1) write or shape change → rebuild
309                    // the whole top slot (cheap relative to a full ctx walk).
310                    _ => {
311                        self.rebuild_top_slot(new_val, idx);
312                    }
313                }
314            }
315            (None, None) => { /* no-op */ }
316        }
317    }
318
319    fn rebuild_top_slot(&mut self, owned: &OwnedDataValue, idx: usize) {
320        match owned {
321            OwnedDataValue::Object(children) => {
322                let mut d2_keys: Vec<&'a str> = Vec::with_capacity(children.len());
323                let mut d2_vals: Vec<DataValue<'a>> = Vec::with_capacity(children.len());
324                for (ck, cv) in children {
325                    d2_keys.push(self.arena.alloc_str(ck));
326                    d2_vals.push(cv.to_arena(self.arena));
327                }
328                let slice = build_object_slice(self.arena, &d2_keys, &d2_vals);
329                self.top_values[idx] = DataValue::Object(slice);
330                self.depth2[idx] = Some(Depth2Cache {
331                    keys: d2_keys,
332                    values: d2_vals,
333                });
334            }
335            _ => {
336                self.top_values[idx] = owned.to_arena(self.arena);
337                self.depth2[idx] = None;
338            }
339        }
340    }
341
342    /// Last-resort: ditch all cached state and rebuild from scratch. Should be
343    /// rare on normal flows — only triggered if the context shape changes in
344    /// a way the depth-2 cache can't track.
345    fn rebuild_all_from(&mut self, ctx: &OwnedDataValue) {
346        let rebuilt = ArenaContext::from_owned(ctx, self.arena);
347        self.top_keys = rebuilt.top_keys;
348        self.top_values = rebuilt.top_values;
349        self.depth2 = rebuilt.depth2;
350    }
351}
352
353/// Allocate a fresh `(key, value)` slice in the arena. Each
354/// `engine.evaluate` call gets its own slice; subsequent mutations to the
355/// underlying Vecs are independent.
356fn build_object_slice<'a>(
357    arena: &'a Bump,
358    keys: &[&'a str],
359    values: &[DataValue<'a>],
360) -> &'a [(&'a str, DataValue<'a>)] {
361    debug_assert_eq!(keys.len(), values.len());
362    arena.alloc_slice_fill_iter(keys.iter().zip(values.iter()).map(|(k, v)| (*k, *v)))
363}
364
365/// Evaluate a workflow or task condition using a cached compiled logic
366/// expression. Returns `true` when `condition_index` is `None` (no condition
367/// is treated as "always run"). Evaluation errors are logged and downgraded
368/// to `false` — a condition that fails to evaluate skips its task/workflow
369/// rather than aborting the whole message.
370pub fn evaluate_condition(
371    engine: &Engine,
372    compiled_condition: Option<&Arc<Logic>>,
373    context: &OwnedDataValue,
374) -> Result<bool> {
375    match compiled_condition {
376        Some(compiled) => match eval_to_owned(engine, compiled, context) {
377            Ok(value) => Ok(matches!(value, OwnedDataValue::Bool(true))),
378            Err(e) => {
379                error!("Failed to evaluate condition: {:?}", e);
380                Ok(false)
381            }
382        },
383        None => Ok(true),
384    }
385}
386
387/// Same as `evaluate_condition` but evaluates against an arena-resident
388/// `DataValue` and an existing `Bump`. Used inside a `with_arena` block
389/// (the workflow sync-stretch path) to avoid re-entering the thread-local
390/// arena `RefCell::borrow_mut`.
391pub fn evaluate_condition_in_arena(
392    engine: &Engine,
393    compiled_condition: Option<&Arc<Logic>>,
394    ctx: DataValue<'_>,
395    arena: &Bump,
396) -> Result<bool> {
397    match compiled_condition {
398        Some(compiled) => match engine.evaluate(compiled, ctx, arena) {
399            Ok(value) => Ok(matches!(value, DataValue::Bool(true))),
400            Err(e) => {
401                error!("Failed to evaluate condition: {:?}", e);
402                Ok(false)
403            }
404        },
405        None => Ok(true),
406    }
407}