use crate::{load_balancer, placement};
use core::fmt;
use crossbeam_queue::ArrayQueue;
use fmt::{Debug, Formatter};
use lazy_static::lazy_static;
use lever::prelude::TTas;
use placement::CoreId;
use std::collections::VecDeque;
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
},
thread::{self, Thread},
};
use tracing::{debug, trace};
/// Park interval handed to `DynamicRunner::run_static` for always-on workers.
const THREAD_PARK_TIMEOUT: Duration = Duration::from_millis(1);
/// Number of frequency samples kept in the EMA window (`scale_pool` evicts beyond this).
const FREQUENCY_QUEUE_SIZE: usize = 10;
/// Baseline number of threads provisioned when load is steady but non-zero.
const DEFAULT_LOW_WATERMARK: u64 = 2;
/// Scaler wake-up period, in milliseconds (see `Duration::from_millis` in `initialize`).
const SCALER_POLL_INTERVAL: u64 = 90;
/// Standard EMA smoothing factor: 2 / (N + 1) for a window of N samples.
const EMA_COEFFICIENT: f64 = 2_f64 / (FREQUENCY_QUEUE_SIZE as f64 + 1_f64);
lazy_static! {
// Process-wide round-robin cursor over CPU cores; `affinity_pinner` reads
// and advances it so successive worker threads land on successive cores.
static ref ROUND_ROBIN_PIN: Mutex<CoreId> = Mutex::new(CoreId { id: 0 });
}
/// Contract the executor implements to drive the three worker-thread flavors
/// spawned by `DynamicPoolManager`.
pub trait DynamicRunner {
/// Run loop for an always-on worker. Never returns. `park_timeout` is
/// presumably the idle-park interval — the loop body is defined elsewhere.
fn run_static(&self, park_timeout: Duration) -> !;
/// Run loop for a scalable worker. Never returns; calls `parker` to park
/// itself (the manager passes a closure over `park_thread`).
fn run_dynamic(&self, parker: &dyn Fn()) -> !;
/// One-shot run for an overflow ("standalone") worker; returns when done.
fn run_standalone(&self);
}
/// Manages a pool of worker threads, scaling the dynamic portion up and down
/// based on an exponential moving average of task-arrival frequency.
pub struct DynamicPoolManager {
// Count of always-running worker threads.
static_threads: usize,
// Count of park/unpark-managed worker threads (CPUs minus static, min 1).
dynamic_threads: usize,
// Lock-free queue holding the handles of currently parked dynamic threads.
parked_threads: ArrayQueue<Thread>,
// Executor callbacks that drive each thread flavor.
runner: Arc<dyn DynamicRunner + Send + Sync>,
// Task-arrival counter accumulated since the last scaler tick.
last_frequency: AtomicU64,
// Recent frequency samples, guarded by a test-and-test-and-set lock.
frequencies: TTas<VecDeque<u64>>,
}
impl Debug for DynamicPoolManager {
    /// Formats a point-in-time snapshot of the pool's state.
    ///
    /// `frequencies` is rendered through `try_lock` so formatting never
    /// blocks on the scaler; it shows `None` while the lock is held.
    fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
        fmt.debug_struct("DynamicPoolManager")
            .field("static_threads", &self.static_threads)
            .field("dynamic_threads", &self.dynamic_threads)
            // Fix: the original emitted this field twice.
            .field("parked_threads", &self.parked_threads.len())
            .field("last_frequency", &self.last_frequency)
            .field("frequencies", &self.frequencies.try_lock())
            .finish()
    }
}
impl DynamicPoolManager {
/// Creates the manager. The dynamic pool size is the CPU count left over
/// after the static threads, clamped to at least one.
pub fn new(static_threads: usize, runner: Arc<dyn DynamicRunner + Send + Sync>) -> Self {
// checked_sub yields None (then 0) when static_threads >= CPU count;
// max(1) guarantees at least one dynamic thread.
let dynamic_threads = 1.max(num_cpus::get().checked_sub(static_threads).unwrap_or(0));
Self {
static_threads,
dynamic_threads,
// Sized so every dynamic thread can be parked at once.
parked_threads: ArrayQueue::new(dynamic_threads),
runner,
last_frequency: AtomicU64::new(0),
// One extra slot: scale_pool pushes the new sample before evicting the oldest.
frequencies: TTas::new(VecDeque::with_capacity(
FREQUENCY_QUEUE_SIZE.saturating_add(1),
)),
}
}
/// Records one unit of incoming work; `scale_pool` drains this with `swap(0)`.
// NOTE(review): `Acquire` ordering on a fetch_add is unusual for a plain
// statistics counter — `Relaxed` would normally suffice; confirm intent
// before relying on any synchronization through this counter.
pub fn increment_frequency(&self) {
self.last_frequency.fetch_add(1, Ordering::Acquire);
}
/// Spawns the static workers, the dynamic workers, and the scaler thread.
/// Requires `&'static self` because the spawned closures capture `self`.
pub fn initialize(&'static self) {
trace!("setting up the static thread manager");
// Static workers: pinned to a core, then run forever (run_static returns `!`).
(0..self.static_threads).for_each(|_| {
let clone = Arc::clone(&self.runner);
thread::Builder::new()
.name("bastion-driver-static".to_string())
.spawn(move || {
Self::affinity_pinner();
clone.run_static(THREAD_PARK_TIMEOUT);
})
.expect("couldn't spawn static thread");
});
trace!("setting up the dynamic thread manager");
// Dynamic workers: the runner parks them via `park_thread` when idle.
(0..self.dynamic_threads).for_each(|_| {
let clone = Arc::clone(&self.runner);
thread::Builder::new()
.name("bastion-driver-dynamic".to_string())
.spawn(move || {
Self::affinity_pinner();
clone.run_dynamic(&|| self.park_thread());
})
.expect("cannot start dynamic thread");
});
// Scaler: wakes every SCALER_POLL_INTERVAL ms and re-evaluates the pool.
thread::Builder::new()
.name("bastion-pool-manager".to_string())
.spawn(move || {
let poll_interval = Duration::from_millis(SCALER_POLL_INTERVAL);
trace!("setting up the pool manager");
loop {
self.scale_pool();
thread::park_timeout(poll_interval);
}
})
.expect("thread pool manager cannot be started");
}
/// Makes `n` workers runnable: unparks parked dynamic threads first and
/// spawns standalone threads for whatever cannot be unparked.
pub fn provision_threads(&'static self, n: usize) {
for i in 0..n {
if !self.unpark_thread() {
// i threads were unparked so far; spawn the remainder fresh.
let new_threads = n - i;
trace!(
"no more threads to unpark, spawning {} new threads",
new_threads
);
return self.spawn_threads(new_threads);
}
}
}
/// Spawns `n` one-shot ("standalone") overflow worker threads.
fn spawn_threads(&'static self, n: usize) {
(0..n).for_each(|_| {
let clone = Arc::clone(&self.runner);
thread::Builder::new()
.name("bastion-blocking-driver-standalone".to_string())
.spawn(move || {
Self::affinity_pinner();
clone.run_standalone();
})
.unwrap();
})
}
/// Registers the current thread in the parked queue, then parks it.
/// If the queue is full the push fails and the thread does not park
/// (logged at debug level).
// NOTE(review): `thread::park` can also return spuriously, so callers must
// tolerate waking without a matching `unpark_thread` call.
pub fn park_thread(&self) {
let _ = self
.parked_threads
.push(std::thread::current())
.map(|_| {
trace!("parking thread {:?}", std::thread::current().id());
std::thread::park();
})
.map_err(|t| {
debug!("couldn't park thread {:?}", t.id(),);
});
}
/// Unparks one parked thread, if any; returns whether one was woken.
fn unpark_thread(&self) -> bool {
trace!("parked_threads: len is {}", self.parked_threads.len());
if let Some(thread) = self.parked_threads.pop() {
debug!("Executor: unpark_thread: unparking {:?}", thread.id());
thread.unpark();
true
} else {
false
}
}
/// Pins the calling thread to the next core in round-robin order.
/// No-op on single-core machines.
#[inline]
fn affinity_pinner() {
if 1 != *load_balancer::core_count() {
let mut core = ROUND_ROBIN_PIN.lock().unwrap();
placement::set_for_current(*core);
core.id = (core.id + 1) % *load_balancer::core_count();
}
}
/// Exponentially weighted average over the sampled frequencies.
// NOTE(review): samples are pushed to the back of the deque, so index 0 is
// the *oldest* sample yet receives the largest weight (1-α)^0. A
// conventional EMA weights the newest sample most — confirm the reversed
// weighting is intentional before changing it.
#[inline]
fn calculate_ema(freq_queue: &VecDeque<u64>) -> f64 {
freq_queue.iter().enumerate().fold(0_f64, |acc, (i, freq)| {
acc + ((*freq as f64) * ((1_f64 - EMA_COEFFICIENT).powf(i as f64) as f64))
}) * EMA_COEFFICIENT as f64
}
/// One scaler tick: drains the arrival counter, updates the EMA window,
/// and provisions threads when load is rising, or steady but non-zero.
fn scale_pool(&'static self) {
// Atomically take and reset the count accumulated since the last tick.
let current_frequency = self.last_frequency.swap(0, Ordering::SeqCst);
let mut freq_queue = self.frequencies.lock();
// Seed the window so the first EMA is computed over at least one sample.
if freq_queue.len() == 0 {
freq_queue.push_back(0);
}
// Normalize the raw count to events per millisecond of the poll interval.
let frequency = (current_frequency as f64 / SCALER_POLL_INTERVAL as f64) as u64;
let prev_ema_frequency = Self::calculate_ema(&freq_queue);
freq_queue.push_back(frequency);
// Keep the window bounded at FREQUENCY_QUEUE_SIZE samples.
if freq_queue.len() == FREQUENCY_QUEUE_SIZE.saturating_add(1) {
freq_queue.pop_front();
}
let curr_ema_frequency = Self::calculate_ema(&freq_queue);
if curr_ema_frequency > prev_ema_frequency {
// Rising load: scale proportionally to the EMA delta, capped at CPU count.
let scale_by: f64 = curr_ema_frequency - prev_ema_frequency;
let scale = num_cpus::get().min(
((DEFAULT_LOW_WATERMARK as f64 * scale_by) + DEFAULT_LOW_WATERMARK as f64) as usize,
);
trace!("unparking {} threads", scale);
self.provision_threads(scale);
} else if (curr_ema_frequency - prev_ema_frequency).abs() < std::f64::EPSILON
&& current_frequency != 0
{
// Flat but non-zero load: keep the low-watermark number of threads live.
trace!("unparking {} threads", DEFAULT_LOW_WATERMARK);
self.provision_threads(DEFAULT_LOW_WATERMARK as usize);
}
}
}