dataflow_rs/engine/functions/filter.rs
1use crate::engine::error::Result;
2use crate::engine::executor::{ArenaContext, with_arena};
3use crate::engine::message::{Change, Message};
4use crate::engine::task_outcome::TaskOutcome;
5use datalogic_rs::{Engine, Logic};
6use datavalue::DataValue;
7use log::{debug, info};
8use serde::Deserialize;
9use serde_json::Value;
10use std::sync::Arc;
11
12/// What to do when the filter condition evaluates to false
13#[derive(Debug, Clone, Deserialize, Default)]
14#[serde(rename_all = "lowercase")]
15pub enum RejectAction {
16 /// Halt the entire workflow — no further tasks in this workflow execute
17 #[default]
18 Halt,
19 /// Skip only this task — continue with next task in the workflow
20 Skip,
21}
22
23/// Configuration for the filter/gate function
24#[derive(Debug, Clone, Deserialize)]
25pub struct FilterConfig {
26 /// JSONLogic condition to evaluate against the message context.
27 /// If true, the message passes through. If false, the on_reject action is taken.
28 pub condition: Value,
29
30 /// What to do when the condition is false
31 #[serde(default)]
32 pub on_reject: RejectAction,
33
34 /// Pre-compiled JSONLogic, populated by `LogicCompiler`. `None` is
35 /// treated as "condition not met" (same fallback as before).
36 #[serde(skip)]
37 pub compiled_condition: Option<Arc<Logic>>,
38}
39
40impl FilterConfig {
41 /// Execute the filter function, opening a fresh thread-local arena scope.
42 ///
43 /// Use this entry point when calling `FilterConfig` outside an existing
44 /// `with_arena` scope. Inside a workflow sync stretch the dispatch goes
45 /// through [`Self::execute_in_arena`] to reuse the cached arena form of
46 /// `message.context` and avoid a redundant `to_arena` deep walk.
47 ///
48 /// Returns `TaskOutcome::Success` when the condition passes,
49 /// `TaskOutcome::Halt` / `TaskOutcome::Skip` per `on_reject` otherwise.
50 pub fn execute(
51 &self,
52 message: &mut Message,
53 engine: &Arc<Engine>,
54 ) -> Result<(TaskOutcome, Vec<Change>)> {
55 with_arena(|arena| {
56 let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
57 self.execute_in_arena(message, &mut arena_ctx, engine)
58 })
59 }
60
61 /// Execute against an externally-provided `ArenaContext` so the cached
62 /// arena form of `message.context` is reused. Eliminates the inner
63 /// `with_arena`/`eval_to_owned` call that would re-borrow the
64 /// thread-local arena `RefCell` (panic) when invoked from inside the
65 /// sync stretch.
66 pub(crate) fn execute_in_arena(
67 &self,
68 _message: &mut Message,
69 arena_ctx: &mut ArenaContext<'_>,
70 engine: &Arc<Engine>,
71 ) -> Result<(TaskOutcome, Vec<Change>)> {
72 let condition_met = match &self.compiled_condition {
73 Some(compiled) => {
74 let ctx_av = arena_ctx.as_data_value();
75 match engine.evaluate(compiled, ctx_av, arena_ctx.arena()) {
76 Ok(DataValue::Bool(true)) => true,
77 Ok(_) => false,
78 Err(e) => {
79 debug!("Filter: condition evaluation error: {:?}", e);
80 false
81 }
82 }
83 }
84 None => {
85 debug!("Filter: condition not compiled, treating as not met");
86 false
87 }
88 };
89
90 if condition_met {
91 debug!("Filter: condition passed");
92 Ok((TaskOutcome::Success, vec![]))
93 } else {
94 match self.on_reject {
95 RejectAction::Halt => {
96 info!("Filter: condition not met, halting workflow");
97 Ok((TaskOutcome::Halt, vec![]))
98 }
99 RejectAction::Skip => {
100 debug!("Filter: condition not met, skipping");
101 Ok((TaskOutcome::Skip, vec![]))
102 }
103 }
104 }
105 }
106}