pub mod strategies;
pub mod types;
use types::ThrottleStateView;
pub use types::{AdaptiveThrottleConfig, Direction};
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
use std::sync::Arc;
use atomic_float::AtomicF64;
use rand::random;
use crate::throttle::types::Priority;
#[derive(Debug)]
struct InternalSharedState {
current_balance: AtomicI32,
learned_balance: AtomicF64,
consecutive_ingress: AtomicU32,
consecutive_egress: AtomicU32,
ops_since_nudge: AtomicU32,
config: AdaptiveThrottleConfig,
}
#[derive(Debug, Clone)]
pub struct AdaptiveThrottle {
shared: Arc<InternalSharedState>,
}
impl AdaptiveThrottle {
pub fn new(config: AdaptiveThrottleConfig) -> Self {
let mut cfg = config.clone();
if cfg.adaptive_learning_rate < 0.01 {
cfg.adaptive_learning_rate = 0.01;
}
if cfg.adaptive_learning_rate > 0.2 {
cfg.adaptive_learning_rate = 0.2;
}
let state = InternalSharedState {
current_balance: AtomicI32::new(0),
learned_balance: AtomicF64::new(0.0),
consecutive_ingress: AtomicU32::new(0),
consecutive_egress: AtomicU32::new(0),
ops_since_nudge: AtomicU32::new(0),
config: cfg,
};
Self {
shared: Arc::new(state),
}
}
pub fn begin_work(&self, dir: Direction) -> ThrottleGuard {
let delta = self.shared.config.credit_per_message;
let new_balance = match dir {
Direction::Ingress => {
self
.shared
.current_balance
.fetch_add(delta, Ordering::Relaxed)
+ delta
}
Direction::Egress => {
self
.shared
.current_balance
.fetch_sub(delta, Ordering::Relaxed)
- delta
}
};
let since = self.shared.ops_since_nudge.fetch_add(1, Ordering::Relaxed) + 1;
if since >= self.shared.config.nudge_interval_ops {
let α = self.shared.config.adaptive_learning_rate;
let old_learned = self.shared.learned_balance.load(Ordering::Relaxed);
let updated = α * (new_balance as f64) + (1.0 - α) * old_learned;
self
.shared
.learned_balance
.store(updated, Ordering::Relaxed);
self.shared.ops_since_nudge.store(0, Ordering::Relaxed);
}
match dir {
Direction::Ingress => {
self
.shared
.consecutive_ingress
.fetch_add(1, Ordering::Relaxed);
self.shared.consecutive_egress.store(0, Ordering::Relaxed);
}
Direction::Egress => {
self
.shared
.consecutive_egress
.fetch_add(1, Ordering::Relaxed);
self.shared.consecutive_ingress.store(0, Ordering::Relaxed);
}
}
ThrottleGuard {
shared: self.shared.clone(),
direction: dir,
}
}
}
pub struct ThrottleGuard {
shared: Arc<InternalSharedState>,
direction: Direction,
}
impl ThrottleGuard {
pub fn get_current_balance(&self) -> i32 {
return self.shared.current_balance.load(Ordering::Relaxed);
}
pub fn should_throttle(&self) -> bool {
let cfg = &self.shared.config;
let cons = match self.direction {
Direction::Ingress => self.shared.consecutive_ingress.load(Ordering::Relaxed),
Direction::Egress => self.shared.consecutive_egress.load(Ordering::Relaxed),
};
if cons >= cfg.yield_after_n_consecutive {
self.shared.consecutive_ingress.store(0, Ordering::Relaxed);
self.shared.consecutive_egress.store(0, Ordering::Relaxed);
return true;
}
let state = ThrottleStateView {
current_balance: self.shared.current_balance.load(Ordering::Relaxed),
learned_balance: self.shared.learned_balance.load(Ordering::Relaxed),
config: cfg,
};
let mut p = (cfg.strategy)(&state);
let is_priority_work = match cfg.priority {
Priority::Egress => self.direction == Direction::Egress,
Priority::Ingress => self.direction == Direction::Ingress,
Priority::None => true, };
if !is_priority_work {
let is_imbalanced_towards_priority = match cfg.priority {
Priority::Egress => state.current_balance > state.learned_balance as i32,
Priority::Ingress => state.current_balance < state.learned_balance as i32,
Priority::None => false,
};
if is_imbalanced_towards_priority {
p *= cfg.priority_boost_factor; }
}
let dev = (state.current_balance as f64 - state.learned_balance).abs();
let hard_cut = (cfg.max_imbalance as f64) * 2.0;
if dev >= hard_cut {
p = 1.0;
}
if random::<f64>() < p {
self.shared.consecutive_ingress.store(0, Ordering::Relaxed);
self.shared.consecutive_egress.store(0, Ordering::Relaxed);
return true;
}
false
}
}