use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
const TARGET_UTILIZATION: f64 = 0.85;
const MIN_THREADS: usize = 1;
const MAX_THREADS: usize = 256;
const SCALE_UP_THRESHOLD: f64 = 0.70;
const SCALE_DOWN_THRESHOLD: f64 = 0.30;
const SCALE_COOLDOWN_MS: u64 = 100;
pub struct DynamicThreadPool {
min_threads: usize,
max_threads: usize,
target_utilization: f64,
current_threads: AtomicUsize,
active_workers: AtomicUsize,
total_requests: AtomicU64,
current_rps: AtomicU64,
last_scale_time: std::sync::Mutex<Instant>,
}
impl DynamicThreadPool {
pub fn new() -> Self {
Self {
min_threads: MIN_THREADS,
max_threads: MAX_THREADS,
target_utilization: TARGET_UTILIZATION,
current_threads: AtomicUsize::new(MIN_THREADS),
active_workers: AtomicUsize::new(0),
total_requests: AtomicU64::new(0),
current_rps: AtomicU64::new(0),
last_scale_time: std::sync::Mutex::new(Instant::now()),
}
}
pub fn with_limits(min: usize, max: usize) -> Self {
Self {
min_threads: min.max(1),
max_threads: max.max(min).min(MAX_THREADS),
target_utilization: TARGET_UTILIZATION,
current_threads: AtomicUsize::new(min),
active_workers: AtomicUsize::new(0),
total_requests: AtomicU64::new(0),
current_rps: AtomicU64::new(0),
last_scale_time: std::sync::Mutex::new(Instant::now()),
}
}
pub fn current_threads(&self) -> usize {
self.current_threads.load(Ordering::Relaxed)
}
pub fn set_thread_count(&self, count: usize) {
self.current_threads.store(count, Ordering::Relaxed);
}
pub fn min_threads(&self) -> usize {
self.min_threads
}
pub fn max_threads(&self) -> usize {
self.max_threads
}
pub fn target_utilization(&self) -> f64 {
self.target_utilization
}
pub fn active_workers(&self) -> usize {
self.active_workers.load(Ordering::Relaxed)
}
pub fn total_requests(&self) -> u64 {
self.total_requests.load(Ordering::Relaxed)
}
pub fn current_rps(&self) -> u64 {
self.current_rps.load(Ordering::Relaxed)
}
pub fn utilization(&self) -> f64 {
let workers = self.active_workers.load(Ordering::Relaxed);
let threads = self.current_threads.load(Ordering::Relaxed);
if threads == 0 {
return 0.0;
}
workers as f64 / threads as f64
}
pub fn record_request(&self) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
self.active_workers.fetch_add(1, Ordering::Relaxed);
}
pub fn release_request(&self) {
self.active_workers.fetch_sub(1, Ordering::Relaxed);
}
pub fn should_scale(&self) -> Option<bool> {
let utilization = self.utilization();
let now = Instant::now();
let mut last_scale = self.last_scale_time.lock().unwrap();
if now.duration_since(*last_scale) < Duration::from_millis(SCALE_COOLDOWN_MS) {
return None;
}
if utilization > SCALE_UP_THRESHOLD {
let current = self.current_threads.load(Ordering::Relaxed);
if current < self.max_threads {
*last_scale = now;
return Some(true);
}
}
if utilization < SCALE_DOWN_THRESHOLD {
let current = self.current_threads.load(Ordering::Relaxed);
if current > self.min_threads {
*last_scale = now;
return Some(false);
}
}
None
}
}
pub struct RpsTracker {
request_count: AtomicU64,
last_update: std::sync::Mutex<Instant>,
window_duration: Duration,
}
impl RpsTracker {
pub fn new(window_ms: u64) -> Self {
Self {
request_count: AtomicU64::new(0),
last_update: std::sync::Mutex::new(Instant::now()),
window_duration: Duration::from_millis(window_ms),
}
}
pub fn record(&self) {
self.request_count.fetch_add(1, Ordering::Relaxed);
}
pub fn get_rps(&self) -> u64 {
let now = Instant::now();
let mut last = self.last_update.lock().unwrap();
if now.duration_since(*last) >= self.window_duration {
let prev = self.request_count.swap(0, Ordering::Relaxed);
*last = now;
return prev * 1000 / self.window_duration.as_millis() as u64;
}
self.request_count.load(Ordering::Relaxed) * 1000 / self.window_duration.as_millis() as u64
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pool_creation() {
let pool = DynamicThreadPool::new();
assert_eq!(pool.current_threads(), MIN_THREADS);
assert_eq!(pool.utilization(), 0.0);
}
#[test]
fn test_pool_with_custom_limits() {
let pool = DynamicThreadPool::with_limits(2, 8);
assert_eq!(pool.current_threads(), 2);
}
#[test]
fn test_request_tracking() {
let pool = DynamicThreadPool::new();
pool.record_request();
assert_eq!(pool.active_workers(), 1);
assert_eq!(pool.total_requests(), 1);
pool.release_request();
assert_eq!(pool.active_workers(), 0);
}
#[test]
fn test_utilization_calculation() {
let pool = DynamicThreadPool::with_limits(4, 16);
assert_eq!(pool.utilization(), 0.0);
pool.record_request();
pool.record_request();
assert!((pool.utilization() - 0.5).abs() < 0.001);
}
}