use super::metrics::CacheMetricsInner;
use super::pending_flows::{PendingFlowCache, PendingFlowsConfig};
use crate::template_store::TemplateStore;
use crate::variable_versions::enterprise_registry::EnterpriseFieldRegistry;
use crate::variable_versions::ttl::TtlConfig;
use std::num::NonZeroUsize;
use std::sync::Arc;
/// Default template cache size; `Config::default()` uses it for
/// `Config::max_template_cache_size`.
pub const DEFAULT_MAX_TEMPLATE_CACHE_SIZE: usize = 1000;
// Compile-time guard: a zero cache size is rejected with `ConfigError::InvalidCacheSize`,
// so the default itself must never be zero.
const _: () = assert!(DEFAULT_MAX_TEMPLATE_CACHE_SIZE > 0);
/// Crate-internal alias for a template identifier (a `u16`).
pub(crate) type TemplateId = u16;
/// Default field-count limit; `Config::default()` uses it for `Config::max_field_count`.
pub const MAX_FIELD_COUNT: usize = 10_000;
/// Default records-per-flowset limit; `Config::default()` uses it for
/// `Config::max_records_per_flowset`.
pub const DEFAULT_MAX_RECORDS_PER_FLOWSET: usize = 1024;
/// Parser configuration.
///
/// Build one with `Config::default()`, `Config::new` or
/// `Config::with_enterprise_registry`, then check it with `Config::validate`.
/// All size limits must be greater than zero to pass validation.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct Config {
/// Template cache size. Must be greater than 0; defaults to
/// `DEFAULT_MAX_TEMPLATE_CACHE_SIZE`.
pub max_template_cache_size: usize,
/// Field-count limit. Must be greater than 0; defaults to `MAX_FIELD_COUNT`.
/// NOTE(review): presumably the maximum number of fields per template - confirm.
pub max_field_count: usize,
/// Template total-size limit. Must be greater than 0; defaults to `u16::MAX` (65535).
/// NOTE(review): presumably measured in bytes - confirm.
pub max_template_total_size: usize,
/// Error sample size limit. Must be greater than 0; defaults to 256.
/// NOTE(review): presumably the amount of input retained in error reports - confirm.
pub max_error_sample_size: usize,
/// Records-per-flowset limit. Must be greater than 0; defaults to
/// `DEFAULT_MAX_RECORDS_PER_FLOWSET`.
pub max_records_per_flowset: usize,
/// Optional TTL settings; `None` by default. When present, its `duration` must be
/// non-zero to pass validation.
pub ttl_config: Option<TtlConfig>,
/// Shared enterprise field registry; defaults to `EnterpriseFieldRegistry::new()`.
pub enterprise_registry: Arc<EnterpriseFieldRegistry>,
/// Optional pending-flow settings; `None` by default. When present, it is checked with
/// `PendingFlowCache::validate_config` during validation.
pub pending_flows_config: Option<PendingFlowsConfig>,
/// Optional shared template store; `None` by default.
pub template_store: Option<Arc<dyn TemplateStore>>,
/// Scope string associated with `template_store`; defaults to the empty string.
/// NOTE(review): presumably namespaces entries inside the store - confirm.
pub template_store_scope: Arc<str>,
}
/// Reasons a configuration value can be rejected.
///
/// Tuple variants carry the rejected value; the `Display` implementation renders it
/// together with the constraint that was violated.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigError {
/// Template cache size is invalid; it must be greater than 0.
InvalidCacheSize(usize),
/// Pending flow cache size is invalid; it must be greater than 0.
InvalidPendingCacheSize(usize),
/// An allowed version is not supported; supported versions are 5, 7, 9 and 10.
InvalidAllowedVersion(u16),
/// Max field count is invalid; it must be greater than 0.
InvalidFieldCount(usize),
/// Max template total size is invalid; it must be greater than 0.
InvalidTemplateTotalSize(usize),
/// Max entries per template is invalid; it must be greater than 0.
InvalidEntriesPerTemplate(usize),
/// Max entry size is invalid; it must be between 1 and 65531 (`u16::MAX - 4`).
InvalidEntrySize(usize),
/// TTL duration is invalid; it must be greater than zero.
InvalidTtlDuration,
/// The allowed versions list is empty.
EmptyAllowedVersions,
/// Max records per flowset is invalid; it must be greater than 0.
InvalidRecordsPerFlowset(usize),
/// Pending flow byte limits are inconsistent: `max_total_bytes` must be greater than
/// or equal to `max_entry_size_bytes`.
InvalidPendingTotalBytes {
/// The rejected `max_total_bytes` value.
max_total_bytes: usize,
/// The `max_entry_size_bytes` value it was compared against.
max_entry_size_bytes: usize,
},
/// `max_sources` is invalid; it must be greater than 0.
InvalidMaxSources(usize),
/// `max_error_sample_size` is invalid; it must be greater than 0.
InvalidErrorSampleSize(usize),
}
// Empty impl: `ConfigError` uses the default `std::error::Error` methods, so it reports
// no underlying `source`. The message itself comes from the `Display` impl.
impl std::error::Error for ConfigError {}
impl std::fmt::Display for ConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ConfigError::InvalidCacheSize(size) => {
write!(
f,
"Invalid template cache size: {}. Must be greater than 0.",
size
)
}
ConfigError::InvalidPendingCacheSize(size) => {
write!(
f,
"Invalid pending flow cache size: {}. Must be greater than 0.",
size
)
}
ConfigError::InvalidAllowedVersion(version) => {
write!(
f,
"Invalid allowed version: {}. Supported versions are 5, 7, 9, 10.",
version
)
}
ConfigError::InvalidFieldCount(count) => {
write!(
f,
"Invalid max field count: {}. Must be greater than 0.",
count
)
}
ConfigError::InvalidTemplateTotalSize(size) => {
write!(
f,
"Invalid max template total size: {}. Must be greater than 0.",
size
)
}
ConfigError::InvalidEntriesPerTemplate(count) => {
write!(
f,
"Invalid max entries per template: {}. Must be greater than 0.",
count
)
}
ConfigError::InvalidEntrySize(size) => {
write!(
f,
"Invalid max entry size: {}. Must be between 1 and 65531 (u16::MAX - 4).",
size
)
}
ConfigError::InvalidTtlDuration => {
write!(f, "Invalid TTL duration: must be greater than zero.")
}
ConfigError::InvalidRecordsPerFlowset(count) => {
write!(
f,
"Invalid max records per flowset: {}. Must be greater than 0.",
count
)
}
ConfigError::EmptyAllowedVersions => {
write!(
f,
"Allowed versions list must not be empty. Supported versions are 5, 7, 9, 10."
)
}
ConfigError::InvalidPendingTotalBytes {
max_total_bytes,
max_entry_size_bytes,
} => {
write!(
f,
"Invalid pending flow config: max_total_bytes ({}) must be >= max_entry_size_bytes ({}).",
max_total_bytes, max_entry_size_bytes
)
}
ConfigError::InvalidMaxSources(size) => {
write!(f, "Invalid max_sources: {}. Must be greater than 0.", size)
}
ConfigError::InvalidErrorSampleSize(size) => {
write!(
f,
"Invalid max_error_sample_size: {}. Must be greater than 0.",
size
)
}
}
}
}
/// Baseline configuration: the module's default limits, a fresh enterprise registry,
/// and no TTL, pending-flow cache, or template store.
impl Default for Config {
    fn default() -> Self {
        // The template size cap defaults to the largest value a `u16` can hold.
        let template_size_cap = usize::from(u16::MAX);
        let fresh_registry = Arc::new(EnterpriseFieldRegistry::new());
        let empty_scope: Arc<str> = Arc::from("");

        Self {
            // Size limits.
            max_template_cache_size: DEFAULT_MAX_TEMPLATE_CACHE_SIZE,
            max_field_count: MAX_FIELD_COUNT,
            max_template_total_size: template_size_cap,
            max_error_sample_size: 256,
            max_records_per_flowset: DEFAULT_MAX_RECORDS_PER_FLOWSET,
            // Optional features start disabled.
            ttl_config: None,
            pending_flows_config: None,
            template_store: None,
            // Shared handles.
            enterprise_registry: fresh_registry,
            template_store_scope: empty_scope,
        }
    }
}
impl Config {
pub fn new(max_template_cache_size: usize, ttl_config: Option<TtlConfig>) -> Self {
Self {
max_template_cache_size,
ttl_config,
..Self::default()
}
}
pub fn with_enterprise_registry(
max_template_cache_size: usize,
ttl_config: Option<TtlConfig>,
enterprise_registry: EnterpriseFieldRegistry,
) -> Self {
Self {
enterprise_registry: Arc::new(enterprise_registry),
..Self::new(max_template_cache_size, ttl_config)
}
}
pub fn validate(&self) -> Result<(), ConfigError> {
NonZeroUsize::new(self.max_template_cache_size)
.ok_or(ConfigError::InvalidCacheSize(self.max_template_cache_size))?;
if self.max_field_count == 0 {
return Err(ConfigError::InvalidFieldCount(0));
}
if self.max_template_total_size == 0 {
return Err(ConfigError::InvalidTemplateTotalSize(0));
}
if self.max_records_per_flowset == 0 {
return Err(ConfigError::InvalidRecordsPerFlowset(0));
}
if self.max_error_sample_size == 0 {
return Err(ConfigError::InvalidErrorSampleSize(0));
}
if let Some(ref ttl) = self.ttl_config
&& ttl.duration.is_zero()
{
return Err(ConfigError::InvalidTtlDuration);
}
if let Some(ref pf) = self.pending_flows_config {
PendingFlowCache::validate_config(pf)?;
}
Ok(())
}
}
/// Crate-internal accessors that `ParserConfig` uses to apply settings to a parser.
///
/// The `set_*_field` methods return nothing and cannot report errors; the `ParserConfig`
/// methods validate values before calling them.
pub(crate) trait ParserFields {
/// Receives the template cache size; `ParserConfig` only passes non-zero values.
fn set_max_template_cache_size_field(&mut self, size: usize);
/// Receives the field-count limit; `ParserConfig::add_config` only passes non-zero values.
fn set_max_field_count_field(&mut self, count: usize);
/// Receives the template total-size limit; `ParserConfig::add_config` only passes
/// non-zero values.
fn set_max_template_total_size_field(&mut self, size: usize);
/// Receives the error sample size; `ParserConfig::add_config` only passes non-zero values.
fn set_max_error_sample_size_field(&mut self, size: usize);
/// Receives the records-per-flowset limit; `ParserConfig::add_config` only passes
/// non-zero values.
fn set_max_records_per_flowset_field(&mut self, count: usize);
/// Receives the TTL settings (`None` when no TTL is configured); `ParserConfig` rejects
/// a zero duration before calling this.
fn set_ttl_config_field(&mut self, config: Option<TtlConfig>);
/// Receives the enterprise field registry. The default implementation is a no-op that
/// discards the registry; implementors override it when they need the registry.
fn set_enterprise_registry(&mut self, _registry: Arc<EnterpriseFieldRegistry>) {}
/// Shared access to the optional pending-flow cache; `ParserConfig` treats `None` as
/// "pending flows disabled".
fn pending_flows(&self) -> &Option<PendingFlowCache>;
/// Mutable access to the optional pending-flow cache.
fn pending_flows_mut(&mut self) -> &mut Option<PendingFlowCache>;
/// Mutable access to the cache metrics; `ParserConfig::clear_pending_flows` uses it to
/// record dropped pending flows.
fn metrics_mut(&mut self) -> &mut CacheMetricsInner;
}
/// Public configuration interface for parsers.
///
/// The default methods validate incoming settings and forward them to the crate-private
/// `ParserFields` accessors. Implementors supply
/// [`resize_template_caches`](ParserConfig::resize_template_caches) and
/// [`set_pending_flows_config`](ParserConfig::set_pending_flows_config).
// `ParserFields` is `pub(crate)` while this trait is `pub`; the allow silences the
// `private_bounds` lint that the supertrait bound would otherwise trigger.
#[allow(private_bounds)]
pub trait ParserConfig: ParserFields {
    /// Resizes the implementor's template caches to `cache_size`.
    fn resize_template_caches(&mut self, cache_size: NonZeroUsize);

    /// Validates `config` and applies it to this parser.
    ///
    /// Validation is delegated to [`Config::validate`], so both entry points enforce the
    /// same rules; no setter runs unless it passes. Settings are then applied in this
    /// order: the size limits, the TTL, the enterprise registry, the pending-flow
    /// settings, and finally the template caches are resized.
    ///
    /// `config.template_store` and `config.template_store_scope` are not applied by this
    /// method.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by [`Config::validate`], or the error returned by
    /// [`set_pending_flows_config`](ParserConfig::set_pending_flows_config).
    fn add_config(&mut self, config: Config) -> Result<(), ConfigError> {
        config.validate()?;
        // `validate` has already rejected a zero size with this same error; converting
        // here just produces the `NonZeroUsize` that the resize call needs.
        let cache_size = NonZeroUsize::new(config.max_template_cache_size).ok_or(
            ConfigError::InvalidCacheSize(config.max_template_cache_size),
        )?;

        self.set_max_template_cache_size_field(config.max_template_cache_size);
        self.set_max_field_count_field(config.max_field_count);
        self.set_max_template_total_size_field(config.max_template_total_size);
        self.set_max_error_sample_size_field(config.max_error_sample_size);
        self.set_max_records_per_flowset_field(config.max_records_per_flowset);
        self.set_ttl_config_field(config.ttl_config);
        self.set_enterprise_registry(config.enterprise_registry);
        // NOTE(review): if this call fails, the setters above have already run and the
        // caches have not been resized. The pending-flow settings were validated above,
        // so a failure here is presumably unexpected - confirm against the implementors.
        self.set_pending_flows_config(config.pending_flows_config)?;
        self.resize_template_caches(cache_size);
        Ok(())
    }

    /// Sets the template cache size and resizes the template caches to match.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::InvalidCacheSize`] if `size` is 0; nothing is changed in
    /// that case.
    fn set_max_template_cache_size(&mut self, size: usize) -> Result<(), ConfigError> {
        let Some(cache_size) = NonZeroUsize::new(size) else {
            return Err(ConfigError::InvalidCacheSize(size));
        };
        self.set_max_template_cache_size_field(size);
        self.resize_template_caches(cache_size);
        Ok(())
    }

    /// Sets the TTL settings; `None` removes them.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::InvalidTtlDuration`] if a TTL is supplied with a zero
    /// duration; nothing is changed in that case.
    fn set_ttl_config(&mut self, ttl_config: Option<TtlConfig>) -> Result<(), ConfigError> {
        let ttl_is_zero = ttl_config
            .as_ref()
            .is_some_and(|ttl| ttl.duration.is_zero());
        if ttl_is_zero {
            return Err(ConfigError::InvalidTtlDuration);
        }
        self.set_ttl_config_field(ttl_config);
        Ok(())
    }

    /// Applies the pending-flow settings. Implementors define what `None` does and which
    /// errors are returned.
    fn set_pending_flows_config(
        &mut self,
        config: Option<PendingFlowsConfig>,
    ) -> Result<(), ConfigError>;

    /// Returns `true` when a pending-flow cache is present.
    fn pending_flows_enabled(&self) -> bool {
        self.pending_flows().is_some()
    }

    /// Returns the pending-flow cache's `count()`, or 0 when there is no cache.
    fn pending_flow_count(&self) -> usize {
        match self.pending_flows() {
            Some(cache) => cache.count(),
            None => 0,
        }
    }

    /// Clears the pending-flow cache (if any) and records the entries it held as dropped
    /// in the metrics.
    fn clear_pending_flows(&mut self) {
        // Take the count before clearing; it is what gets reported to the metrics.
        let dropped = self.pending_flow_count();
        if let Some(cache) = self.pending_flows_mut() {
            cache.clear();
        }
        if dropped > 0 {
            // `usize` is at most 64 bits wide on supported targets, so the cast is lossless.
            self.metrics_mut().record_pending_dropped_n(dropped as u64);
        }
    }
}