hyperi_rustlib/worker/engine/
config.rs1use serde::{Deserialize, Serialize};
10
11use super::types::PayloadFormat;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
15#[serde(rename_all = "snake_case")]
16pub enum ParseErrorAction {
17 #[default]
19 Dlq,
20 Skip,
22 FailBatch,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(tag = "type", rename_all = "snake_case")]
32pub enum PreRouteFilterConfig {
33 DropFieldMissing {
35 field: String,
37 },
38 DlqFieldValue {
40 field: String,
42 value: String,
44 },
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct BatchProcessingConfig {
53 #[serde(default = "default_max_chunk_size")]
58 pub max_chunk_size: usize,
59
60 #[serde(default)]
62 pub format: PayloadFormat,
63
64 #[serde(default)]
68 pub routing_field: Option<String>,
69
70 #[serde(default)]
74 pub pre_route_filters: Vec<PreRouteFilterConfig>,
75
76 #[serde(default = "default_memory_pressure_pause_ms")]
78 pub memory_pressure_pause_ms: u64,
79
80 #[serde(default = "default_parse_error_action")]
82 pub parse_error_action: ParseErrorAction,
83
84 #[serde(default = "default_known_fields")]
90 pub known_fields: Vec<String>,
91}
92
93fn default_max_chunk_size() -> usize {
94 10_000
95}
96
97fn default_memory_pressure_pause_ms() -> u64 {
98 50
99}
100
101fn default_parse_error_action() -> ParseErrorAction {
102 ParseErrorAction::Dlq
103}
104
105fn default_known_fields() -> Vec<String> {
106 vec![
107 "_table".to_string(),
108 "_timestamp".to_string(),
109 "_source".to_string(),
110 "host".to_string(),
111 "source_type".to_string(),
112 "event_type".to_string(),
113 ]
114}
115
116impl Default for BatchProcessingConfig {
117 fn default() -> Self {
118 Self {
119 max_chunk_size: default_max_chunk_size(),
120 format: PayloadFormat::default(),
121 routing_field: None,
122 pre_route_filters: vec![],
123 memory_pressure_pause_ms: default_memory_pressure_pause_ms(),
124 parse_error_action: default_parse_error_action(),
125 known_fields: default_known_fields(),
126 }
127 }
128}
129
130impl BatchProcessingConfig {
131 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
141 let config: Self = if let Some(cfg) = crate::config::try_get() {
142 cfg.unmarshal_key(key).unwrap_or_default()
143 } else {
144 tracing::debug!("Config cascade not initialised, using default BatchProcessingConfig");
145 Self::default()
146 };
147 Ok(config)
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154
155 #[test]
156 fn default_config_values() {
157 let config = BatchProcessingConfig::default();
158 assert_eq!(config.max_chunk_size, 10_000);
159 assert!(config.routing_field.is_none());
160 assert_eq!(config.memory_pressure_pause_ms, 50);
161 assert_eq!(config.known_fields.len(), 6);
162 assert!(config.known_fields.contains(&"_table".to_string()));
163 }
164
165 #[test]
166 fn from_cascade_falls_back_to_defaults() {
167 let config = BatchProcessingConfig::from_cascade("batch_processing").unwrap();
168 assert_eq!(config.max_chunk_size, 10_000);
169 }
170
171 #[test]
172 fn parse_error_action_default_is_dlq() {
173 let action = ParseErrorAction::default();
174 assert!(matches!(action, ParseErrorAction::Dlq));
175 }
176
177 #[test]
178 fn serde_roundtrip() {
179 let config = BatchProcessingConfig::default();
180 let json = serde_json::to_string(&config).unwrap();
181 let parsed: BatchProcessingConfig = serde_json::from_str(&json).unwrap();
182 assert_eq!(parsed.max_chunk_size, 10_000);
183 }
184
185 #[test]
186 fn pre_route_filter_serde_roundtrip() {
187 let filter = PreRouteFilterConfig::DropFieldMissing {
188 field: "_table".to_string(),
189 };
190 let json = serde_json::to_string(&filter).unwrap();
191 let back: PreRouteFilterConfig = serde_json::from_str(&json).unwrap();
192 assert!(matches!(
193 back,
194 PreRouteFilterConfig::DropFieldMissing { .. }
195 ));
196 }
197
198 #[test]
199 fn parse_error_action_variants_serde() {
200 let actions = [
201 ParseErrorAction::Dlq,
202 ParseErrorAction::Skip,
203 ParseErrorAction::FailBatch,
204 ];
205 for action in actions {
206 let json = serde_json::to_string(&action).unwrap();
207 let back: ParseErrorAction = serde_json::from_str(&json).unwrap();
208 assert_eq!(action, back);
209 }
210 }
211}