dataflow_rs/engine/executor.rs
1//! # Evaluation Primitives
2//!
3//! Free functions and types that support JSONLogic evaluation in the engine.
4//! Built-in function execution lives on each config type (`MapConfig::execute`,
5//! `ValidationConfig::execute`, …) — this module just provides the shared
6//! evaluation machinery they all build on.
7//!
//! The bump arena is held in a thread-local cell on each Tokio worker. Per
//! call, the arena is rewound via `Bump::reset` before the eval. `reset`
//! keeps the arena's largest chunk and hands any excess chunks back to the
//! global allocator, so after a short warm-up the retained chunk is large
//! enough for the workload's high-water mark and later calls allocate
//! nothing — no per-task malloc/free churn. Profiling showed per-task
//! `Bump::with_capacity` malloc was the dominant cost when arena sizing was
//! tuned for realistic workloads; thread-local reuse amortizes that to zero
//! in steady state.
15//!
16//! `ArenaContext` (below) extends this further for **mutating** tasks (map):
17//! the message context is `to_arena`'d once per task call into a depth‑2
18//! cache, and subsequent writes only re‑arena the dirtied subtree — typically
19//! `data.MT103` while the heavy `data.input` stays cached.
20
21use crate::engine::error::Result;
22use bumpalo::Bump;
23use datalogic_rs::{Engine, Logic};
24use datavalue::{DataValue, OwnedDataValue};
25use log::error;
26use std::cell::RefCell;
27use std::sync::Arc;
28
/// Starting capacity, in bytes, of each worker thread's bump arena (128 KiB).
///
/// Intended to be large enough that the `to_arena` deep-clone of a realistic
/// ISO-20022-shaped payload fits without the arena having to grow during a
/// thread's first few calls. Once the arena has grown to fit the workload,
/// this starting size no longer matters.
const ARENA_INITIAL_CAPACITY: usize = 128 << 10;
34
35thread_local! {
36 /// Per-worker-thread bump arena. `Engine` and `Arc<Logic>` are `Send + Sync`
37 /// and shared across threads; `Bump` is `!Send` so it lives here for
38 /// zero-contention scratch space. Chunks accumulate over the thread's
39 /// lifetime and `reset()` rewinds the pointer without freeing chunks back
40 /// to the OS — steady-state allocator pressure is zero.
41 static EVAL_ARENA: RefCell<Bump> = RefCell::new(Bump::with_capacity(ARENA_INITIAL_CAPACITY));
42}
43
44/// Evaluate `compiled` against `context` using the worker thread's bump
45/// arena, returning the result as an owned `OwnedDataValue`. The arena is
46/// rewound before the call so peak memory is bounded by the single largest
47/// evaluation; chunks persist across calls so steady-state allocation is zero.
48///
49/// Use this for one-shot evals where the context isn't reused across
50/// multiple JSONLogic calls (e.g. a single condition check). For batches of
51/// read-only evals against the same context (validation, log) use
52/// [`with_arena`] and convert the context once via
53/// [`datavalue::OwnedDataValue::to_arena`].
54#[inline]
55pub(crate) fn eval_to_owned(
56 engine: &Engine,
57 compiled: &Logic,
58 context: &OwnedDataValue,
59) -> std::result::Result<OwnedDataValue, datalogic_rs::Error> {
60 EVAL_ARENA.with(|cell| {
61 let mut arena = cell.borrow_mut();
62 arena.reset();
63 let r = engine.evaluate(compiled, context, &arena)?;
64 Ok(r.to_owned())
65 })
66}
67
68/// Run `f` with the worker thread's bump arena rewound. The closure receives
69/// the `Bump` and can amortize work across multiple `engine.evaluate` calls
70/// by converting the input context to `DataValue` once and reusing it. Use
71/// this for batches of read-only evals against the same context (validation,
72/// log) — it skips the per-eval `to_arena` deep-clone that dominates
73/// realistic profile.
74#[inline]
75pub(crate) fn with_arena<R>(f: impl FnOnce(&Bump) -> R) -> R {
76 EVAL_ARENA.with(|cell| {
77 let mut arena = cell.borrow_mut();
78 arena.reset();
79 f(&arena)
80 })
81}
82
83/// Depth‑2 arena cache for a `Message.context` (always an
84/// `OwnedDataValue::Object`).
85///
86/// Built once at the top of a mutating task call, then mutated in place as
87/// the task writes back into `message.context`. Writes at path `a.b.X`
88/// invalidate only the `(a, b)` arena slot — `data.input` stays cached
89/// across the entire map task even while `data.MT103.*` is being written.
90///
91/// **Lifetime model.** All arena allocations come out of the borrowed `Bump`.
92/// `top_keys` / `top_values` / `depth2` are owned `Vec`s so we can mutate
93/// them freely; the `DataValue<'a>` slice handed to `engine.evaluate` is a
94/// fresh `arena.alloc_slice_copy` per call, so it stays valid for that eval
95/// regardless of subsequent mutations.
96pub(crate) struct ArenaContext<'a> {
97 arena: &'a Bump,
98 /// Top-level slot keys, arena-allocated `&'a str`.
99 top_keys: Vec<&'a str>,
100 /// Top-level slot values. When a slot's owned value is an `Object`, the
101 /// corresponding `top_values[i]` is `DataValue::Object(&'a [...])` whose
102 /// slice was minted from `depth2[i]` via `alloc_slice_copy`. When not an
103 /// Object, `depth2[i] = None` and `top_values[i]` is the full arena form.
104 top_values: Vec<DataValue<'a>>,
105 /// Depth‑2 cache, parallel to `top_keys`. `None` for non‑Object top slots.
106 depth2: Vec<Option<Depth2Cache<'a>>>,
107}
108
109struct Depth2Cache<'a> {
110 keys: Vec<&'a str>,
111 values: Vec<DataValue<'a>>,
112}
113
114impl<'a> ArenaContext<'a> {
115 /// Build from an `OwnedDataValue` context (which should be the canonical
116 /// `Object { data, metadata, temp_data }` shape). Deep-walks the owned
117 /// tree exactly once; subsequent reads / mutations are O(touched slot).
118 pub fn from_owned(ctx: &OwnedDataValue, arena: &'a Bump) -> Self {
119 let mut top_keys: Vec<&'a str> = Vec::with_capacity(4);
120 let mut top_values: Vec<DataValue<'a>> = Vec::with_capacity(4);
121 let mut depth2: Vec<Option<Depth2Cache<'a>>> = Vec::with_capacity(4);
122
123 if let OwnedDataValue::Object(pairs) = ctx {
124 for (k, v) in pairs {
125 top_keys.push(arena.alloc_str(k));
126 match v {
127 OwnedDataValue::Object(children) => {
128 let mut d2_keys: Vec<&'a str> = Vec::with_capacity(children.len());
129 let mut d2_vals: Vec<DataValue<'a>> = Vec::with_capacity(children.len());
130 for (ck, cv) in children {
131 d2_keys.push(arena.alloc_str(ck));
132 d2_vals.push(cv.to_arena(arena));
133 }
134 let slice = build_object_slice(arena, &d2_keys, &d2_vals);
135 top_values.push(DataValue::Object(slice));
136 depth2.push(Some(Depth2Cache {
137 keys: d2_keys,
138 values: d2_vals,
139 }));
140 }
141 _ => {
142 top_values.push(v.to_arena(arena));
143 depth2.push(None);
144 }
145 }
146 }
147 }
148
149 Self {
150 arena,
151 top_keys,
152 top_values,
153 depth2,
154 }
155 }
156
157 /// Build an arena `DataValue::Object` for the current state. The returned
158 /// slice is freshly allocated in the arena and stays valid for the caller
159 /// to pass into `engine.evaluate`; later mutations on `self` allocate a
160 /// new slice on the next call.
161 pub fn as_data_value(&self) -> DataValue<'a> {
162 let slice = build_object_slice(self.arena, &self.top_keys, &self.top_values);
163 DataValue::Object(slice)
164 }
165
166 /// Borrow the underlying arena — needed by callers that want to allocate
167 /// or evaluate into the same `Bump` (e.g. `engine.evaluate(...)`).
168 #[inline]
169 pub fn arena(&self) -> &'a Bump {
170 self.arena
171 }
172
173 /// Apply an owned write at `path` (pre-split into `parts`) to *both* the
174 /// underlying `OwnedDataValue` context (via the supplied closure that
175 /// performs the in-place mutation) and the arena cache. Skips the runtime
176 /// `str::split` that shows up in profiles as `CharSearcher::next_match`.
177 pub fn apply_mutation_parts(
178 &mut self,
179 owned_ctx: &mut OwnedDataValue,
180 parts: &[Arc<str>],
181 apply: impl FnOnce(&mut OwnedDataValue),
182 ) {
183 apply(owned_ctx);
184 self.refresh_after_write_parts(owned_ctx, parts);
185 }
186
187 /// Refresh the arena slot(s) for `path` from the current `owned_ctx`,
188 /// without applying any new write. Used when a sync task mutated
189 /// `message.context` directly (e.g. `parse_json` going through legacy
190 /// helpers) and we need the arena to catch up.
191 pub fn refresh_for_path(&mut self, owned_ctx: &OwnedDataValue, path: &str) {
192 self.refresh_after_write(owned_ctx, path);
193 }
194
195 /// Pre-split variant of `refresh_after_write` — same algorithm, no
196 /// per-call `str::split` walk. `parts` retains the original `#` prefix;
197 /// the hash strip is applied here at lookup so the cache key matches
198 /// what `set_nested_value_parts` actually wrote.
199 fn refresh_after_write_parts(&mut self, owned_ctx: &OwnedDataValue, parts: &[Arc<str>]) {
200 let top_raw: &str = match parts.first() {
201 Some(p) if !p.is_empty() => p,
202 _ => {
203 self.rebuild_all_from(owned_ctx);
204 return;
205 }
206 };
207 let top = top_raw.strip_prefix('#').unwrap_or(top_raw);
208 fn strip<'p>(p: &'p Arc<str>) -> &'p str {
209 let s: &'p str = p;
210 s.strip_prefix('#').unwrap_or(s)
211 }
212 let depth2_key: Option<&str> = parts.get(1).map(strip);
213 let depth3_key: Option<&str> = parts.get(2).map(strip);
214 self.refresh_after_write_inner(owned_ctx, top, depth2_key, depth3_key);
215 }
216
217 /// Refresh the arena cache after `owned_ctx` was mutated at `path`.
218 fn refresh_after_write(&mut self, owned_ctx: &OwnedDataValue, path: &str) {
219 let mut parts = path.split('.');
220 let top_raw = match parts.next() {
221 Some(p) if !p.is_empty() => p,
222 _ => {
223 self.rebuild_all_from(owned_ctx);
224 return;
225 }
226 };
227 let top = top_raw.strip_prefix('#').unwrap_or(top_raw);
228 let depth2_key = parts.next().map(|p| p.strip_prefix('#').unwrap_or(p));
229 let depth3_key = parts.next().map(|p| p.strip_prefix('#').unwrap_or(p));
230 self.refresh_after_write_inner(owned_ctx, top, depth2_key, depth3_key);
231 }
232
233 /// Shared body: walk the cache for `top` and optional `depth2_key`,
234 /// rebuilding only the dirtied slot. `depth3_key` is ignored (the
235 /// depth-3 sub-cache was tried but regressed on the realistic workload —
236 /// per-write d3 cache thrashing exceeded the savings).
237 fn refresh_after_write_inner(
238 &mut self,
239 owned_ctx: &OwnedDataValue,
240 top: &str,
241 depth2_key: Option<&str>,
242 _depth3_key: Option<&str>,
243 ) {
244 let OwnedDataValue::Object(owned_pairs) = owned_ctx else {
245 self.rebuild_all_from(owned_ctx);
246 return;
247 };
248
249 let owned_top_val = owned_pairs.iter().find(|(k, _)| k == top).map(|(_, v)| v);
250
251 let top_idx = self.top_keys.iter().position(|k| *k == top);
252
253 match (owned_top_val, top_idx) {
254 (None, Some(idx)) => {
255 // Top slot was removed from owned ctx — remove from cache.
256 self.top_keys.remove(idx);
257 self.top_values.remove(idx);
258 self.depth2.remove(idx);
259 }
260 (Some(new_val), idx_opt) => {
261 let idx = match idx_opt {
262 Some(i) => i,
263 None => {
264 self.top_keys.push(self.arena.alloc_str(top));
265 self.top_values.push(DataValue::Null);
266 self.depth2.push(None);
267 self.top_keys.len() - 1
268 }
269 };
270
271 match (new_val, depth2_key, &mut self.depth2[idx]) {
272 // Depth-2 write into an existing Object top slot that already
273 // has a depth-2 cache → refresh only the child.
274 (OwnedDataValue::Object(new_children), Some(child_key), Some(d2)) => {
275 if let Some(new_child) = new_children
276 .iter()
277 .find(|(k, _)| k == child_key)
278 .map(|(_, v)| v)
279 {
280 // Replace or insert the single child slot.
281 let child_arena = new_child.to_arena(self.arena);
282 if let Some(pos) = d2.keys.iter().position(|k| *k == child_key) {
283 d2.values[pos] = child_arena;
284 } else {
285 d2.keys.push(self.arena.alloc_str(child_key));
286 d2.values.push(child_arena);
287 }
288 // Also reflect deletions of *other* depth-2 keys
289 // (rare but possible if the write replaced the
290 // whole top object). Cheap O(n) scan.
291 if d2.keys.len() != new_children.len() {
292 // Owned children diverged from our cache —
293 // rebuild the depth-2 cache from owned.
294 self.rebuild_top_slot(owned_top_val.unwrap(), idx);
295 return;
296 }
297 } else {
298 // child_key not found in new owned object — child
299 // was removed. Drop from cache.
300 if let Some(pos) = d2.keys.iter().position(|k| *k == child_key) {
301 d2.keys.remove(pos);
302 d2.values.remove(pos);
303 }
304 }
305 let slice = build_object_slice(self.arena, &d2.keys, &d2.values);
306 self.top_values[idx] = DataValue::Object(slice);
307 }
308 // Top-level (depth-1) write or shape change → rebuild
309 // the whole top slot (cheap relative to a full ctx walk).
310 _ => {
311 self.rebuild_top_slot(new_val, idx);
312 }
313 }
314 }
315 (None, None) => { /* no-op */ }
316 }
317 }
318
319 fn rebuild_top_slot(&mut self, owned: &OwnedDataValue, idx: usize) {
320 match owned {
321 OwnedDataValue::Object(children) => {
322 let mut d2_keys: Vec<&'a str> = Vec::with_capacity(children.len());
323 let mut d2_vals: Vec<DataValue<'a>> = Vec::with_capacity(children.len());
324 for (ck, cv) in children {
325 d2_keys.push(self.arena.alloc_str(ck));
326 d2_vals.push(cv.to_arena(self.arena));
327 }
328 let slice = build_object_slice(self.arena, &d2_keys, &d2_vals);
329 self.top_values[idx] = DataValue::Object(slice);
330 self.depth2[idx] = Some(Depth2Cache {
331 keys: d2_keys,
332 values: d2_vals,
333 });
334 }
335 _ => {
336 self.top_values[idx] = owned.to_arena(self.arena);
337 self.depth2[idx] = None;
338 }
339 }
340 }
341
342 /// Last-resort: ditch all cached state and rebuild from scratch. Should be
343 /// rare on normal flows — only triggered if the context shape changes in
344 /// a way the depth-2 cache can't track.
345 fn rebuild_all_from(&mut self, ctx: &OwnedDataValue) {
346 let rebuilt = ArenaContext::from_owned(ctx, self.arena);
347 self.top_keys = rebuilt.top_keys;
348 self.top_values = rebuilt.top_values;
349 self.depth2 = rebuilt.depth2;
350 }
351}
352
353/// Allocate a fresh `(key, value)` slice in the arena. Each
354/// `engine.evaluate` call gets its own slice; subsequent mutations to the
355/// underlying Vecs are independent.
356fn build_object_slice<'a>(
357 arena: &'a Bump,
358 keys: &[&'a str],
359 values: &[DataValue<'a>],
360) -> &'a [(&'a str, DataValue<'a>)] {
361 debug_assert_eq!(keys.len(), values.len());
362 arena.alloc_slice_fill_iter(keys.iter().zip(values.iter()).map(|(k, v)| (*k, *v)))
363}
364
365/// Evaluate a workflow or task condition using a cached compiled logic
366/// expression. Returns `true` when `condition_index` is `None` (no condition
367/// is treated as "always run"). Evaluation errors are logged and downgraded
368/// to `false` — a condition that fails to evaluate skips its task/workflow
369/// rather than aborting the whole message.
370pub fn evaluate_condition(
371 engine: &Engine,
372 compiled_condition: Option<&Arc<Logic>>,
373 context: &OwnedDataValue,
374) -> Result<bool> {
375 match compiled_condition {
376 Some(compiled) => match eval_to_owned(engine, compiled, context) {
377 Ok(value) => Ok(matches!(value, OwnedDataValue::Bool(true))),
378 Err(e) => {
379 error!("Failed to evaluate condition: {:?}", e);
380 Ok(false)
381 }
382 },
383 None => Ok(true),
384 }
385}
386
387/// Same as `evaluate_condition` but evaluates against an arena-resident
388/// `DataValue` and an existing `Bump`. Used inside a `with_arena` block
389/// (the workflow sync-stretch path) to avoid re-entering the thread-local
390/// arena `RefCell::borrow_mut`.
391pub fn evaluate_condition_in_arena(
392 engine: &Engine,
393 compiled_condition: Option<&Arc<Logic>>,
394 ctx: DataValue<'_>,
395 arena: &Bump,
396) -> Result<bool> {
397 match compiled_condition {
398 Some(compiled) => match engine.evaluate(compiled, ctx, arena) {
399 Ok(value) => Ok(matches!(value, DataValue::Bool(true))),
400 Err(e) => {
401 error!("Failed to evaluate condition: {:?}", e);
402 Ok(false)
403 }
404 },
405 None => Ok(true),
406 }
407}