Skip to main content

dataflow_rs/engine/functions/
filter.rs

1use crate::engine::error::Result;
2use crate::engine::message::{Change, Message};
3use datalogic_rs::{CompiledLogic, DataLogic};
4use log::{debug, info};
5use serde::Deserialize;
6use serde_json::Value;
7use std::sync::Arc;
8
/// Status returned by the filter when its condition evaluated to `true`;
/// processing continues normally.
pub const FILTER_STATUS_PASS: usize = 200;
/// Status returned when the condition was not met and the reject action is
/// "skip": only this task is skipped and the next task continues.
pub const FILTER_STATUS_SKIP: usize = 298;
/// Status returned when the condition was not met and the reject action is
/// "halt": the current workflow stops and no further tasks execute.
pub const FILTER_STATUS_HALT: usize = 299;
15
16/// What to do when the filter condition evaluates to false
17#[derive(Debug, Clone, Deserialize, Default)]
18#[serde(rename_all = "lowercase")]
19pub enum RejectAction {
20    /// Halt the entire workflow — no further tasks in this workflow execute
21    #[default]
22    Halt,
23    /// Skip only this task — continue with next task in the workflow
24    Skip,
25}
26
27/// Configuration for the filter/gate function
28#[derive(Debug, Clone, Deserialize)]
29pub struct FilterConfig {
30    /// JSONLogic condition to evaluate against the message context.
31    /// If true, the message passes through. If false, the on_reject action is taken.
32    pub condition: Value,
33
34    /// What to do when the condition is false
35    #[serde(default)]
36    pub on_reject: RejectAction,
37
38    /// Cache index for the compiled condition
39    #[serde(skip)]
40    pub condition_index: Option<usize>,
41}
42
43impl FilterConfig {
44    /// Execute the filter function.
45    ///
46    /// Returns status 200 if condition passes, 299 for halt, 298 for skip.
47    pub fn execute(
48        &self,
49        message: &mut Message,
50        datalogic: &Arc<DataLogic>,
51        logic_cache: &[Arc<CompiledLogic>],
52    ) -> Result<(usize, Vec<Change>)> {
53        let context_arc = message.get_context_arc();
54
55        let condition_met = match self.condition_index {
56            Some(idx) if idx < logic_cache.len() => {
57                match datalogic.evaluate(&logic_cache[idx], Arc::clone(&context_arc)) {
58                    Ok(Value::Bool(true)) => true,
59                    Ok(_) => false,
60                    Err(e) => {
61                        debug!("Filter: condition evaluation error: {:?}", e);
62                        false
63                    }
64                }
65            }
66            _ => {
67                debug!("Filter: condition not compiled, treating as not met");
68                false
69            }
70        };
71
72        if condition_met {
73            debug!("Filter: condition passed");
74            Ok((FILTER_STATUS_PASS, vec![]))
75        } else {
76            match self.on_reject {
77                RejectAction::Halt => {
78                    info!("Filter: condition not met, halting workflow");
79                    Ok((FILTER_STATUS_HALT, vec![]))
80                }
81                RejectAction::Skip => {
82                    debug!("Filter: condition not met, skipping");
83                    Ok((FILTER_STATUS_SKIP, vec![]))
84                }
85            }
86        }
87    }
88}