use std::ops::Sub;
use time::OffsetDateTime;
use tracing::debug;
use super::config::BackPressureConfig;
/// Outcome of a back-pressure evaluation, as produced by
/// `BackPressureController::compute_backpressure`.
///
/// Every payload is a plain `u64`, so the type is totally comparable and
/// derives `Eq` alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackPressureTimeout {
    /// The computed timeout was `0`.
    NoTimeout,
    /// A non-zero timeout was computed; the payload is that timeout in milliseconds.
    Timeout(u64),
    /// The computed timeout reached the configured maximum while the suffix head
    /// had not been updated for longer than the configured stale limit; the
    /// payload is the configured maximum timeout in milliseconds.
    CriticalStop(u64),
}
/// A version number paired with the timestamp at which it was recorded.
#[derive(Debug, Default, Clone)]
pub struct BackPressureVersionTracker {
    /// Version captured by this tracker.
    pub version: u64,
    /// Timestamp, in nanoseconds, recorded together with `version`.
    pub time_ns: i128,
}

impl BackPressureVersionTracker {
    /// Creates a tracker holding the given version and timestamp.
    pub fn new(version: u64, time_ns: i128) -> Self {
        Self { version, time_ns }
    }

    /// Overwrites both the stored version and its timestamp.
    pub fn update(&mut self, version: u64, time_ns: i128) {
        *self = Self::new(version, time_ns);
    }

    /// Puts the tracker back into its zeroed state (`version == 0`, `time_ns == 0`).
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
/// Holds the back-pressure configuration together with the version trackers
/// that the timeout computations read.
///
/// A tracker whose `version` is `0` is treated as "not yet recorded" by the
/// update methods, which is also the state left behind by
/// `reset_version_trackers`.
#[derive(Debug, Default)]
pub struct BackPressureController {
/// Thresholds and timeout bounds used by the score and timeout computations.
pub config: BackPressureConfig,
/// First suffix-head tracker recorded while this slot was unset (`version == 0`).
pub first_suffix_head: BackPressureVersionTracker,
/// Most recently recorded suffix-head tracker.
pub last_suffix_head: BackPressureVersionTracker,
/// First candidate-received tracker recorded while this slot was unset (`version == 0`).
pub first_candidate_received: BackPressureVersionTracker,
/// Most recently recorded candidate-received tracker.
pub last_candidate_received: BackPressureVersionTracker,
/// `time_ns` of the most recent suffix-head update. Unlike the trackers above,
/// this value is not cleared by `reset_version_trackers`.
last_head_updated_ns: i128,
}
impl BackPressureController {
pub fn with_config(config: BackPressureConfig) -> Self {
Self { config, ..Default::default() }
}
/// Resets all four version trackers to their zeroed state.
///
/// `last_head_updated_ns` is deliberately left untouched by this method.
pub fn reset_version_trackers(&mut self) {
    let trackers = [
        &mut self.first_suffix_head,
        &mut self.last_suffix_head,
        &mut self.first_candidate_received,
        &mut self.last_candidate_received,
    ];
    for tracker in trackers {
        tracker.reset();
    }
}
/// Returns how full the suffix is as a fraction of `config.suffix_max_size`,
/// limited to the range `[0.0, 1.0]`.
///
/// When both `current_suffix_size` and `suffix_max_size` are zero the division
/// is `0.0 / 0.0`, i.e. NaN, and `f64::clamp` passes NaN through unchanged.
/// That case is reported as `0.0` so callers never receive NaN from here.
fn get_suffix_fill_ratio(&self, current_suffix_size: u64) -> f64 {
    let suffix_fill_ratio = current_suffix_size as f64 / self.config.suffix_max_size as f64;
    if suffix_fill_ratio.is_nan() {
        return 0.0;
    }
    suffix_fill_ratio.clamp(0.0, 1.0)
}
/// Records a new suffix-head observation.
///
/// The first-head tracker is only filled while it is still unset
/// (`version == 0`); the last-head tracker and `last_head_updated_ns` are
/// always overwritten with the incoming values.
pub fn update_suffix_head_trackers(&mut self, tracker: BackPressureVersionTracker) {
    let first_is_unset = self.first_suffix_head.version == 0;
    if first_is_unset {
        self.first_suffix_head
            .update(tracker.version, tracker.time_ns);
    }

    self.last_head_updated_ns = tracker.time_ns;
    self.last_suffix_head = tracker;
}
/// Records a newly received candidate.
///
/// The first-candidate tracker is only filled while it is still unset
/// (`version == 0`); the last-candidate tracker is always overwritten.
pub fn update_candidate_received_tracker(&mut self, tracker: BackPressureVersionTracker) {
    if self.first_candidate_received.version == 0 {
        self.first_candidate_received.clone_from(&tracker);
    }

    self.last_candidate_received = tracker;
}
/// Converts a suffix fill ratio into a score that is `0.0` at or below
/// `config.suffix_fill_threshold` and rises linearly to `1.0` at a ratio of `1.0`.
///
/// If the threshold is `1.0` or greater there is no range left to interpolate
/// over (the divisor `1.0 - threshold` is zero or negative, which previously
/// produced NaN or a negative score). In that case the score degenerates to a
/// step: `1.0` once the ratio reaches the threshold, `0.0` otherwise.
pub(crate) fn compute_suffix_fill_score(&self, suffix_fill_ratio: f64) -> f64 {
    let suffix_fill_weight_threshold = self.config.suffix_fill_threshold;
    // Width of the band between the threshold and a completely full suffix.
    let headroom = 1.0 - suffix_fill_weight_threshold;

    let suffix_fill_score = if headroom > 0.0 {
        let score_to_threshold = suffix_fill_ratio.sub(suffix_fill_weight_threshold).max(0.0);
        score_to_threshold / headroom
    } else if suffix_fill_ratio >= suffix_fill_weight_threshold {
        1.0
    } else {
        0.0
    };

    debug!("Backpressure fill score compute - suffix_fill_ratio = {suffix_fill_ratio} | suffix_fill_weight_threshold = {suffix_fill_weight_threshold} | suffix_fill_score = {suffix_fill_score}");
    suffix_fill_score
}
pub(crate) fn compute_rate_score(&self) -> f64 {
let tps_threshold = self.config.rate_delta_threshold;
let input_time_sec = (self.last_candidate_received.time_ns - self.first_candidate_received.time_ns) as f64 / 1_000_000_000_f64;
let input_rate = (self.last_candidate_received.version - self.first_candidate_received.version) as f64 / input_time_sec;
let output_time_sec = (self.last_suffix_head.time_ns - self.first_suffix_head.time_ns) as f64 / 1_000_000_000_f64;
let output_rate = (self.last_suffix_head.version - self.first_suffix_head.version) as f64 / output_time_sec;
let delta_rate = input_rate.sub(output_rate);
let rate_score = match tps_threshold {
Some(threshold) if delta_rate > threshold => {
let log_base = threshold.log10();
let scaled = delta_rate.log10() - log_base;
scaled.clamp(0.0, 1.0)
}
_ => 0.0,
};
debug!(
"Backpressure rate score compute - Input rate = {input_rate} | Output rate = {output_rate} | delta_rate = {delta_rate} | rate_score = {rate_score}"
);
rate_score
}
/// Computes the back-pressure timeout, in milliseconds, for the given suffix size.
///
/// The suffix fill score is always evaluated; the rate score is only evaluated
/// once the fill ratio reaches `config.suffix_rate_threshold`. If neither score
/// is positive the result is `0`. Otherwise the fill score is raised to the
/// power `1.5`, the rate score contributes in proportion to whatever weight the
/// fill term leaves unused, and the combined score interpolates between
/// `config.min_timeout_ms` and `config.max_timeout_ms`.
///
/// A NaN score is treated as `0.0`. Left alone, NaN fails the `<= 0.0` test, so
/// it would skip the early return and end up as `min_timeout_ms` instead of `0`.
///
/// # Panics
///
/// Panics if `config.min_timeout_ms` is greater than `config.max_timeout_ms`
/// (integer underflow in debug builds, and the `clamp` precondition otherwise).
pub fn calculate_timeout(&self, current_suffix_size: u64) -> u64 {
    /// Maps NaN to `0.0` and leaves every other value untouched.
    fn nan_to_zero(score: f64) -> f64 {
        if score.is_nan() {
            0.0
        } else {
            score
        }
    }

    let suffix_fill_ratio = self.get_suffix_fill_ratio(current_suffix_size);
    let suffix_fill_score = nan_to_zero(self.compute_suffix_fill_score(suffix_fill_ratio));
    let rate_score = if suffix_fill_ratio >= self.config.suffix_rate_threshold {
        nan_to_zero(self.compute_rate_score())
    } else {
        0.0
    };

    if suffix_fill_score <= 0.0 && rate_score <= 0.0 {
        return 0;
    }

    let min_timeout_ms = self.config.min_timeout_ms;
    let max_timeout_ms = self.config.max_timeout_ms;

    // The fuller the suffix, the more the fill term dominates; the rate term
    // only gets the weight the fill term leaves over.
    let suffix_fill_weighted = suffix_fill_score.powf(1.5);
    let rate_weighted = 1.0 - suffix_fill_weighted;
    let combined_score = suffix_fill_weighted + (rate_weighted * rate_score);

    let timeout_ms =
        min_timeout_ms as f64 + ((max_timeout_ms - min_timeout_ms) as f64 * combined_score);
    let timeout_ms = (timeout_ms.round() as u64).clamp(min_timeout_ms, max_timeout_ms);

    if timeout_ms > 0 {
        debug!("Backpressure timeout calculation - suffix_fill_weighted = {suffix_fill_weighted} | rate_weighted = {rate_weighted} | rate_score = {rate_score} | timeout_ms = {timeout_ms}");
    }
    timeout_ms
}
/// Scales `timeout_ms` by `config.timeout_stepdown_rate`, rounds to the nearest
/// millisecond, and never returns less than `config.min_timeout_ms`.
pub fn compute_stepdown_timeout(&self, timeout_ms: u64) -> u64 {
    let scaled = timeout_ms as f64 * self.config.timeout_stepdown_rate;
    let stepped_down_ms = scaled.round() as u64;
    u64::max(stepped_down_ms, self.config.min_timeout_ms)
}
/// Evaluates back pressure for the current suffix length and resets the
/// version trackers for the next evaluation window.
///
/// Returns:
/// - `NoTimeout` when the calculated timeout is `0`;
/// - `CriticalStop(max_timeout_ms)` when the timeout has reached
///   `config.max_timeout_ms` and the suffix head was last updated more than
///   `config.max_head_stale_timeout_ms` milliseconds ago;
/// - `Timeout(timeout_ms)` otherwise.
pub fn compute_backpressure(&mut self, current_suffix_len: u64) -> BackPressureTimeout {
    let timeout_ms = self.calculate_timeout(current_suffix_len);
    self.reset_version_trackers();

    if timeout_ms == 0 {
        return BackPressureTimeout::NoTimeout;
    }

    if timeout_ms == self.config.max_timeout_ms {
        // NOTE(review): this assumes tracker timestamps are Unix-epoch nanoseconds,
        // since they are compared against `unix_timestamp_nanos()` — confirm with callers.
        // NOTE(review): if no head update has ever been recorded,
        // `last_head_updated_ns` is still 0 and the stale time equals the time
        // since the Unix epoch — confirm that an immediate critical stop is intended.
        let now_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
        let stale_head_time_ms = (now_ns - self.last_head_updated_ns) / 1_000_000;

        // Compare as `i128`: casting a negative stale time (head timestamp ahead
        // of the local clock) to `u64` would wrap to a huge value and raise a
        // spurious critical stop.
        if stale_head_time_ms > i128::from(self.config.max_head_stale_timeout_ms) {
            return BackPressureTimeout::CriticalStop(self.config.max_timeout_ms);
        }
    }

    BackPressureTimeout::Timeout(timeout_ms)
}
}