hyperi_rustlib/worker/engine/
config.rs1use serde::{Deserialize, Serialize};
10
11use super::types::PayloadFormat;
12
/// Selectable action for parse errors (see `BatchProcessingConfig::parse_error_action`).
///
/// Serialised in `snake_case` (`dlq`, `skip`, `fail_batch`). [`ParseErrorAction::Dlq`]
/// is the value produced by `Default`.
///
/// NOTE(review): how each variant is acted upon is decided by code outside this
/// file; the per-variant notes below are inferred from the names — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ParseErrorAction {
    /// Default variant. Presumably sends the offending message to a dead-letter
    /// queue — confirm.
    #[default]
    Dlq,
    /// Presumably drops the offending message and carries on — confirm.
    Skip,
    /// Presumably fails the whole batch — confirm.
    FailBatch,
}
25
/// Configuration for one pre-route filter.
///
/// Uses serde's internally tagged representation: the variant is selected by a
/// `type` key written in `snake_case` (`drop_field_missing`, `dlq_field_value`)
/// and the variant's fields sit next to it, e.g.
/// `{"type": "drop_field_missing", "field": "_table"}`.
///
/// NOTE(review): the filtering logic lives outside this file; the per-variant
/// notes below are inferred from the names — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PreRouteFilterConfig {
    /// Presumably drops a message when `field` is missing from it — confirm.
    DropFieldMissing {
        /// Name of the field to look for.
        field: String,
    },
    /// Presumably dead-letters a message when `field` equals `value` — confirm.
    DlqFieldValue {
        /// Name of the field to inspect.
        field: String,
        /// Value associated with `field` for this filter.
        value: String,
    },
}
46
/// Settings for batch processing, (de)serialisable with serde.
///
/// Every field carries a serde default, so any key may be omitted from the
/// input and is then filled with the fallback listed on the field.
///
/// NOTE(review): the code consuming these settings is outside this file; field
/// meanings marked "presumably" are inferred from names and types — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchProcessingConfig {
    /// Presumably the upper bound on items per processing chunk — confirm.
    /// Falls back to `default_max_chunk_size()` (`10_000`) when absent.
    #[serde(default = "default_max_chunk_size")]
    pub max_chunk_size: usize,

    /// Payload format. Falls back to `PayloadFormat::default()` when absent.
    #[serde(default)]
    pub format: PayloadFormat,

    /// Optional routing field name. Falls back to `None` when absent.
    #[serde(default)]
    pub routing_field: Option<String>,

    /// Pre-route filter configurations. Falls back to an empty list when absent.
    #[serde(default)]
    pub pre_route_filters: Vec<PreRouteFilterConfig>,

    /// Action selected for parse errors. Falls back to
    /// `default_parse_error_action()` (`ParseErrorAction::Dlq`) when absent.
    #[serde(default = "default_parse_error_action")]
    pub parse_error_action: ParseErrorAction,

    /// List of known field names. Falls back to `default_known_fields()` when
    /// absent.
    #[serde(default = "default_known_fields")]
    pub known_fields: Vec<String>,
}
88
/// Fallback for `BatchProcessingConfig::max_chunk_size`.
///
/// Referenced by the field's `#[serde(default = "...")]` attribute and by the
/// struct's `Default` implementation.
fn default_max_chunk_size() -> usize {
    const DEFAULT_MAX_CHUNK_SIZE: usize = 10_000;
    DEFAULT_MAX_CHUNK_SIZE
}
92
93fn default_parse_error_action() -> ParseErrorAction {
94 ParseErrorAction::Dlq
95}
96
/// Fallback for `BatchProcessingConfig::known_fields`.
///
/// Returns a freshly allocated list holding the six built-in names, in the
/// order they are declared below.
fn default_known_fields() -> Vec<String> {
    const BUILT_IN_NAMES: [&str; 6] = [
        "_table",
        "_timestamp",
        "_source",
        "host",
        "source_type",
        "event_type",
    ];

    BUILT_IN_NAMES
        .iter()
        .map(|name| (*name).to_owned())
        .collect()
}
107
108impl Default for BatchProcessingConfig {
109 fn default() -> Self {
110 Self {
111 max_chunk_size: default_max_chunk_size(),
112 format: PayloadFormat::default(),
113 routing_field: None,
114 pre_route_filters: vec![],
115 parse_error_action: default_parse_error_action(),
116 known_fields: default_known_fields(),
117 }
118 }
119}
120
121impl BatchProcessingConfig {
122 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
132 let config: Self = if let Some(cfg) = crate::config::try_get() {
133 cfg.unmarshal_key(key).unwrap_or_default()
134 } else {
135 tracing::debug!("Config cascade not initialised, using default BatchProcessingConfig");
136 Self::default()
137 };
138 Ok(config)
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` exposes the expected fallback values.
    #[test]
    fn default_config_values() {
        let BatchProcessingConfig {
            max_chunk_size,
            routing_field,
            known_fields,
            ..
        } = BatchProcessingConfig::default();

        assert_eq!(max_chunk_size, 10_000);
        assert!(routing_field.is_none());
        assert_eq!(known_fields.len(), 6);
        assert!(known_fields.iter().any(|name| name == "_table"));
    }

    /// `from_cascade` still yields the default chunk size when nothing
    /// overrides it.
    #[test]
    fn from_cascade_falls_back_to_defaults() {
        let loaded = BatchProcessingConfig::from_cascade("batch_processing");
        assert_eq!(loaded.unwrap().max_chunk_size, 10_000);
    }

    /// The derived `Default` for the enum picks `Dlq`.
    #[test]
    fn parse_error_action_default_is_dlq() {
        assert_eq!(ParseErrorAction::default(), ParseErrorAction::Dlq);
    }

    /// A default config survives a JSON encode/decode cycle.
    #[test]
    fn serde_roundtrip() {
        let encoded = serde_json::to_string(&BatchProcessingConfig::default()).unwrap();
        let decoded = serde_json::from_str::<BatchProcessingConfig>(&encoded).unwrap();
        assert_eq!(decoded.max_chunk_size, 10_000);
    }

    /// A filter keeps its variant across a JSON encode/decode cycle.
    #[test]
    fn pre_route_filter_serde_roundtrip() {
        let original = PreRouteFilterConfig::DropFieldMissing {
            field: String::from("_table"),
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<PreRouteFilterConfig>(&encoded).unwrap();

        let is_drop_field_missing =
            matches!(decoded, PreRouteFilterConfig::DropFieldMissing { .. });
        assert!(is_drop_field_missing);
    }

    /// Every `ParseErrorAction` variant round-trips through JSON unchanged.
    #[test]
    fn parse_error_action_variants_serde() {
        for original in [
            ParseErrorAction::Dlq,
            ParseErrorAction::Skip,
            ParseErrorAction::FailBatch,
        ] {
            let encoded = serde_json::to_string(&original).unwrap();
            let decoded = serde_json::from_str::<ParseErrorAction>(&encoded).unwrap();
            assert_eq!(original, decoded);
        }
    }
}