Skip to main content

hyperi_rustlib/worker/engine/
config.rs

// Project:   hyperi-rustlib
// File:      src/worker/engine/config.rs
// Purpose:   Configuration for the SIMD-optimised batch processing engine
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use serde::{Deserialize, Serialize};
10
11use super::types::PayloadFormat;
12
13/// Action to take when a message fails to parse.
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
15#[serde(rename_all = "snake_case")]
16pub enum ParseErrorAction {
17    /// Route failed messages to the dead-letter queue (default).
18    #[default]
19    Dlq,
20    /// Silently skip failed messages (counted but not DLQ'd).
21    Skip,
22    /// Fail the entire batch on the first parse error.
23    FailBatch,
24}
25
26/// Pre-route filter applied to each message before routing decisions.
27///
28/// Filters are evaluated in order. The first filter that matches determines
29/// the message's [`super::types::PreRouteResult`].
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(tag = "type", rename_all = "snake_case")]
32pub enum PreRouteFilterConfig {
33    /// Route to DLQ if a required field is absent.
34    DropFieldMissing {
35        /// Name of the required field.
36        field: String,
37    },
38    /// Route to DLQ if a field equals a specific value.
39    DlqFieldValue {
40        /// Name of the field to check.
41        field: String,
42        /// Value that triggers DLQ routing.
43        value: String,
44    },
45}
46
47/// Configuration for the batch processing engine.
48///
49/// All values are overridable via the 8-layer config cascade
50/// (CLI > ENV > .env > settings.{env}.yaml > settings.yaml > defaults > rustlib > hard-coded).
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct BatchProcessingConfig {
53    /// Maximum number of messages per rayon chunk.
54    ///
55    /// Smaller chunks reduce per-task overhead; larger chunks amortise
56    /// the rayon work-stealing cost. Default 10 000 matches DFE batch sizes.
57    #[serde(default = "default_max_chunk_size")]
58    pub max_chunk_size: usize,
59
60    /// Expected payload format (auto-detect by default).
61    #[serde(default)]
62    pub format: PayloadFormat,
63
64    /// JSON field used to route messages to the correct downstream sink.
65    ///
66    /// For dfe-loader this is typically `"_table"`.
67    #[serde(default)]
68    pub routing_field: Option<String>,
69
70    /// Pre-route filters applied before routing decisions.
71    ///
72    /// Evaluated in order -- first match wins.
73    #[serde(default)]
74    pub pre_route_filters: Vec<PreRouteFilterConfig>,
75
76    /// Action to take when a message fails JSON parsing.
77    #[serde(default = "default_parse_error_action")]
78    pub parse_error_action: ParseErrorAction,
79
80    /// Fields to pre-extract into [`super::types::ParsedMessage::Parsed::extracted`]
81    /// for fast routing lookups.
82    ///
83    /// Extracting these at parse time avoids repeated `value.get()` traversals
84    /// during routing and filtering.
85    #[serde(default = "default_known_fields")]
86    pub known_fields: Vec<String>,
87}
88
/// Fallback for `BatchProcessingConfig::max_chunk_size`, used both by serde and by `Default`.
fn default_max_chunk_size() -> usize {
    const DEFAULT_MAX_CHUNK_SIZE: usize = 10_000;
    DEFAULT_MAX_CHUNK_SIZE
}
92
93fn default_parse_error_action() -> ParseErrorAction {
94    ParseErrorAction::Dlq
95}
96
/// Fallback for `BatchProcessingConfig::known_fields`: the routing and
/// metadata keys that are pre-extracted unless configured otherwise.
fn default_known_fields() -> Vec<String> {
    const KNOWN: [&str; 6] = [
        "_table",
        "_timestamp",
        "_source",
        "host",
        "source_type",
        "event_type",
    ];
    KNOWN.iter().copied().map(String::from).collect()
}
107
108impl Default for BatchProcessingConfig {
109    fn default() -> Self {
110        Self {
111            max_chunk_size: default_max_chunk_size(),
112            format: PayloadFormat::default(),
113            routing_field: None,
114            pre_route_filters: vec![],
115            parse_error_action: default_parse_error_action(),
116            known_fields: default_known_fields(),
117        }
118    }
119}
120
121impl BatchProcessingConfig {
122    /// Load config from the cascade under the given key (e.g. `"batch_processing"`).
123    ///
124    /// Falls back to defaults if the config cascade is not initialised or the
125    /// key is absent.
126    ///
127    /// # Errors
128    ///
129    /// Returns a `ConfigError` only if the cascade is initialised and the key
130    /// contains data that cannot be deserialised into `BatchProcessingConfig`.
131    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
132        let config: Self = if let Some(cfg) = crate::config::try_get() {
133            cfg.unmarshal_key(key).unwrap_or_default()
134        } else {
135            tracing::debug!("Config cascade not initialised, using default BatchProcessingConfig");
136            Self::default()
137        };
138        Ok(config)
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config_values() {
        let config = BatchProcessingConfig::default();
        assert_eq!(config.max_chunk_size, 10_000);
        assert!(config.routing_field.is_none());
        assert!(config.pre_route_filters.is_empty());
        assert_eq!(config.parse_error_action, ParseErrorAction::Dlq);
        assert_eq!(config.known_fields, default_known_fields());
        assert_eq!(config.known_fields.len(), 6);
        assert!(config.known_fields.contains(&"_table".to_string()));
    }

    #[test]
    fn from_cascade_falls_back_to_defaults() {
        let config = BatchProcessingConfig::from_cascade("batch_processing").unwrap();
        assert_eq!(config.max_chunk_size, 10_000);
    }

    #[test]
    fn parse_error_action_default_is_dlq() {
        assert_eq!(ParseErrorAction::default(), ParseErrorAction::Dlq);
    }

    /// Every field carries a serde default, so an empty object must yield the
    /// same values as `Default::default()`.
    #[test]
    fn empty_object_deserialises_to_defaults() {
        let parsed: BatchProcessingConfig = serde_json::from_str("{}").unwrap();
        let defaults = BatchProcessingConfig::default();
        assert_eq!(parsed.max_chunk_size, defaults.max_chunk_size);
        assert_eq!(parsed.routing_field, defaults.routing_field);
        assert!(parsed.pre_route_filters.is_empty());
        assert_eq!(parsed.parse_error_action, defaults.parse_error_action);
        assert_eq!(parsed.known_fields, defaults.known_fields);
    }

    /// Round-trip non-default values so each field's (de)serialisation is exercised.
    #[test]
    fn serde_roundtrip() {
        let config = BatchProcessingConfig {
            max_chunk_size: 512,
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![PreRouteFilterConfig::DlqFieldValue {
                field: "status".to_string(),
                value: "bad".to_string(),
            }],
            parse_error_action: ParseErrorAction::Skip,
            known_fields: vec!["host".to_string()],
            ..BatchProcessingConfig::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let parsed: BatchProcessingConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.max_chunk_size, 512);
        assert_eq!(parsed.routing_field.as_deref(), Some("_table"));
        assert_eq!(parsed.parse_error_action, ParseErrorAction::Skip);
        assert_eq!(parsed.known_fields, vec!["host".to_string()]);
        assert_eq!(parsed.pre_route_filters.len(), 1);
        assert!(matches!(
            &parsed.pre_route_filters[0],
            PreRouteFilterConfig::DlqFieldValue { field, value }
                if field.as_str() == "status" && value.as_str() == "bad"
        ));
    }

    #[test]
    fn pre_route_filter_serde_roundtrip() {
        let filter = PreRouteFilterConfig::DropFieldMissing {
            field: "_table".to_string(),
        };
        let json = serde_json::to_string(&filter).unwrap();
        let back: PreRouteFilterConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            &back,
            PreRouteFilterConfig::DropFieldMissing { field } if field.as_str() == "_table"
        ));
    }

    /// Pins the on-the-wire shape: an internally-tagged `type` field in snake_case.
    #[test]
    fn pre_route_filter_uses_snake_case_type_tag() {
        let filter = PreRouteFilterConfig::DlqFieldValue {
            field: "status".to_string(),
            value: "bad".to_string(),
        };
        let value = serde_json::to_value(&filter).unwrap();
        assert_eq!(
            value,
            serde_json::json!({"type": "dlq_field_value", "field": "status", "value": "bad"})
        );

        let parsed: PreRouteFilterConfig =
            serde_json::from_str(r#"{"type":"drop_field_missing","field":"_table"}"#).unwrap();
        assert!(matches!(parsed, PreRouteFilterConfig::DropFieldMissing { .. }));
    }

    /// Checks both the round-trip and the exact snake_case string for each variant.
    #[test]
    fn parse_error_action_variants_serde() {
        let cases = [
            (ParseErrorAction::Dlq, "\"dlq\""),
            (ParseErrorAction::Skip, "\"skip\""),
            (ParseErrorAction::FailBatch, "\"fail_batch\""),
        ];
        for (action, expected) in cases {
            let json = serde_json::to_string(&action).unwrap();
            assert_eq!(json, expected);
            let back: ParseErrorAction = serde_json::from_str(&json).unwrap();
            assert_eq!(action, back);
        }
    }
}