use serde::{Deserialize, Serialize};
use super::types::PayloadFormat;
/// Action selected for a parse error.
///
/// Serialized in `snake_case`: `"dlq"`, `"skip"`, `"fail_batch"`.
///
/// NOTE(review): the per-variant descriptions are inferred from the variant names; the code
/// that acts on them is not in this file — confirm against the consumer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ParseErrorAction {
    /// The default variant. Presumably routes the offending input to a dead-letter queue.
    #[default]
    Dlq,
    /// Presumably ignores the offending input and continues.
    Skip,
    /// Presumably fails the whole batch.
    FailBatch,
}
/// One pre-route filter rule.
///
/// Internally tagged for serde: the variant is chosen by a `"type"` key holding the
/// `snake_case` variant name, e.g. `{"type": "drop_field_missing", "field": "_table"}`.
///
/// NOTE(review): what each rule does at runtime is inferred from its name; the filter
/// implementation is not in this file — confirm against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PreRouteFilterConfig {
    /// Tagged `"drop_field_missing"`. Presumably drops input that lacks `field`.
    DropFieldMissing {
        /// Name of the field the rule refers to.
        field: String,
    },
    /// Tagged `"dlq_field_value"`. Presumably dead-letters input whose `field` equals `value`.
    DlqFieldValue {
        /// Name of the field the rule refers to.
        field: String,
        /// Value associated with `field` for this rule.
        value: String,
    },
}
/// Batch-processing settings, (de)serializable with serde.
///
/// Every field carries a serde default, so any key may be omitted from the input.
///
/// NOTE(review): descriptions marked "presumably" are inferred from field names; the code
/// that consumes this struct is not in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchProcessingConfig {
    /// Maximum chunk size; `10_000` when omitted.
    /// NOTE(review): the unit (records vs. bytes) is not visible here — confirm.
    #[serde(default = "default_max_chunk_size")]
    pub max_chunk_size: usize,

    /// Payload format; `PayloadFormat::default()` when omitted.
    #[serde(default)]
    pub format: PayloadFormat,

    /// Optional routing field name; `None` when omitted.
    #[serde(default)]
    pub routing_field: Option<String>,

    /// Pre-route filter rules; empty when omitted.
    #[serde(default)]
    pub pre_route_filters: Vec<PreRouteFilterConfig>,

    /// Pause length in milliseconds (per the `_ms` suffix), presumably applied under memory
    /// pressure; `50` when omitted.
    #[serde(default = "default_memory_pressure_pause_ms")]
    pub memory_pressure_pause_ms: u64,

    /// Action for parse errors; `ParseErrorAction::Dlq` when omitted.
    #[serde(default = "default_parse_error_action")]
    pub parse_error_action: ParseErrorAction,

    /// Known field names; when omitted, the six names `_table`, `_timestamp`, `_source`,
    /// `host`, `source_type` and `event_type`.
    #[serde(default = "default_known_fields")]
    pub known_fields: Vec<String>,
}
/// Serde fallback for `max_chunk_size`: returns `10_000`.
fn default_max_chunk_size() -> usize {
    const MAX_CHUNK_SIZE_FALLBACK: usize = 10_000;
    MAX_CHUNK_SIZE_FALLBACK
}
/// Serde fallback for `memory_pressure_pause_ms`: returns `50`.
fn default_memory_pressure_pause_ms() -> u64 {
    const PAUSE_MS_FALLBACK: u64 = 50;
    PAUSE_MS_FALLBACK
}
/// Serde fallback for `parse_error_action`.
///
/// Delegates to the enum's own `Default`, whose `#[default]` variant is
/// `ParseErrorAction::Dlq`.
fn default_parse_error_action() -> ParseErrorAction {
    ParseErrorAction::default()
}
/// Serde fallback for `known_fields`.
///
/// Returns six owned names, in this order: `_table`, `_timestamp`, `_source`, `host`,
/// `source_type`, `event_type`.
fn default_known_fields() -> Vec<String> {
    const BUILT_IN_NAMES: [&str; 6] = [
        "_table",
        "_timestamp",
        "_source",
        "host",
        "source_type",
        "event_type",
    ];

    BUILT_IN_NAMES
        .iter()
        .map(|name| (*name).to_owned())
        .collect()
}
impl Default for BatchProcessingConfig {
    /// Builds a config from the same fallbacks serde applies to omitted keys: the four
    /// `default_*` helpers, `PayloadFormat::default()`, no routing field and no filters.
    fn default() -> Self {
        let max_chunk_size = default_max_chunk_size();
        let memory_pressure_pause_ms = default_memory_pressure_pause_ms();
        let parse_error_action = default_parse_error_action();
        let known_fields = default_known_fields();

        Self {
            max_chunk_size,
            format: PayloadFormat::default(),
            routing_field: None,
            pre_route_filters: Vec::new(),
            memory_pressure_pause_ms,
            parse_error_action,
            known_fields,
        }
    }
}
impl BatchProcessingConfig {
    /// Loads the configuration stored under `key` in the config cascade.
    ///
    /// * When `crate::config::try_get()` yields a cascade, `key` is unmarshalled into
    ///   `Self`; if that does not produce a value, `Self::default()` is used instead.
    /// * When no cascade is available, a debug event is logged and `Self::default()` is
    ///   returned.
    ///
    /// # Errors
    ///
    /// Every path currently returns `Ok`; no `Err` is produced by this function.
    ///
    /// NOTE(review): `unwrap_or_default()` discards whatever failure `unmarshal_key`
    /// reports, so a malformed section cannot be told apart from an absent one — confirm
    /// that this best-effort fallback is intended.
    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
        match crate::config::try_get() {
            Some(cascade) => {
                let loaded: Self = cascade.unmarshal_key(key).unwrap_or_default();
                Ok(loaded)
            }
            None => {
                tracing::debug!(
                    "Config cascade not initialised, using default BatchProcessingConfig"
                );
                Ok(Self::default())
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` yields the expected fallback values.
    #[test]
    fn default_config_values() {
        let cfg = BatchProcessingConfig::default();

        assert_eq!(cfg.max_chunk_size, 10_000);
        assert_eq!(cfg.routing_field, None);
        assert_eq!(cfg.memory_pressure_pause_ms, 50);
        assert_eq!(cfg.known_fields.len(), 6);
        assert!(cfg.known_fields.iter().any(|name| name == "_table"));
    }

    /// `from_cascade` succeeds and reports the default chunk size.
    #[test]
    fn from_cascade_falls_back_to_defaults() {
        let loaded = BatchProcessingConfig::from_cascade("batch_processing");

        assert_eq!(loaded.unwrap().max_chunk_size, 10_000);
    }

    /// The enum's own default variant is `Dlq`.
    #[test]
    fn parse_error_action_default_is_dlq() {
        assert_eq!(ParseErrorAction::default(), ParseErrorAction::Dlq);
    }

    /// A default config survives a JSON encode/decode cycle.
    #[test]
    fn serde_roundtrip() {
        let encoded = serde_json::to_string(&BatchProcessingConfig::default()).unwrap();
        let decoded: BatchProcessingConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.max_chunk_size, 10_000);
    }

    /// A filter rule decodes back to the same variant it was encoded from.
    #[test]
    fn pre_route_filter_serde_roundtrip() {
        let rule = PreRouteFilterConfig::DropFieldMissing {
            field: "_table".to_string(),
        };

        let encoded = serde_json::to_string(&rule).unwrap();
        let decoded: PreRouteFilterConfig = serde_json::from_str(&encoded).unwrap();

        assert!(matches!(
            decoded,
            PreRouteFilterConfig::DropFieldMissing { .. }
        ));
    }

    /// Each `ParseErrorAction` variant round-trips through JSON unchanged.
    #[test]
    fn parse_error_action_variants_serde() {
        for action in [
            ParseErrorAction::Dlq,
            ParseErrorAction::Skip,
            ParseErrorAction::FailBatch,
        ] {
            let encoded = serde_json::to_string(&action).unwrap();
            let decoded: ParseErrorAction = serde_json::from_str(&encoded).unwrap();

            assert_eq!(action, decoded);
        }
    }
}