hyperi_rustlib/worker/engine/
config.rs1use serde::{Deserialize, Serialize};
10
11use super::types::PayloadFormat;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
15#[serde(rename_all = "snake_case")]
16pub enum ParseErrorAction {
17 #[default]
19 Dlq,
20 Skip,
22 FailBatch,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
31#[serde(tag = "type", rename_all = "snake_case")]
32pub enum PreRouteFilterConfig {
33 DropFieldMissing {
35 field: String,
37 },
38 DlqFieldValue {
40 field: String,
42 value: String,
44 },
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct BatchProcessingConfig {
53 #[serde(default = "default_max_chunk_size")]
58 pub max_chunk_size: usize,
59
60 #[serde(default)]
62 pub format: PayloadFormat,
63
64 #[serde(default)]
68 pub routing_field: Option<String>,
69
70 #[serde(default)]
74 pub pre_route_filters: Vec<PreRouteFilterConfig>,
75
76 #[serde(default = "default_parse_error_action")]
78 pub parse_error_action: ParseErrorAction,
79
80 #[serde(default = "default_known_fields")]
86 pub known_fields: Vec<String>,
87}
88
/// Serde default for `BatchProcessingConfig::max_chunk_size`.
fn default_max_chunk_size() -> usize {
    const DEFAULT_MAX_CHUNK_SIZE: usize = 10_000;
    DEFAULT_MAX_CHUNK_SIZE
}
92
93fn default_parse_error_action() -> ParseErrorAction {
94 ParseErrorAction::Dlq
95}
96
/// Serde default for `BatchProcessingConfig::known_fields`.
///
/// Returns a freshly allocated list of the six built-in field names, in a
/// fixed order.
fn default_known_fields() -> Vec<String> {
    const BUILT_IN: [&str; 6] = [
        "_table",
        "_timestamp",
        "_source",
        "host",
        "source_type",
        "event_type",
    ];
    BUILT_IN.iter().map(|&name| name.to_owned()).collect()
}
107
108impl Default for BatchProcessingConfig {
109 fn default() -> Self {
110 Self {
111 max_chunk_size: default_max_chunk_size(),
112 format: PayloadFormat::default(),
113 routing_field: None,
114 pre_route_filters: vec![],
115 parse_error_action: default_parse_error_action(),
116 known_fields: default_known_fields(),
117 }
118 }
119}
120
121impl BatchProcessingConfig {
122 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
132 let config: Self = if let Some(cfg) = crate::config::try_get() {
133 cfg.unmarshal_key_or_warn(key).unwrap_or_default()
136 } else {
137 tracing::debug!("Config cascade not initialised, using default BatchProcessingConfig");
138 Self::default()
139 };
140 Ok(config)
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` exposes the expected baseline values.
    #[test]
    fn default_config_values() {
        let BatchProcessingConfig {
            max_chunk_size,
            routing_field,
            known_fields,
            ..
        } = BatchProcessingConfig::default();

        assert_eq!(max_chunk_size, 10_000);
        assert!(routing_field.is_none());
        assert_eq!(known_fields.len(), 6);
        assert!(known_fields.iter().any(|name| name == "_table"));
    }

    /// `from_cascade` still yields the default chunk size in the test
    /// environment.
    #[test]
    fn from_cascade_falls_back_to_defaults() {
        let loaded = BatchProcessingConfig::from_cascade("batch_processing").unwrap();
        assert_eq!(loaded.max_chunk_size, 10_000);
    }

    /// The derived `Default` for the enum is `Dlq`.
    #[test]
    fn parse_error_action_default_is_dlq() {
        assert!(matches!(ParseErrorAction::default(), ParseErrorAction::Dlq));
    }

    /// The default config survives a JSON encode/decode cycle.
    #[test]
    fn serde_roundtrip() {
        let encoded = serde_json::to_string(&BatchProcessingConfig::default()).unwrap();
        let decoded: BatchProcessingConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.max_chunk_size, 10_000);
    }

    /// A tagged filter variant decodes back to the same variant.
    #[test]
    fn pre_route_filter_serde_roundtrip() {
        let original = PreRouteFilterConfig::DropFieldMissing {
            field: "_table".to_string(),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PreRouteFilterConfig = serde_json::from_str(&encoded).unwrap();
        assert!(matches!(
            decoded,
            PreRouteFilterConfig::DropFieldMissing { .. }
        ));
    }

    /// Every `ParseErrorAction` variant round-trips through JSON unchanged.
    #[test]
    fn parse_error_action_variants_serde() {
        for original in [
            ParseErrorAction::Dlq,
            ParseErrorAction::Skip,
            ParseErrorAction::FailBatch,
        ] {
            let encoded = serde_json::to_string(&original).unwrap();
            let decoded: ParseErrorAction = serde_json::from_str(&encoded).unwrap();
            assert_eq!(original, decoded);
        }
    }
}