use crate::dispatch::lookup::*;
use arc_swap::ArcSwap;
mod defaults {
use super::FeedbackAlgorithm;
pub const INITIAL_INTERVAL_MS: u64 = 500;
pub const MIN_INTERVAL_MS: u64 = 20;
pub const MAX_INTERVAL_MS: u64 = 5_000;
pub const ADDITIVE_INCREASE: u64 = 500;
pub const MULTIPLICATIVE_DECREASE_DEN: u64 = 2;
pub const ALGORITHM: FeedbackAlgorithm = super::FeedbackAlgorithm::Aimd;
}
/// Configuration keys read by `ActorRefReaper::from_config`.
pub mod config_keys {
/// Key selecting the feedback algorithm. `from_config` accepts the string
/// values `"AIMD"` and `"MIMD"`, falls back to the built-in default when the
/// key yields no string, and panics on any other string.
pub const ALGORITHM: &str = "kompact.dispatch.actor-lookup.gc.algorithm";
}
/// Cleans up a shared `ActorStore` on request (see `run`) and carries the
/// [`UpdateStrategy`] whose value callers can raise or lower between runs.
///
/// NOTE(review): the strategy value is presumably the delay between cleanup
/// runs in milliseconds (the defaults are named `*_INTERVAL_MS`) — the
/// scheduling itself is not part of this type; confirm against the caller.
pub struct ActorRefReaper {
/// `false` on construction; set to `true` by `schedule()` and reported by
/// `is_scheduled()`.
scheduled: bool,
/// Bounded feedback value exposed through `strategy()` / `strategy_mut()`.
strategy: UpdateStrategy,
}
/// A single `u64` value that is stepped up by `incr()` and down by `decr()`
/// according to a [`FeedbackAlgorithm`], bounded above by `max` on increase
/// and below by `min` on decrease.
pub struct UpdateStrategy {
/// Current value; returned by `curr()` and by every step.
curr: u64,
/// Increase operand: added to `curr` under `Aimd`, multiplied with it under `Mimd`.
incr: u64,
/// Decrease divisor: `curr` is divided by this value under both algorithms.
decr: u64,
/// Floor applied after each decrease.
min: u64,
/// Ceiling applied after each increase.
max: u64,
/// Rule that decides how an increase step is computed.
algo: FeedbackAlgorithm,
}
/// Feedback rule used by `UpdateStrategy` to step its value up and down.
///
/// The enum is a plain tag, so it derives the common value-type traits: it can
/// be printed, copied, and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeedbackAlgorithm {
    /// Additive increase, multiplicative decrease: an increase adds the step
    /// operand, a decrease divides by the divisor.
    Aimd,
    /// Multiplicative increase, multiplicative decrease: an increase multiplies
    /// by the step operand, a decrease divides by the divisor.
    Mimd,
}
impl Default for ActorRefReaper {
fn default() -> Self {
Self::new(
defaults::INITIAL_INTERVAL_MS,
defaults::MIN_INTERVAL_MS,
defaults::MAX_INTERVAL_MS,
defaults::ADDITIVE_INCREASE,
defaults::MULTIPLICATIVE_DECREASE_DEN,
defaults::ALGORITHM,
)
}
}
impl ActorRefReaper {
pub fn new(
initial_interval: u64,
min: u64,
max: u64,
incr: u64,
decr: u64,
algo: FeedbackAlgorithm,
) -> Self {
ActorRefReaper {
scheduled: false,
strategy: UpdateStrategy::new(initial_interval, min, max, incr, decr, algo),
}
}
pub fn from_config(conf: &hocon::Hocon) -> Self {
let algorithm = match conf[config_keys::ALGORITHM].as_string().as_deref() {
Some("AIMD") => FeedbackAlgorithm::Aimd,
Some("MIMD") => FeedbackAlgorithm::Mimd,
Some(s) => panic!(
"Illegal configuration argument for key {}: {}. Allowed values are [AIMD, MIMD]",
config_keys::ALGORITHM,
s
),
None => defaults::ALGORITHM,
};
Self::new(
defaults::INITIAL_INTERVAL_MS,
defaults::MIN_INTERVAL_MS,
defaults::MAX_INTERVAL_MS,
defaults::ADDITIVE_INCREASE,
defaults::MULTIPLICATIVE_DECREASE_DEN,
algorithm,
)
}
pub fn run(&self, table: &ArcSwap<ActorStore>) -> usize {
let mut removed = 0;
table.rcu(|current| {
let mut next = ActorStore::clone(current);
removed = next.cleanup();
next
});
removed
}
pub fn strategy(&self) -> &UpdateStrategy {
&self.strategy
}
pub fn strategy_mut(&mut self) -> &mut UpdateStrategy {
&mut self.strategy
}
pub fn schedule(&mut self) {
self.scheduled = true;
}
pub fn is_scheduled(&self) -> bool {
self.scheduled
}
}
impl UpdateStrategy {
pub fn new(
curr: u64,
min: u64,
max: u64,
incr: u64,
decr: u64,
algo: FeedbackAlgorithm,
) -> Self {
UpdateStrategy {
curr,
incr,
decr,
min,
max,
algo,
}
}
pub fn curr(&self) -> u64 {
self.curr
}
pub fn incr(&mut self) -> u64 {
use self::FeedbackAlgorithm::*;
self.curr = match self.algo {
Aimd => self.curr + self.incr,
Mimd => self.curr * self.incr,
}
.min(self.max);
self.curr
}
pub fn decr(&mut self) -> u64 {
use self::FeedbackAlgorithm::*;
self.curr = match self.algo {
Aimd | Mimd => self.curr / self.decr,
}
.max(self.min);
self.curr
}
}