use serde::{Deserialize, Serialize};
use crate::common::message::Message;
use crate::error::{Error, Result};
use crate::transform::json_path;
use crate::transform::step::Step;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FilterOperator {
#[default]
Eq,
Ne,
Gt,
Ge,
Lt,
Le,
Contains,
Matches,
AbsGt,
AbsGe,
AbsLt,
AbsLe,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FilterMode {
#[default]
And,
Or,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FilterCondition {
pub field: String,
#[serde(default)]
pub operator: FilterOperator,
pub value: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum FilterStepConfig {
Multi {
#[serde(default)]
mode: FilterMode,
conditions: Vec<FilterCondition>,
},
Single(FilterCondition),
}
struct CompiledCondition {
path: json_path::CompiledPath,
operator: FilterOperator,
expected: serde_json::Value,
regex: Option<regex::Regex>,
}
impl CompiledCondition {
fn new(condition: FilterCondition) -> Result<Self> {
let path = json_path::CompiledPath::compile(&condition.field)?;
let regex = if condition.operator == FilterOperator::Matches {
let pattern = condition
.value
.as_str()
.ok_or_else(|| Error::config("Matches operator requires a string pattern"))?;
Some(
regex::Regex::new(pattern)
.map_err(|e| Error::config(format!("Invalid regex pattern: {}", e)))?,
)
} else {
None
};
Ok(Self {
path,
operator: condition.operator,
expected: condition.value,
regex,
})
}
fn evaluate(&self, payload: &serde_json::Value) -> bool {
let Some(field_value) = self.path.extract(payload) else {
return false; };
use FilterOperator::*;
use std::cmp::Ordering::{Equal, Greater, Less};
match self.operator {
Eq => field_value == &self.expected,
Ne => field_value != &self.expected,
Gt => compare_values(field_value, &self.expected) == Some(Greater),
Ge => matches!(
compare_values(field_value, &self.expected),
Some(Greater | Equal)
),
Lt => compare_values(field_value, &self.expected) == Some(Less),
Le => matches!(
compare_values(field_value, &self.expected),
Some(Less | Equal)
),
Contains => matches!(
(field_value.as_str(), self.expected.as_str()),
(Some(haystack), Some(needle)) if haystack.contains(needle)
),
Matches => matches!(
(field_value.as_str(), &self.regex),
(Some(text), Some(re)) if re.is_match(text)
),
AbsGt => compare_abs_values(field_value, &self.expected) == Some(Greater),
AbsGe => matches!(
compare_abs_values(field_value, &self.expected),
Some(Greater | Equal)
),
AbsLt => compare_abs_values(field_value, &self.expected) == Some(Less),
AbsLe => matches!(
compare_abs_values(field_value, &self.expected),
Some(Less | Equal)
),
}
}
}
fn compare_abs_values(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
match (a, b) {
(serde_json::Value::Number(a), serde_json::Value::Number(b)) => {
let a_f64 = a.as_f64()?.abs();
let b_f64 = b.as_f64()?.abs();
a_f64.partial_cmp(&b_f64)
}
_ => None,
}
}
fn compare_values(a: &serde_json::Value, b: &serde_json::Value) -> Option<std::cmp::Ordering> {
match (a, b) {
(serde_json::Value::Number(a), serde_json::Value::Number(b)) => {
let a_f64 = a.as_f64()?;
let b_f64 = b.as_f64()?;
a_f64.partial_cmp(&b_f64)
}
(serde_json::Value::String(a), serde_json::Value::String(b)) => Some(a.cmp(b)),
_ => None,
}
}
pub struct FilterStep {
mode: FilterMode,
conditions: Vec<CompiledCondition>,
}
impl FilterStep {
pub fn new(config: FilterStepConfig) -> Result<Self> {
let (mode, raw_conditions) = match config {
FilterStepConfig::Multi { mode, conditions } => (mode, conditions),
FilterStepConfig::Single(cond) => (FilterMode::And, vec![cond]),
};
if raw_conditions.is_empty() {
return Err(Error::config("Filter step requires at least one condition"));
}
let mut conditions = Vec::with_capacity(raw_conditions.len());
for cond in raw_conditions {
conditions.push(CompiledCondition::new(cond)?);
}
Ok(Self { mode, conditions })
}
fn evaluate(&self, payload: &serde_json::Value) -> bool {
match self.mode {
FilterMode::And => self.conditions.iter().all(|c| c.evaluate(payload)),
FilterMode::Or => self.conditions.iter().any(|c| c.evaluate(payload)),
}
}
}
impl Step for FilterStep {
fn step_type(&self) -> &'static str {
"filter"
}
fn process(&self, msg: Message) -> Result<Option<Message>> {
let passes = self.evaluate(&msg.payload);
tracing::debug!(
conditions_count = self.conditions.len(),
mode = ?self.mode,
result = if passes { "passed" } else { "rejected" },
"Filter step evaluation"
);
Ok(if passes { Some(msg) } else { None })
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_single_condition_config() {
let yaml = r#"
field: $.status
operator: eq
value: "success"
"#;
let config: FilterStepConfig = serde_yaml::from_str(yaml).unwrap();
match config {
FilterStepConfig::Single(c) => {
assert_eq!(c.field, "$.status");
assert_eq!(c.operator, FilterOperator::Eq);
}
_ => panic!("Expected Single variant"),
}
}
#[test]
fn test_multi_condition_config() {
let yaml = r#"
mode: and
conditions:
- field: $.status
operator: eq
value: "active"
- field: $.score
operator: ge
value: 50
"#;
let config: FilterStepConfig = serde_yaml::from_str(yaml).unwrap();
match config {
FilterStepConfig::Multi { mode, conditions } => {
assert_eq!(mode, FilterMode::And);
assert_eq!(conditions.len(), 2);
}
_ => panic!("Expected Multi variant"),
}
}
#[test]
fn test_multi_condition_or_mode() {
let yaml = r#"
mode: or
conditions:
- field: $.type
value: "premium"
- field: $.score
operator: gt
value: 100
"#;
let config: FilterStepConfig = serde_yaml::from_str(yaml).unwrap();
match config {
FilterStepConfig::Multi { mode, .. } => {
assert_eq!(mode, FilterMode::Or);
}
_ => panic!("Expected Multi variant"),
}
}
fn make_msg(payload: serde_json::Value) -> Message {
Message::new("test", payload)
}
#[test]
fn test_filter_single_eq_pass() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.status".into(),
operator: FilterOperator::Eq,
value: json!("success"),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"status": "success"}));
let result = step.process(msg).unwrap();
assert!(result.is_some());
}
#[test]
fn test_filter_single_eq_reject() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.status".into(),
operator: FilterOperator::Eq,
value: json!("success"),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"status": "failure"}));
let result = step.process(msg).unwrap();
assert!(result.is_none());
}
#[test]
fn test_filter_multi_and_all_pass() {
let config = FilterStepConfig::Multi {
mode: FilterMode::And,
conditions: vec![
FilterCondition {
field: "$.status".into(),
operator: FilterOperator::Eq,
value: json!("active"),
},
FilterCondition {
field: "$.score".into(),
operator: FilterOperator::Ge,
value: json!(50),
},
],
};
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"status": "active", "score": 75}));
assert!(step.process(msg).unwrap().is_some());
}
#[test]
fn test_filter_multi_and_one_fails() {
let config = FilterStepConfig::Multi {
mode: FilterMode::And,
conditions: vec![
FilterCondition {
field: "$.status".into(),
operator: FilterOperator::Eq,
value: json!("active"),
},
FilterCondition {
field: "$.score".into(),
operator: FilterOperator::Ge,
value: json!(50),
},
],
};
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"status": "active", "score": 30}));
assert!(step.process(msg).unwrap().is_none());
}
#[test]
fn test_filter_multi_or_one_passes() {
let config = FilterStepConfig::Multi {
mode: FilterMode::Or,
conditions: vec![
FilterCondition {
field: "$.type".into(),
operator: FilterOperator::Eq,
value: json!("premium"),
},
FilterCondition {
field: "$.score".into(),
operator: FilterOperator::Gt,
value: json!(100),
},
],
};
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"type": "premium", "score": 50}));
assert!(step.process(msg).unwrap().is_some());
let msg = make_msg(json!({"type": "basic", "score": 150}));
assert!(step.process(msg).unwrap().is_some());
}
#[test]
fn test_filter_multi_or_all_fail() {
let config = FilterStepConfig::Multi {
mode: FilterMode::Or,
conditions: vec![
FilterCondition {
field: "$.type".into(),
operator: FilterOperator::Eq,
value: json!("premium"),
},
FilterCondition {
field: "$.score".into(),
operator: FilterOperator::Gt,
value: json!(100),
},
],
};
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"type": "basic", "score": 50}));
assert!(step.process(msg).unwrap().is_none());
}
#[test]
fn test_filter_nested_field() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.data.user.role".into(),
operator: FilterOperator::Eq,
value: json!("admin"),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"data": {"user": {"role": "admin"}}}));
assert!(step.process(msg).unwrap().is_some());
}
#[test]
fn test_filter_array_index() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.items[0].id".into(),
operator: FilterOperator::Eq,
value: json!(1),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"items": [{"id": 1}, {"id": 2}]}));
assert!(step.process(msg).unwrap().is_some());
}
#[test]
fn test_filter_contains_operator() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.message".into(),
operator: FilterOperator::Contains,
value: json!("error"),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"message": "An error occurred"}));
assert!(step.process(msg).unwrap().is_some());
let msg = make_msg(json!({"message": "All good"}));
assert!(step.process(msg).unwrap().is_none());
}
#[test]
fn test_filter_matches_operator() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.email".into(),
operator: FilterOperator::Matches,
value: json!(r"^[a-z]+@example\.com$"),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"email": "alice@example.com"}));
assert!(step.process(msg).unwrap().is_some());
let msg = make_msg(json!({"email": "alice@other.com"}));
assert!(step.process(msg).unwrap().is_none());
}
#[test]
fn test_filter_abs_gt_operator() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.change".into(),
operator: FilterOperator::AbsGt,
value: json!(2.0),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"change": 3.0}));
assert!(step.process(msg).unwrap().is_some());
let msg = make_msg(json!({"change": -5.0}));
assert!(step.process(msg).unwrap().is_some());
let msg = make_msg(json!({"change": 1.0}));
assert!(step.process(msg).unwrap().is_none());
let msg = make_msg(json!({"change": -1.0}));
assert!(step.process(msg).unwrap().is_none());
}
#[test]
fn test_filter_invalid_regex_error() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.x".into(),
operator: FilterOperator::Matches,
value: json!("[invalid regex"),
});
assert!(FilterStep::new(config).is_err());
}
#[test]
fn test_filter_empty_conditions_error() {
let config = FilterStepConfig::Multi {
mode: FilterMode::And,
conditions: vec![],
};
assert!(FilterStep::new(config).is_err());
}
#[test]
fn test_filter_missing_field_fails() {
let config = FilterStepConfig::Single(FilterCondition {
field: "$.nonexistent".into(),
operator: FilterOperator::Eq,
value: json!("value"),
});
let step = FilterStep::new(config).unwrap();
let msg = make_msg(json!({"other": "field"}));
assert!(step.process(msg).unwrap().is_none());
}
}