hyperi_rustlib/worker/engine/
pre_route.rs1use sonic_rs::JsonValueTrait as _;
16
17use super::config::PreRouteFilterConfig;
18
/// Outcome of looking up the routing field in a raw payload, as produced by
/// [`extract_routing_field`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PreRouteExtraction {
    /// The field was present; carries its value rendered as text.
    Found(String),
    /// The lookup reported that the field is not present in the payload.
    Missing,
    /// The lookup failed for any other reason; carries the error's message.
    ParseError(String),
}
29
/// Decision returned by [`apply_filters`] for a single message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PreRouteOutcome {
    /// No filter matched; the message carries on unchanged.
    Continue,
    /// A drop filter matched.
    Filtered,
    /// A DLQ filter matched; carries a human-readable reason.
    Dlq(String),
}
40
/// A single pre-route rule evaluated by [`apply_filters`].
///
/// Derives `PartialEq`/`Eq` like the other pre-route enums so that filters
/// can be compared directly (for example in tests).
///
/// NOTE: each variant stores a field name, but [`apply_filters`] does not
/// read it — filters are evaluated against an extraction the caller has
/// already performed, so the name is informational at that point.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PreRouteFilter {
    /// Drop the message when the extraction is `Missing`. Holds the field name.
    DropFieldMissing(String),
    /// Send the message to the DLQ when the extracted value equals `value`.
    DlqFieldValue { field: String, value: String },
}
49
50#[inline]
62pub fn extract_routing_field(payload: &[u8], field_name: &str) -> PreRouteExtraction {
63 match sonic_rs::get_from_slice(payload, &[field_name]) {
64 Ok(lv) => {
65 let value = lv
69 .as_str()
70 .map_or_else(|| lv.as_raw_str().to_owned(), str::to_owned);
71 PreRouteExtraction::Found(value)
72 }
73 Err(e) if e.is_not_found() => PreRouteExtraction::Missing,
74 Err(e) => PreRouteExtraction::ParseError(e.to_string()),
75 }
76}
77
78#[must_use]
83pub fn apply_filters(
84 extraction: &PreRouteExtraction,
85 filters: &[PreRouteFilter],
86) -> PreRouteOutcome {
87 for filter in filters {
88 match (filter, extraction) {
89 (PreRouteFilter::DropFieldMissing(_field), PreRouteExtraction::Missing) => {
90 return PreRouteOutcome::Filtered;
91 }
92 (
93 PreRouteFilter::DlqFieldValue {
94 field: _field,
95 value: expected,
96 },
97 PreRouteExtraction::Found(actual),
98 ) if actual == expected => {
99 return PreRouteOutcome::Dlq(format!("field value '{actual}' matches DLQ rule"));
100 }
101 _ => {}
102 }
103 }
104
105 PreRouteOutcome::Continue
108}
109
110#[must_use]
112pub fn filters_from_config(configs: &[PreRouteFilterConfig]) -> Vec<PreRouteFilter> {
113 configs
114 .iter()
115 .map(|c| match c {
116 PreRouteFilterConfig::DropFieldMissing { field } => {
117 PreRouteFilter::DropFieldMissing(field.clone())
118 }
119 PreRouteFilterConfig::DlqFieldValue { field, value } => PreRouteFilter::DlqFieldValue {
120 field: field.clone(),
121 value: value.clone(),
122 },
123 })
124 .collect()
125}
126
#[cfg(test)]
mod tests {
    //! Unit tests for field extraction, filter evaluation and config
    //! conversion in this module.

    use super::*;

    // ---- extract_routing_field -------------------------------------------

    /// A string field is returned without its surrounding quotes.
    #[test]
    fn extract_routing_field_found() {
        let payload = br#"{"_table": "events", "host": "web1"}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Found("events".to_string()));
    }

    /// Valid JSON without the requested key reports `Missing`.
    #[test]
    fn extract_routing_field_missing() {
        let payload = br#"{"host": "web1"}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Missing);
    }

    /// Input that is not JSON reports `ParseError` rather than `Missing`.
    #[test]
    fn extract_routing_field_invalid_json() {
        let payload = b"not json at all {{{";
        let result = extract_routing_field(payload, "_table");
        assert!(
            matches!(result, PreRouteExtraction::ParseError(_)),
            "expected ParseError, got {result:?}"
        );
    }

    /// A non-string value falls back to its raw JSON text.
    #[test]
    fn extract_routing_field_numeric_value_returns_raw() {
        let payload = br#"{"_table": 42}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Found("42".to_string()));
    }

    /// A preceding nested object does not prevent finding the field.
    #[test]
    fn extract_routing_field_nested_object() {
        let payload = br#"{"meta": {"source": "kafka"}, "_table": "logs"}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Found("logs".to_string()));
    }

    // ---- apply_filters ---------------------------------------------------

    #[test]
    fn filter_drop_missing_field() {
        let filters = vec![PreRouteFilter::DropFieldMissing("_table".to_string())];
        let result = apply_filters(&PreRouteExtraction::Missing, &filters);
        assert_eq!(result, PreRouteOutcome::Filtered);
    }

    #[test]
    fn filter_dlq_on_specific_value() {
        let filters = vec![PreRouteFilter::DlqFieldValue {
            field: "_table".to_string(),
            value: "poison".to_string(),
        }];
        let result = apply_filters(&PreRouteExtraction::Found("poison".to_string()), &filters);
        assert!(
            matches!(result, PreRouteOutcome::Dlq(_)),
            "expected Dlq, got {result:?}"
        );
    }

    /// The DLQ reason names the offending value.
    #[test]
    fn filter_dlq_reason_mentions_value() {
        let filters = vec![PreRouteFilter::DlqFieldValue {
            field: "_table".to_string(),
            value: "poison".to_string(),
        }];
        let result = apply_filters(&PreRouteExtraction::Found("poison".to_string()), &filters);
        assert_eq!(
            result,
            PreRouteOutcome::Dlq("field value 'poison' matches DLQ rule".to_string())
        );
    }

    #[test]
    fn filter_dlq_does_not_trigger_on_different_value() {
        let filters = vec![PreRouteFilter::DlqFieldValue {
            field: "_table".to_string(),
            value: "poison".to_string(),
        }];
        let result = apply_filters(&PreRouteExtraction::Found("events".to_string()), &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    /// A DLQ rule only inspects found values; a missing field passes through.
    #[test]
    fn filter_dlq_does_not_trigger_on_missing() {
        let filters = vec![PreRouteFilter::DlqFieldValue {
            field: "_table".to_string(),
            value: "poison".to_string(),
        }];
        let result = apply_filters(&PreRouteExtraction::Missing, &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    #[test]
    fn no_filters_always_continue() {
        assert_eq!(
            apply_filters(&PreRouteExtraction::Found("x".to_string()), &[]),
            PreRouteOutcome::Continue
        );
        assert_eq!(
            apply_filters(&PreRouteExtraction::Missing, &[]),
            PreRouteOutcome::Continue
        );
        assert_eq!(
            apply_filters(&PreRouteExtraction::ParseError("bad".to_string()), &[]),
            PreRouteOutcome::Continue
        );
    }

    /// No filter variant reacts to a parse error, even when filters exist.
    #[test]
    fn parse_error_passes_through_filters() {
        let filters = vec![
            PreRouteFilter::DropFieldMissing("_table".to_string()),
            PreRouteFilter::DlqFieldValue {
                field: "_table".to_string(),
                value: "bad".to_string(),
            },
        ];
        let result = apply_filters(&PreRouteExtraction::ParseError("bad".to_string()), &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    #[test]
    fn filter_drop_missing_does_not_affect_found() {
        let filters = vec![PreRouteFilter::DropFieldMissing("_table".to_string())];
        let result = apply_filters(&PreRouteExtraction::Found("events".to_string()), &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    #[test]
    fn first_matching_filter_wins() {
        let filters = vec![
            PreRouteFilter::DropFieldMissing("_table".to_string()),
            PreRouteFilter::DlqFieldValue {
                field: "_table".to_string(),
                value: "anything".to_string(),
            },
        ];
        let result = apply_filters(&PreRouteExtraction::Missing, &filters);
        assert_eq!(result, PreRouteOutcome::Filtered);
    }

    // ---- filters_from_config ---------------------------------------------

    /// Conversion keeps order and carries every string across, not just the
    /// variant kind.
    #[test]
    fn filters_from_config_roundtrip() {
        let configs = vec![
            PreRouteFilterConfig::DropFieldMissing {
                field: "_table".to_string(),
            },
            PreRouteFilterConfig::DlqFieldValue {
                field: "status".to_string(),
                value: "error".to_string(),
            },
        ];
        let filters = filters_from_config(&configs);
        assert_eq!(filters.len(), 2);
        assert!(matches!(
            &filters[0],
            PreRouteFilter::DropFieldMissing(f) if f.as_str() == "_table"
        ));
        assert!(matches!(
            &filters[1],
            PreRouteFilter::DlqFieldValue { field, value }
                if field.as_str() == "status" && value.as_str() == "error"
        ));
    }

    #[test]
    fn filters_from_config_empty() {
        assert!(filters_from_config(&[]).is_empty());
    }
}