Skip to main content

dataflow_rs/engine/functions/
filter.rs

1use crate::engine::error::Result;
2use crate::engine::executor::{ArenaContext, with_arena};
3use crate::engine::message::{Change, Message};
4use crate::engine::task_outcome::TaskOutcome;
5use datalogic_rs::{Engine, Logic};
6use datavalue::DataValue;
7use log::{debug, info};
8use serde::Deserialize;
9use serde_json::Value;
10use std::sync::Arc;
11
12/// What to do when the filter condition evaluates to false
13#[derive(Debug, Clone, Deserialize, Default)]
14#[serde(rename_all = "lowercase")]
15pub enum RejectAction {
16    /// Halt the entire workflow — no further tasks in this workflow execute
17    #[default]
18    Halt,
19    /// Skip only this task — continue with next task in the workflow
20    Skip,
21}
22
23/// Configuration for the filter/gate function
24#[derive(Debug, Clone, Deserialize)]
25pub struct FilterConfig {
26    /// JSONLogic condition to evaluate against the message context.
27    /// If true, the message passes through. If false, the on_reject action is taken.
28    pub condition: Value,
29
30    /// What to do when the condition is false
31    #[serde(default)]
32    pub on_reject: RejectAction,
33
34    /// Pre-compiled JSONLogic, populated by `LogicCompiler`. `None` is
35    /// treated as "condition not met" (same fallback as before).
36    #[serde(skip)]
37    pub compiled_condition: Option<Arc<Logic>>,
38}
39
40impl FilterConfig {
41    /// Execute the filter function, opening a fresh thread-local arena scope.
42    ///
43    /// Use this entry point when calling `FilterConfig` outside an existing
44    /// `with_arena` scope. Inside a workflow sync stretch the dispatch goes
45    /// through [`Self::execute_in_arena`] to reuse the cached arena form of
46    /// `message.context` and avoid a redundant `to_arena` deep walk.
47    ///
48    /// Returns `TaskOutcome::Success` when the condition passes,
49    /// `TaskOutcome::Halt` / `TaskOutcome::Skip` per `on_reject` otherwise.
50    pub fn execute(
51        &self,
52        message: &mut Message,
53        engine: &Arc<Engine>,
54    ) -> Result<(TaskOutcome, Vec<Change>)> {
55        with_arena(|arena| {
56            let mut arena_ctx = ArenaContext::from_owned(&message.context, arena);
57            self.execute_in_arena(message, &mut arena_ctx, engine)
58        })
59    }
60
61    /// Execute against an externally-provided `ArenaContext` so the cached
62    /// arena form of `message.context` is reused. Eliminates the inner
63    /// `with_arena`/`eval_to_owned` call that would re-borrow the
64    /// thread-local arena `RefCell` (panic) when invoked from inside the
65    /// sync stretch.
66    pub(crate) fn execute_in_arena(
67        &self,
68        _message: &mut Message,
69        arena_ctx: &mut ArenaContext<'_>,
70        engine: &Arc<Engine>,
71    ) -> Result<(TaskOutcome, Vec<Change>)> {
72        let condition_met = match &self.compiled_condition {
73            Some(compiled) => {
74                let ctx_av = arena_ctx.as_data_value();
75                match engine.evaluate(compiled, ctx_av, arena_ctx.arena()) {
76                    Ok(DataValue::Bool(true)) => true,
77                    Ok(_) => false,
78                    Err(e) => {
79                        debug!("Filter: condition evaluation error: {:?}", e);
80                        false
81                    }
82                }
83            }
84            None => {
85                debug!("Filter: condition not compiled, treating as not met");
86                false
87            }
88        };
89
90        if condition_met {
91            debug!("Filter: condition passed");
92            Ok((TaskOutcome::Success, vec![]))
93        } else {
94            match self.on_reject {
95                RejectAction::Halt => {
96                    info!("Filter: condition not met, halting workflow");
97                    Ok((TaskOutcome::Halt, vec![]))
98                }
99                RejectAction::Skip => {
100                    debug!("Filter: condition not met, skipping");
101                    Ok((TaskOutcome::Skip, vec![]))
102                }
103            }
104        }
105    }
106}