use tracing::debug;
use crate::env_var_with_defaults;
#[derive(Debug, Clone)]
pub struct BackPressureConfig {
pub check_window_ms: u64,
pub max_timeout_ms: u64,
pub min_timeout_ms: u64,
pub suffix_max_size: u64,
pub suffix_fill_threshold: f64,
pub suffix_rate_threshold: f64,
pub rate_delta_threshold: Option<f64>,
pub timeout_stepdown_rate: f64,
pub max_head_stale_timeout_ms: u64,
}
impl Default for BackPressureConfig {
fn default() -> Self {
Self {
check_window_ms: 30, max_timeout_ms: 100, min_timeout_ms: 10, suffix_max_size: 10_000,
suffix_fill_threshold: 0.7, suffix_rate_threshold: 0.5, rate_delta_threshold: Some(100.0),
timeout_stepdown_rate: 0.8,
max_head_stale_timeout_ms: 200,
}
}
}
impl BackPressureConfig {
pub fn from_env() -> Self {
let defaults = BackPressureConfig::default();
let max_timeout_ms = env_var_with_defaults!("BACKPRESSURE_MAX_TIMEOUT_MS", u64, defaults.max_timeout_ms);
let config = Self {
check_window_ms: env_var_with_defaults!("BACKPRESSURE_CHECK_WINDOW_MS", u64, defaults.check_window_ms),
max_timeout_ms,
min_timeout_ms: env_var_with_defaults!("BACKPRESSURE_MIN_TIMEOUT_MS", u64, defaults.min_timeout_ms).min(max_timeout_ms),
suffix_max_size: env_var_with_defaults!("BACKPRESSURE_SUFFIX_MAX_SIZE", u64, defaults.suffix_max_size),
suffix_fill_threshold: env_var_with_defaults!("BACKPRESSURE_SUFFIX_FILL_THRESHOLD", f64, defaults.suffix_fill_threshold).clamp(0.0, 1.0),
suffix_rate_threshold: env_var_with_defaults!("BACKPRESSURE_SUFFIX_RATE_THRESHOLD", f64, defaults.suffix_rate_threshold).clamp(0.0, 1.0),
rate_delta_threshold: env_var_with_defaults!("BACKPRESSURE_RATE_DELTA_THRESHOLD", Option::<f64>, defaults.rate_delta_threshold.unwrap()),
max_head_stale_timeout_ms: env_var_with_defaults!("BACKPRESSURE_MAX_HEAD_STALE_TIME_MS", u64, defaults.max_head_stale_timeout_ms),
timeout_stepdown_rate: env_var_with_defaults!("BACKPRESSURE_TIMEOUT_STEPDOWN_RATE", f64, defaults.timeout_stepdown_rate).clamp(0.0, 1.0),
};
debug!("Backpressure config {config:#?}");
config
}
pub fn builder() -> BackPressureConfigBuilder {
BackPressureConfigBuilder::default()
}
}
#[derive(Default)]
pub struct BackPressureConfigBuilder {
check_window_ms: Option<u64>,
max_timeout_ms: Option<u64>,
min_timeout_ms: Option<u64>,
suffix_max_size: Option<u64>,
suffix_fill_threshold: Option<f64>,
suffix_rate_threshold: Option<f64>,
rate_delta_threshold: Option<f64>,
max_head_stale_timeout_ms: Option<u64>,
}
impl BackPressureConfigBuilder {
pub fn check_window_ms(mut self, window_ms: u64) -> Self {
self.check_window_ms = Some(window_ms);
self
}
pub fn max_timeout_ms(mut self, timeout: u64) -> Self {
self.max_timeout_ms = Some(timeout);
self
}
pub fn min_timeout_ms(mut self, timeout: u64) -> Self {
self.min_timeout_ms = Some(timeout);
self
}
pub fn suffix_max_size(mut self, size: u64) -> Self {
self.suffix_max_size = Some(size);
self
}
pub fn suffix_fill_threshold(mut self, threshold: f64) -> Self {
self.suffix_fill_threshold = Some(threshold);
self
}
pub fn suffix_rate_threshold(mut self, threshold: f64) -> Self {
self.suffix_rate_threshold = Some(threshold);
self
}
pub fn rate_delta_threshold(mut self, threshold: f64) -> Self {
self.rate_delta_threshold = Some(threshold);
self
}
pub fn max_head_stale_timeout_ms(mut self, timeout: u64) -> Self {
self.max_head_stale_timeout_ms = Some(timeout);
self
}
pub fn build(self) -> BackPressureConfig {
let defaults = BackPressureConfig::default();
let suffix_fill_threshold = self.suffix_fill_threshold.unwrap_or(defaults.suffix_fill_threshold).clamp(0.0, 1.0);
let suffix_rate_threshold = self
.suffix_rate_threshold
.unwrap_or(defaults.suffix_rate_threshold)
.clamp(0.0, suffix_fill_threshold);
let max_timeout_ms = self.max_timeout_ms.unwrap_or(defaults.max_timeout_ms);
let min_timeout_ms = self.min_timeout_ms.unwrap_or(defaults.min_timeout_ms).min(max_timeout_ms);
BackPressureConfig {
max_timeout_ms,
min_timeout_ms,
suffix_fill_threshold,
suffix_rate_threshold,
check_window_ms: self.check_window_ms.unwrap_or(defaults.check_window_ms),
suffix_max_size: self.suffix_max_size.unwrap_or(defaults.suffix_max_size),
rate_delta_threshold: self.rate_delta_threshold.or(defaults.rate_delta_threshold),
max_head_stale_timeout_ms: self.max_head_stale_timeout_ms.unwrap_or(defaults.max_head_stale_timeout_ms),
timeout_stepdown_rate: defaults.timeout_stepdown_rate,
}
}
}