Skip to main content

hyperi_rustlib/worker/engine/
config.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/config.rs
3// Purpose:   Configuration for the SIMD-optimised batch processing engine
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9use serde::{Deserialize, Serialize};
10
11use super::types::PayloadFormat;
12
/// Action to take when a message fails to parse.
///
/// Serialised in `snake_case`, so config files spell the variants as
/// `"dlq"`, `"skip"` and `"fail_batch"`. The derived [`Default`] is
/// [`ParseErrorAction::Dlq`].
// NOTE: serde passes each unit variant's index to the serializer, and some
// binary formats encode that index — append new variants rather than
// reordering the existing ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ParseErrorAction {
    /// Route failed messages to the dead-letter queue (default).
    #[default]
    Dlq,
    /// Silently skip failed messages (counted but not DLQ'd).
    Skip,
    /// Fail the entire batch on the first parse error.
    FailBatch,
}
25
26/// Pre-route filter applied to each message before routing decisions.
27///
28/// Filters are evaluated in order. The first filter that matches determines
29/// the message's [`super::types::PreRouteResult`].
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(tag = "type", rename_all = "snake_case")]
32pub enum PreRouteFilterConfig {
33    /// Route to DLQ if a required field is absent.
34    DropFieldMissing {
35        /// Name of the required field.
36        field: String,
37    },
38    /// Route to DLQ if a field equals a specific value.
39    DlqFieldValue {
40        /// Name of the field to check.
41        field: String,
42        /// Value that triggers DLQ routing.
43        value: String,
44    },
45}
46
/// Configuration for the batch processing engine.
///
/// All values are overridable via the 8-layer config cascade
/// (CLI > ENV > .env > settings.{env}.yaml > settings.yaml > defaults > rustlib > hard-coded).
///
/// Every field carries a serde default, so a partial (or empty) section
/// deserialises successfully and omitted keys take the same values as
/// [`BatchProcessingConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchProcessingConfig {
    /// Maximum number of messages per rayon chunk.
    ///
    /// Smaller chunks give rayon finer-grained tasks to balance across threads;
    /// larger chunks amortise per-task scheduling overhead. Default 10 000
    /// matches DFE batch sizes.
    // NOTE(review): 0 is not rejected here — confirm the engine guards against
    // it before using this value as a chunk length.
    #[serde(default = "default_max_chunk_size")]
    pub max_chunk_size: usize,

    /// Expected payload format (auto-detect by default).
    #[serde(default)]
    pub format: PayloadFormat,

    /// JSON field used to route messages to the correct downstream sink.
    ///
    /// For dfe-loader this is typically `"_table"`. Defaults to `None`.
    #[serde(default)]
    pub routing_field: Option<String>,

    /// Pre-route filters applied before routing decisions.
    ///
    /// Evaluated in order -- first match wins. Defaults to an empty list.
    #[serde(default)]
    pub pre_route_filters: Vec<PreRouteFilterConfig>,

    /// Milliseconds to pause between batches when memory pressure is high.
    ///
    /// Defaults to 50.
    #[serde(default = "default_memory_pressure_pause_ms")]
    pub memory_pressure_pause_ms: u64,

    /// Action to take when a message fails JSON parsing.
    ///
    /// Defaults to [`ParseErrorAction::Dlq`].
    #[serde(default = "default_parse_error_action")]
    pub parse_error_action: ParseErrorAction,

    /// Fields to pre-extract into [`super::types::ParsedMessage::Parsed::extracted`]
    /// for fast routing lookups.
    ///
    /// Extracting these at parse time avoids repeated `value.get()` traversals
    /// during routing and filtering. Defaults to `_table`, `_timestamp`,
    /// `_source`, `host`, `source_type` and `event_type`. Setting this key
    /// replaces the whole list rather than extending it.
    #[serde(default = "default_known_fields")]
    pub known_fields: Vec<String>,
}
92
/// Number of messages per rayon chunk when nothing else is configured.
const DEFAULT_MAX_CHUNK_SIZE: usize = 10_000;

/// Serde and `Default` fallback for `BatchProcessingConfig::max_chunk_size`.
fn default_max_chunk_size() -> usize {
    DEFAULT_MAX_CHUNK_SIZE
}
96
/// Pause (in milliseconds) between batches under memory pressure when nothing
/// else is configured.
const DEFAULT_MEMORY_PRESSURE_PAUSE_MS: u64 = 50;

/// Serde and `Default` fallback for `BatchProcessingConfig::memory_pressure_pause_ms`.
fn default_memory_pressure_pause_ms() -> u64 {
    DEFAULT_MEMORY_PRESSURE_PAUSE_MS
}
100
101fn default_parse_error_action() -> ParseErrorAction {
102    ParseErrorAction::Dlq
103}
104
/// Field names pre-extracted at parse time when `known_fields` is not configured.
const DEFAULT_KNOWN_FIELDS: [&str; 6] = [
    "_table",
    "_timestamp",
    "_source",
    "host",
    "source_type",
    "event_type",
];

/// Serde and `Default` fallback for `BatchProcessingConfig::known_fields`:
/// owned copies of `DEFAULT_KNOWN_FIELDS`, in the same order.
fn default_known_fields() -> Vec<String> {
    DEFAULT_KNOWN_FIELDS
        .iter()
        .map(|&name| name.to_owned())
        .collect()
}
115
116impl Default for BatchProcessingConfig {
117    fn default() -> Self {
118        Self {
119            max_chunk_size: default_max_chunk_size(),
120            format: PayloadFormat::default(),
121            routing_field: None,
122            pre_route_filters: vec![],
123            memory_pressure_pause_ms: default_memory_pressure_pause_ms(),
124            parse_error_action: default_parse_error_action(),
125            known_fields: default_known_fields(),
126        }
127    }
128}
129
130impl BatchProcessingConfig {
131    /// Load config from the cascade under the given key (e.g. `"batch_processing"`).
132    ///
133    /// Falls back to defaults if the config cascade is not initialised or the
134    /// key is absent.
135    ///
136    /// # Errors
137    ///
138    /// Returns a `ConfigError` only if the cascade is initialised and the key
139    /// contains data that cannot be deserialised into `BatchProcessingConfig`.
140    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
141        let config: Self = if let Some(cfg) = crate::config::try_get() {
142            cfg.unmarshal_key(key).unwrap_or_default()
143        } else {
144            tracing::debug!("Config cascade not initialised, using default BatchProcessingConfig");
145            Self::default()
146        };
147        Ok(config)
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every comparable field of `actual` matches `expected`.
    ///
    /// `format` is skipped because `PayloadFormat` is not known to implement
    /// `PartialEq`, and `pre_route_filters` is compared by length only.
    fn assert_same_settings(actual: &BatchProcessingConfig, expected: &BatchProcessingConfig) {
        assert_eq!(actual.max_chunk_size, expected.max_chunk_size);
        assert_eq!(actual.routing_field, expected.routing_field);
        assert_eq!(actual.pre_route_filters.len(), expected.pre_route_filters.len());
        assert_eq!(actual.memory_pressure_pause_ms, expected.memory_pressure_pause_ms);
        assert_eq!(actual.parse_error_action, expected.parse_error_action);
        assert_eq!(actual.known_fields, expected.known_fields);
    }

    #[test]
    fn default_config_values() {
        let config = BatchProcessingConfig::default();
        assert_eq!(config.max_chunk_size, 10_000);
        assert!(config.routing_field.is_none());
        assert!(config.pre_route_filters.is_empty());
        assert_eq!(config.memory_pressure_pause_ms, 50);
        assert_eq!(config.parse_error_action, ParseErrorAction::Dlq);
        assert_eq!(config.known_fields.len(), 6);
        assert!(config.known_fields.contains(&"_table".to_string()));
    }

    #[test]
    fn from_cascade_falls_back_to_defaults() {
        let config = BatchProcessingConfig::from_cascade("batch_processing").unwrap();
        assert_eq!(config.max_chunk_size, 10_000);
    }

    #[test]
    fn parse_error_action_default_is_dlq() {
        assert_eq!(ParseErrorAction::default(), ParseErrorAction::Dlq);
    }

    #[test]
    fn serde_roundtrip() {
        let config = BatchProcessingConfig::default();
        let json = serde_json::to_string(&config).unwrap();
        let parsed: BatchProcessingConfig = serde_json::from_str(&json).unwrap();
        assert_same_settings(&parsed, &config);
    }

    #[test]
    fn empty_section_deserialises_to_defaults() {
        // Every field has a serde default, so an empty map must be accepted.
        let parsed: BatchProcessingConfig = serde_json::from_str("{}").unwrap();
        assert_same_settings(&parsed, &BatchProcessingConfig::default());
    }

    #[test]
    fn pre_route_filter_serde_roundtrip() {
        let filter = PreRouteFilterConfig::DropFieldMissing {
            field: "_table".to_string(),
        };
        let json = serde_json::to_string(&filter).unwrap();
        let back: PreRouteFilterConfig = serde_json::from_str(&json).unwrap();
        assert!(matches!(
            back,
            PreRouteFilterConfig::DropFieldMissing { ref field } if field == "_table"
        ));
    }

    #[test]
    fn pre_route_filter_wire_format() {
        // Internally tagged on `type`, variant names in snake_case.
        let filter = PreRouteFilterConfig::DropFieldMissing {
            field: "_table".to_string(),
        };
        assert_eq!(
            serde_json::to_value(&filter).unwrap(),
            serde_json::json!({ "type": "drop_field_missing", "field": "_table" })
        );

        let json = r#"{"type":"dlq_field_value","field":"event_type","value":"noise"}"#;
        match serde_json::from_str::<PreRouteFilterConfig>(json).unwrap() {
            PreRouteFilterConfig::DlqFieldValue { field, value } => {
                assert_eq!(field, "event_type");
                assert_eq!(value, "noise");
            }
            other => panic!("unexpected variant: {:?}", other),
        }
    }

    #[test]
    fn parse_error_action_variants_serde() {
        let cases = [
            (ParseErrorAction::Dlq, "\"dlq\""),
            (ParseErrorAction::Skip, "\"skip\""),
            (ParseErrorAction::FailBatch, "\"fail_batch\""),
        ];
        for (action, expected_json) in cases {
            let json = serde_json::to_string(&action).unwrap();
            assert_eq!(json, expected_json);
            let back: ParseErrorAction = serde_json::from_str(&json).unwrap();
            assert_eq!(action, back);
        }
    }
}