dataflow_rs/engine/functions/
filter.rs1use crate::engine::error::Result;
2use crate::engine::message::{Change, Message};
3use datalogic_rs::{CompiledLogic, DataLogic};
4use log::{debug, info};
5use serde::Deserialize;
6use serde_json::Value;
7use std::sync::Arc;
8
9pub const FILTER_STATUS_PASS: usize = 200;
11pub const FILTER_STATUS_SKIP: usize = 298;
13pub const FILTER_STATUS_HALT: usize = 299;
15
16#[derive(Debug, Clone, Deserialize, Default)]
18#[serde(rename_all = "lowercase")]
19pub enum RejectAction {
20 #[default]
22 Halt,
23 Skip,
25}
26
27#[derive(Debug, Clone, Deserialize)]
29pub struct FilterConfig {
30 pub condition: Value,
33
34 #[serde(default)]
36 pub on_reject: RejectAction,
37
38 #[serde(skip)]
40 pub condition_index: Option<usize>,
41}
42
43impl FilterConfig {
44 pub fn execute(
48 &self,
49 message: &mut Message,
50 datalogic: &Arc<DataLogic>,
51 logic_cache: &[Arc<CompiledLogic>],
52 ) -> Result<(usize, Vec<Change>)> {
53 let context_arc = message.get_context_arc();
54
55 let condition_met = match self.condition_index {
56 Some(idx) if idx < logic_cache.len() => {
57 match datalogic.evaluate(&logic_cache[idx], Arc::clone(&context_arc)) {
58 Ok(Value::Bool(true)) => true,
59 Ok(_) => false,
60 Err(e) => {
61 debug!("Filter: condition evaluation error: {:?}", e);
62 false
63 }
64 }
65 }
66 _ => {
67 debug!("Filter: condition not compiled, treating as not met");
68 false
69 }
70 };
71
72 if condition_met {
73 debug!("Filter: condition passed");
74 Ok((FILTER_STATUS_PASS, vec![]))
75 } else {
76 match self.on_reject {
77 RejectAction::Halt => {
78 info!("Filter: condition not met, halting workflow");
79 Ok((FILTER_STATUS_HALT, vec![]))
80 }
81 RejectAction::Skip => {
82 debug!("Filter: condition not met, skipping");
83 Ok((FILTER_STATUS_SKIP, vec![]))
84 }
85 }
86 }
87 }
88}