use sonic_rs::JsonValueTrait as _;
use super::config::PreRouteFilterConfig;
/// Result of looking up the routing field in a raw JSON payload.
///
/// Produced by [`extract_routing_field`] and consumed by [`apply_filters`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PreRouteExtraction {
    /// The field was present. Holds the string contents for JSON string
    /// values, or the raw JSON text for any other value kind.
    Found(String),
    /// The lookup reported that the field is not in the payload.
    Missing,
    /// The lookup failed for any other reason; holds the error rendered as text.
    ParseError(String),
}
/// Decision returned by [`apply_filters`] for a single extraction.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PreRouteOutcome {
    /// No filter matched.
    Continue,
    /// A `DropFieldMissing` filter matched a missing field.
    Filtered,
    /// A `DlqFieldValue` filter matched; holds a human-readable reason.
    /// NOTE(review): "Dlq" presumably means dead-letter queue — confirm with the caller.
    Dlq(String),
}
/// A rule evaluated against a [`PreRouteExtraction`] by [`apply_filters`].
///
/// Derives `PartialEq`/`Eq` like the sibling enums in this module so that
/// filters can be compared directly (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PreRouteFilter {
    /// Matches when the extraction is `Missing`. Holds the configured field name.
    DropFieldMissing(String),
    /// Matches when the extracted value equals `value` exactly.
    DlqFieldValue {
        /// Configured field name.
        field: String,
        /// Value that triggers the rule.
        value: String,
    },
}
/// Looks up `field_name` in the JSON `payload` via `sonic_rs::get_from_slice`.
///
/// # Returns
///
/// * [`PreRouteExtraction::Found`] with the string contents when the value is
///   a JSON string, or with the value's raw JSON text otherwise (e.g. `42`).
/// * [`PreRouteExtraction::Missing`] when the lookup error reports "not found".
/// * [`PreRouteExtraction::ParseError`] with the error's display text for any
///   other lookup error.
#[inline]
pub fn extract_routing_field(payload: &[u8], field_name: &str) -> PreRouteExtraction {
    let lazy = match sonic_rs::get_from_slice(payload, &[field_name]) {
        Ok(lazy) => lazy,
        Err(err) if err.is_not_found() => return PreRouteExtraction::Missing,
        Err(err) => return PreRouteExtraction::ParseError(err.to_string()),
    };

    // Non-string values have no `as_str` view, so fall back to their raw JSON text.
    let text = lazy.as_str().unwrap_or_else(|| lazy.as_raw_str());
    PreRouteExtraction::Found(text.to_owned())
}
/// Evaluates `filters` in order against `extraction` and returns the outcome
/// of the first filter that matches.
///
/// * `DropFieldMissing` matches a `Missing` extraction and yields
///   [`PreRouteOutcome::Filtered`].
/// * `DlqFieldValue` matches a `Found` extraction whose value equals the
///   rule's value and yields [`PreRouteOutcome::Dlq`] with a reason message.
///
/// Returns [`PreRouteOutcome::Continue`] when nothing matches, which includes
/// every `ParseError` extraction and an empty filter list.
///
/// The field names stored in the filters are not consulted here; only the
/// extraction passed in is examined.
#[must_use]
pub fn apply_filters(
    extraction: &PreRouteExtraction,
    filters: &[PreRouteFilter],
) -> PreRouteOutcome {
    filters
        .iter()
        .find_map(|rule| match rule {
            PreRouteFilter::DropFieldMissing(_) => match extraction {
                PreRouteExtraction::Missing => Some(PreRouteOutcome::Filtered),
                _ => None,
            },
            PreRouteFilter::DlqFieldValue { value: wanted, .. } => match extraction {
                PreRouteExtraction::Found(actual) if actual == wanted => Some(
                    PreRouteOutcome::Dlq(format!("field value '{actual}' matches DLQ rule")),
                ),
                _ => None,
            },
        })
        .unwrap_or(PreRouteOutcome::Continue)
}
/// Converts configuration entries into runtime [`PreRouteFilter`]s.
///
/// The output has one filter per config entry, in the same order, with the
/// field and value strings cloned from the config.
#[must_use]
pub fn filters_from_config(configs: &[PreRouteFilterConfig]) -> Vec<PreRouteFilter> {
    let mut filters = Vec::with_capacity(configs.len());
    for config in configs {
        let filter = match config {
            PreRouteFilterConfig::DropFieldMissing { field } => {
                PreRouteFilter::DropFieldMissing(field.clone())
            }
            PreRouteFilterConfig::DlqFieldValue { field, value } => {
                let (field, value) = (field.clone(), value.clone());
                PreRouteFilter::DlqFieldValue { field, value }
            }
        };
        filters.push(filter);
    }
    filters
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Found` extraction holding `value`.
    fn found(value: &str) -> PreRouteExtraction {
        PreRouteExtraction::Found(String::from(value))
    }

    /// Shorthand for a `DropFieldMissing` rule on `field`.
    fn drop_missing(field: &str) -> PreRouteFilter {
        PreRouteFilter::DropFieldMissing(String::from(field))
    }

    /// Shorthand for a `DlqFieldValue` rule on `field` triggered by `value`.
    fn dlq_on(field: &str, value: &str) -> PreRouteFilter {
        PreRouteFilter::DlqFieldValue {
            field: String::from(field),
            value: String::from(value),
        }
    }

    // ---- extract_routing_field ----

    #[test]
    fn extract_routing_field_found() {
        let extracted =
            extract_routing_field(br#"{"_table": "events", "host": "web1"}"#, "_table");
        assert_eq!(extracted, found("events"));
    }

    #[test]
    fn extract_routing_field_missing() {
        let extracted = extract_routing_field(br#"{"host": "web1"}"#, "_table");
        assert_eq!(extracted, PreRouteExtraction::Missing);
    }

    #[test]
    fn extract_routing_field_invalid_json() {
        let result = extract_routing_field(b"not json at all {{{", "_table");
        assert!(
            matches!(result, PreRouteExtraction::ParseError(_)),
            "expected ParseError, got {result:?}"
        );
    }

    #[test]
    fn extract_routing_field_numeric_value_returns_raw() {
        // Non-string values come back as their raw JSON text.
        let extracted = extract_routing_field(br#"{"_table": 42}"#, "_table");
        assert_eq!(extracted, found("42"));
    }

    #[test]
    fn extract_routing_field_nested_object() {
        // An earlier nested object must not prevent finding the top-level key.
        let extracted = extract_routing_field(
            br#"{"meta": {"source": "kafka"}, "_table": "logs"}"#,
            "_table",
        );
        assert_eq!(extracted, found("logs"));
    }

    // ---- apply_filters ----

    #[test]
    fn filter_drop_missing_field() {
        let outcome = apply_filters(&PreRouteExtraction::Missing, &[drop_missing("_table")]);
        assert_eq!(outcome, PreRouteOutcome::Filtered);
    }

    #[test]
    fn filter_dlq_on_specific_value() {
        let result = apply_filters(&found("poison"), &[dlq_on("_table", "poison")]);
        assert!(
            matches!(result, PreRouteOutcome::Dlq(_)),
            "expected Dlq, got {result:?}"
        );
    }

    #[test]
    fn filter_dlq_does_not_trigger_on_different_value() {
        let outcome = apply_filters(&found("events"), &[dlq_on("_table", "poison")]);
        assert_eq!(outcome, PreRouteOutcome::Continue);
    }

    #[test]
    fn no_filters_always_continue() {
        let extractions = [
            found("x"),
            PreRouteExtraction::Missing,
            PreRouteExtraction::ParseError(String::from("bad")),
        ];
        for extraction in &extractions {
            assert_eq!(apply_filters(extraction, &[]), PreRouteOutcome::Continue);
        }
    }

    #[test]
    fn filter_drop_missing_does_not_affect_found() {
        let outcome = apply_filters(&found("events"), &[drop_missing("_table")]);
        assert_eq!(outcome, PreRouteOutcome::Continue);
    }

    // ---- filters_from_config ----

    #[test]
    fn filters_from_config_roundtrip() {
        let configs = [
            PreRouteFilterConfig::DropFieldMissing {
                field: String::from("_table"),
            },
            PreRouteFilterConfig::DlqFieldValue {
                field: String::from("status"),
                value: String::from("error"),
            },
        ];
        let built = filters_from_config(&configs);
        assert_eq!(built.len(), 2);
        assert!(matches!(built[0], PreRouteFilter::DropFieldMissing(_)));
        assert!(matches!(built[1], PreRouteFilter::DlqFieldValue { .. }));
    }

    #[test]
    fn first_matching_filter_wins() {
        let rules = [drop_missing("_table"), dlq_on("_table", "anything")];
        let outcome = apply_filters(&PreRouteExtraction::Missing, &rules);
        assert_eq!(outcome, PreRouteOutcome::Filtered);
    }
}