use super::concurrency_controller::ConcurrencyController;
use crate::sampling::tps_sampler::TpsData;
use std::collections::VecDeque;
#[allow(unused_imports)]
use tracing::{debug, error, info, instrument, trace, warn, Instrument};
const STARTING_TPS: f64 = 256.;
const SAMPLE_WINDOW: usize = 20;
pub(crate) struct ErrorRateController {
samples: VecDeque<TpsData>,
goal_tps: f64,
state: ErrorRateState,
error_rate: f64,
cc: ConcurrencyController,
}
impl ErrorRateController {
pub fn new(error_rate: f64) -> Self {
assert!(error_rate > 0.);
let cc = ConcurrencyController::new(STARTING_TPS);
Self {
samples: VecDeque::new(),
goal_tps: STARTING_TPS,
state: ErrorRateState::BigStep,
error_rate,
cc,
}
}
pub fn goal_tps(&self) -> f64 {
self.goal_tps
}
pub fn concurrency_count(&self) -> u64 {
self.cc.concurrent_count()
}
pub fn is_underpowered(&self) -> bool {
self.state == ErrorRateState::Underpowered
}
#[allow(unused)]
pub fn is_stable(&self) -> bool {
self.state == ErrorRateState::Stable
}
pub fn push(&mut self, sample: TpsData) {
self.cc.push(sample.tps());
if self.cc.is_stable() {
trace!("ConcurrencyController is stable");
self.samples.push_front(sample);
if self.samples.len() > SAMPLE_WINDOW {
let _ = self.samples.pop_back();
#[allow(clippy::collapsible_if)]
if self.analyze() {
if self.cc.set_goal_tps(self.goal_tps) {
self.clear();
}
}
}
} else if let Some(max_tps) = self.cc.is_underpowered() {
self.state = ErrorRateState::Underpowered;
self.goal_tps = max_tps;
self.cc.set_goal_tps(self.goal_tps);
} else {
trace!("ConcurrencyController is stabalizing.");
}
}
pub fn clear(&mut self) {
self.samples.clear();
}
fn analyze(&mut self) -> bool {
let mean_error_rate: f64 = self.samples.iter().map(|x| x.error_rate()).sum();
let mean_error_rate = mean_error_rate / self.samples.len() as f64;
let diff: f64 = self.error_rate - mean_error_rate;
if mean_error_rate == 0.0 {
if self.state != ErrorRateState::Underpowered {
debug!(
"Error rate of 0% with goal {}%; increasing TPS.",
self.error_rate * 100.
);
self.increase_tps();
true
} else {
trace!("Error rate of 0% but state Underpowered.");
false
}
} else if diff.abs() < 0.05 {
self.stabalize();
false
} else if diff.is_sign_positive() {
debug!(
"Error rate of {:.2}% with goal {}%; increasing TPS.",
mean_error_rate * 100.,
self.error_rate * 100.
);
self.increase_tps();
true
} else {
debug!(
"Error rate of {:.2}% with goal {}%; decreasing TPS.",
mean_error_rate * 100.,
self.error_rate * 100.
);
self.decrease_tps();
true
}
}
pub fn stabalize(&mut self) {
if self.state != ErrorRateState::Stable {
self.state = ErrorRateState::Stable;
debug!(
"Error rate controller is stable. Goal: {:.2}%, acheiving: {:.2}% at {:.2} TPS.",
self.error_rate * 100.,
self.samples[0].error_rate() * 100.,
self.goal_tps,
);
}
}
pub fn increase_tps(&mut self) {
match self.state {
ErrorRateState::Underpowered => {}
ErrorRateState::BigStep => {
self.goal_tps *= 2.;
debug!("New goal tps: {}", self.goal_tps);
}
ErrorRateState::SmallStep => {
self.goal_tps += (self.goal_tps * 0.1).floor();
debug!("New goal tps: {}", self.goal_tps);
}
ErrorRateState::Stable => {
self.state = ErrorRateState::SmallStep;
self.increase_tps();
}
}
}
pub fn decrease_tps(&mut self) {
match self.state {
ErrorRateState::Underpowered => {
self.state = ErrorRateState::SmallStep;
self.decrease_tps();
}
ErrorRateState::BigStep => {
self.goal_tps /= 2.;
self.state = ErrorRateState::SmallStep;
}
ErrorRateState::SmallStep => {
self.goal_tps -= (self.goal_tps * 0.1).floor();
}
ErrorRateState::Stable => {
self.state = ErrorRateState::SmallStep;
self.decrease_tps();
}
}
}
}
#[derive(PartialEq, Debug)]
enum ErrorRateState {
BigStep,
SmallStep,
Stable,
Underpowered,
}