Skip to main content

hyperi_rustlib/worker/engine/
pre_route.rs

// Project:   hyperi-rustlib
// File:      src/worker/engine/pre_route.rs
// Purpose:   Zero-copy pre-route field extraction and filter evaluation
// Language:  Rust
//
// License:   FSL-1.1-ALv2
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Pre-route phase: extract a routing field from raw JSON bytes using
//! `sonic_rs::get_from_slice` (SIMD-accelerated), then apply filters
//! to decide whether the message continues, is dropped, or goes to the
//! DLQ (dead-letter queue).
//!
//! Intended for the hot path: ~50-100 ns per message.
14
15use sonic_rs::JsonValueTrait as _;
16
17use super::config::PreRouteFilterConfig;
18
/// Outcome of looking up the routing field in a raw JSON payload.
///
/// Produced by [`extract_routing_field`] and consumed by [`apply_filters`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PreRouteExtraction {
    /// The field exists; carries its value rendered as a string (JSON
    /// strings are returned as-is, other JSON values as their raw text).
    Found(String),
    /// The JSON object has no such field.
    Missing,
    /// The payload could not be read as JSON; carries the error message.
    ParseError(String),
}
29
/// Decision reached once the pre-route filters have been evaluated.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PreRouteOutcome {
    /// No filter fired -- proceed to parse + transform.
    Continue,
    /// A drop filter fired -- skip parse, include in commit.
    Filtered,
    /// A DLQ rule fired; carries a human-readable reason.
    Dlq(String),
}
40
/// Runtime filter derived from [`PreRouteFilterConfig`].
///
/// Derives `PartialEq`/`Eq` so filter lists can be compared with `==`
/// (e.g. when checking the result of a config conversion).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PreRouteFilter {
    /// Drop (filter) the message when the named field is absent.
    DropFieldMissing(String),
    /// Route to DLQ when the named field equals a specific value.
    DlqFieldValue {
        /// Name of the field the rule refers to.
        field: String,
        /// Value that, when the extracted value is equal to it, sends the
        /// message to the DLQ.
        value: String,
    },
}
49
50/// Extract a routing field from raw JSON bytes using SIMD.
51///
52/// Uses `sonic_rs::get_from_slice` for zero-copy extraction. The error path
53/// (distinguishing a missing field from invalid JSON) requires a full
54/// validity check and is intentionally cold.
55///
56/// # Behaviour
57/// - `Ok(lazy_value)` and value is a string → `Found(string)`
58/// - `Ok(lazy_value)` and value is not a string → `Found(raw_str)` (raw JSON)
59/// - `Err` with `is_not_found()` → `Missing`
60/// - `Err` other → `ParseError`
61#[inline]
62pub fn extract_routing_field(payload: &[u8], field_name: &str) -> PreRouteExtraction {
63    match sonic_rs::get_from_slice(payload, &[field_name]) {
64        Ok(lv) => {
65            // Extract the string value if it is a JSON string, otherwise
66            // fall back to the raw representation (e.g. a number or bool
67            // used as a routing key).
68            let value = lv
69                .as_str()
70                .map_or_else(|| lv.as_raw_str().to_owned(), str::to_owned);
71            PreRouteExtraction::Found(value)
72        }
73        Err(e) if e.is_not_found() => PreRouteExtraction::Missing,
74        Err(e) => PreRouteExtraction::ParseError(e.to_string()),
75    }
76}
77
78/// Apply a list of runtime filters to a pre-route extraction result.
79///
80/// Filters are evaluated in order -- first match wins. If no filter matches
81/// the message continues.
82#[must_use]
83pub fn apply_filters(
84    extraction: &PreRouteExtraction,
85    filters: &[PreRouteFilter],
86) -> PreRouteOutcome {
87    for filter in filters {
88        match (filter, extraction) {
89            (PreRouteFilter::DropFieldMissing(_field), PreRouteExtraction::Missing) => {
90                return PreRouteOutcome::Filtered;
91            }
92            (
93                PreRouteFilter::DlqFieldValue {
94                    field: _field,
95                    value: expected,
96                },
97                PreRouteExtraction::Found(actual),
98            ) if actual == expected => {
99                return PreRouteOutcome::Dlq(format!("field value '{actual}' matches DLQ rule"));
100            }
101            _ => {}
102        }
103    }
104
105    // A parse error with no filters still results in Continue -- the parse
106    // phase will detect and handle the invalid payload.
107    PreRouteOutcome::Continue
108}
109
110/// Convert config-layer filter definitions to runtime filters.
111#[must_use]
112pub fn filters_from_config(configs: &[PreRouteFilterConfig]) -> Vec<PreRouteFilter> {
113    configs
114        .iter()
115        .map(|c| match c {
116            PreRouteFilterConfig::DropFieldMissing { field } => {
117                PreRouteFilter::DropFieldMissing(field.clone())
118            }
119            PreRouteFilterConfig::DlqFieldValue { field, value } => PreRouteFilter::DlqFieldValue {
120                field: field.clone(),
121                value: value.clone(),
122            },
123        })
124        .collect()
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    // ---- extraction tests ---------------------------------------------------

    #[test]
    fn extract_routing_field_found() {
        let payload = br#"{"_table": "events", "host": "web1"}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Found("events".to_string()));
    }

    #[test]
    fn extract_routing_field_missing() {
        let payload = br#"{"host": "web1"}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Missing);
    }

    #[test]
    fn extract_routing_field_invalid_json() {
        let payload = b"not json at all {{{";
        let result = extract_routing_field(payload, "_table");
        assert!(
            matches!(result, PreRouteExtraction::ParseError(_)),
            "expected ParseError, got {result:?}"
        );
    }

    #[test]
    fn extract_routing_field_numeric_value_returns_raw() {
        // Routing fields are sometimes integers in practice (e.g. a table ID).
        // The extractor should return the raw representation rather than panic.
        let payload = br#"{"_table": 42}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Found("42".to_string()));
    }

    #[test]
    fn extract_routing_field_bool_value_returns_raw() {
        // Non-string, non-numeric scalars also fall back to their raw JSON text.
        let payload = br#"{"_table": true}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Found("true".to_string()));
    }

    #[test]
    fn extract_routing_field_nested_object() {
        let payload = br#"{"meta": {"source": "kafka"}, "_table": "logs"}"#;
        let result = extract_routing_field(payload, "_table");
        assert_eq!(result, PreRouteExtraction::Found("logs".to_string()));
    }

    // ---- filter tests -------------------------------------------------------

    #[test]
    fn filter_drop_missing_field() {
        let filters = vec![PreRouteFilter::DropFieldMissing("_table".to_string())];
        let result = apply_filters(&PreRouteExtraction::Missing, &filters);
        assert_eq!(result, PreRouteOutcome::Filtered);
    }

    #[test]
    fn filter_dlq_on_specific_value() {
        let filters = vec![PreRouteFilter::DlqFieldValue {
            field: "_table".to_string(),
            value: "poison".to_string(),
        }];
        let result = apply_filters(&PreRouteExtraction::Found("poison".to_string()), &filters);
        assert!(
            matches!(result, PreRouteOutcome::Dlq(_)),
            "expected Dlq, got {result:?}"
        );
    }

    #[test]
    fn filter_dlq_does_not_trigger_on_different_value() {
        let filters = vec![PreRouteFilter::DlqFieldValue {
            field: "_table".to_string(),
            value: "poison".to_string(),
        }];
        let result = apply_filters(&PreRouteExtraction::Found("events".to_string()), &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    #[test]
    fn filter_dlq_does_not_trigger_on_missing() {
        let filters = vec![PreRouteFilter::DlqFieldValue {
            field: "_table".to_string(),
            value: "poison".to_string(),
        }];
        let result = apply_filters(&PreRouteExtraction::Missing, &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    #[test]
    fn no_filters_always_continue() {
        assert_eq!(
            apply_filters(&PreRouteExtraction::Found("x".to_string()), &[]),
            PreRouteOutcome::Continue
        );
        assert_eq!(
            apply_filters(&PreRouteExtraction::Missing, &[]),
            PreRouteOutcome::Continue
        );
        assert_eq!(
            apply_filters(&PreRouteExtraction::ParseError("bad".to_string()), &[]),
            PreRouteOutcome::Continue
        );
    }

    #[test]
    fn parse_error_continues_even_with_filters() {
        // Invalid payloads are left for the parse phase; no pre-route filter
        // matches a ParseError extraction.
        let filters = vec![
            PreRouteFilter::DropFieldMissing("_table".to_string()),
            PreRouteFilter::DlqFieldValue {
                field: "_table".to_string(),
                value: "bad".to_string(),
            },
        ];
        let result = apply_filters(&PreRouteExtraction::ParseError("bad".to_string()), &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    #[test]
    fn filter_drop_missing_does_not_affect_found() {
        let filters = vec![PreRouteFilter::DropFieldMissing("_table".to_string())];
        let result = apply_filters(&PreRouteExtraction::Found("events".to_string()), &filters);
        assert_eq!(result, PreRouteOutcome::Continue);
    }

    #[test]
    fn filters_from_config_roundtrip() {
        let configs = vec![
            PreRouteFilterConfig::DropFieldMissing {
                field: "_table".to_string(),
            },
            PreRouteFilterConfig::DlqFieldValue {
                field: "status".to_string(),
                value: "error".to_string(),
            },
        ];
        let filters = filters_from_config(&configs);
        assert_eq!(filters.len(), 2);
        assert!(matches!(filters[0], PreRouteFilter::DropFieldMissing(_)));
        assert!(matches!(filters[1], PreRouteFilter::DlqFieldValue { .. }));
    }

    #[test]
    fn first_matching_filter_wins() {
        // Two filters: drop-missing and DLQ-on-value. A DLQ rule can never
        // match a Missing extraction, so only drop-missing fires here.
        let filters = vec![
            PreRouteFilter::DropFieldMissing("_table".to_string()),
            PreRouteFilter::DlqFieldValue {
                field: "_table".to_string(),
                value: "anything".to_string(),
            },
        ];
        let result = apply_filters(&PreRouteExtraction::Missing, &filters);
        assert_eq!(result, PreRouteOutcome::Filtered);
    }

    #[test]
    fn non_matching_filters_are_skipped_regardless_of_order() {
        // DLQ rule first, drop-missing second: evaluation must fall through
        // the non-matching DLQ rule and still reach drop-missing.
        let filters = vec![
            PreRouteFilter::DlqFieldValue {
                field: "_table".to_string(),
                value: "anything".to_string(),
            },
            PreRouteFilter::DropFieldMissing("_table".to_string()),
        ];
        let result = apply_filters(&PreRouteExtraction::Missing, &filters);
        assert_eq!(result, PreRouteOutcome::Filtered);
    }
}