//! vapor-http 0.1.2
//!
//! A dynamically scaling HTTP server that auto-tunes its thread count to
//! target roughly 85% CPU utilization. See the crate documentation for details.
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};

/// Fraction of threads the controller aims to keep busy.
const TARGET_UTILIZATION: f64 = 0.85;
/// Lower bound on pool size.
const MIN_THREADS: usize = 1;
/// Hard upper bound on pool size, regardless of caller-supplied limits.
const MAX_THREADS: usize = 256;
/// Utilization above which the pool asks to grow.
const SCALE_UP_THRESHOLD: f64 = 0.70;
/// Utilization below which the pool asks to shrink.
const SCALE_DOWN_THRESHOLD: f64 = 0.30;
/// Minimum time between two scaling decisions, in milliseconds.
const SCALE_COOLDOWN_MS: u64 = 100;

/// Bookkeeping for a thread pool that scales between `min_threads` and
/// `max_threads` based on observed worker utilization.
///
/// This type only *decides* when to scale (see [`DynamicThreadPool::should_scale`]);
/// actually spawning or parking worker threads is left to the caller, which
/// reports the outcome back via [`DynamicThreadPool::set_thread_count`].
pub struct DynamicThreadPool {
    /// Smallest allowed thread count (always >= 1).
    min_threads: usize,
    /// Largest allowed thread count (always <= `MAX_THREADS` and >= `min_threads`).
    max_threads: usize,
    /// Utilization the controller aims for.
    target_utilization: f64,
    /// Thread count the caller last reported via `set_thread_count`.
    current_threads: AtomicUsize,
    /// Number of requests currently in flight.
    active_workers: AtomicUsize,
    /// Total requests ever recorded.
    total_requests: AtomicU64,
    /// Most recently published requests-per-second figure.
    current_rps: AtomicU64,
    /// Timestamp of the last scaling decision; enforces the cooldown.
    last_scale_time: std::sync::Mutex<Instant>,
}

impl Default for DynamicThreadPool {
    fn default() -> Self {
        Self::new()
    }
}

impl DynamicThreadPool {
    /// Creates a pool with the default limits (`MIN_THREADS..=MAX_THREADS`).
    pub fn new() -> Self {
        Self::with_limits(MIN_THREADS, MAX_THREADS)
    }

    /// Creates a pool with caller-supplied limits.
    ///
    /// Limits are sanitized so that `1 <= min_threads <= max_threads <= MAX_THREADS`
    /// always holds: `min` is clamped to `1..=MAX_THREADS`, then `max` is clamped
    /// to `min..=MAX_THREADS`. The starting thread count equals the sanitized
    /// minimum (previously it was seeded with the raw `min`, so `with_limits(0, n)`
    /// started with zero threads).
    pub fn with_limits(min: usize, max: usize) -> Self {
        let min = min.clamp(1, MAX_THREADS);
        let max = max.clamp(min, MAX_THREADS);
        Self {
            min_threads: min,
            max_threads: max,
            target_utilization: TARGET_UTILIZATION,
            current_threads: AtomicUsize::new(min),
            active_workers: AtomicUsize::new(0),
            total_requests: AtomicU64::new(0),
            current_rps: AtomicU64::new(0),
            last_scale_time: std::sync::Mutex::new(Instant::now()),
        }
    }

    /// Thread count most recently reported by the caller.
    pub fn current_threads(&self) -> usize {
        self.current_threads.load(Ordering::Relaxed)
    }

    /// Records the thread count after the caller has applied a scaling decision.
    pub fn set_thread_count(&self, count: usize) {
        self.current_threads.store(count, Ordering::Relaxed);
    }

    /// Configured minimum thread count.
    pub fn min_threads(&self) -> usize {
        self.min_threads
    }

    /// Configured maximum thread count.
    pub fn max_threads(&self) -> usize {
        self.max_threads
    }

    /// Utilization fraction the controller targets.
    pub fn target_utilization(&self) -> f64 {
        self.target_utilization
    }

    /// Number of requests currently in flight.
    pub fn active_workers(&self) -> usize {
        self.active_workers.load(Ordering::Relaxed)
    }

    /// Total requests recorded since creation.
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::Relaxed)
    }

    /// Most recently published requests-per-second figure.
    pub fn current_rps(&self) -> u64 {
        self.current_rps.load(Ordering::Relaxed)
    }

    /// Current utilization: busy workers divided by thread count.
    ///
    /// Returns 0.0 when the thread count is zero to avoid dividing by zero.
    /// NOTE: reads two atomics non-atomically, so the ratio is a snapshot
    /// that may be slightly stale under concurrent updates — fine for a
    /// scaling heuristic.
    pub fn utilization(&self) -> f64 {
        let workers = self.active_workers.load(Ordering::Relaxed);
        let threads = self.current_threads.load(Ordering::Relaxed);
        if threads == 0 {
            return 0.0;
        }
        workers as f64 / threads as f64
    }

    /// Marks the start of a request: bumps the lifetime total and the
    /// in-flight worker count. Pair with [`DynamicThreadPool::release_request`].
    pub fn record_request(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.active_workers.fetch_add(1, Ordering::Relaxed);
    }

    /// Marks the end of a request.
    ///
    /// Uses a saturating decrement so that a stray release without a matching
    /// `record_request` cannot wrap the counter to `usize::MAX` (which would
    /// pin utilization at an absurd value and force permanent scale-up).
    pub fn release_request(&self) {
        let _ = self
            .active_workers
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |w| {
                Some(w.saturating_sub(1))
            });
    }

    /// Decides whether the pool should change size.
    ///
    /// Returns `Some(true)` to scale up, `Some(false)` to scale down, and
    /// `None` when no change is warranted (within cooldown, utilization in
    /// the dead band, or already at a limit). The cooldown timestamp is
    /// refreshed only when a decision is actually made, so back-to-back
    /// scaling is throttled to once per `SCALE_COOLDOWN_MS`.
    pub fn should_scale(&self) -> Option<bool> {
        let utilization = self.utilization();
        let now = Instant::now();
        let mut last_scale = self.last_scale_time.lock().unwrap();

        if now.duration_since(*last_scale) < Duration::from_millis(SCALE_COOLDOWN_MS) {
            return None;
        }

        if utilization > SCALE_UP_THRESHOLD {
            let current = self.current_threads.load(Ordering::Relaxed);
            if current < self.max_threads {
                *last_scale = now;
                return Some(true);
            }
        }

        if utilization < SCALE_DOWN_THRESHOLD {
            let current = self.current_threads.load(Ordering::Relaxed);
            if current > self.min_threads {
                *last_scale = now;
                return Some(false);
            }
        }

        None
    }
}

/// Computes a requests-per-second estimate over a fixed time window.
pub struct RpsTracker {
    /// Requests recorded since the current window started.
    request_count: AtomicU64,
    /// Start of the current measurement window.
    last_update: std::sync::Mutex<Instant>,
    /// Window length; always at least 1 ms (see [`RpsTracker::new`]).
    window_duration: Duration,
}

impl RpsTracker {
    /// Creates a tracker with a window of `window_ms` milliseconds.
    ///
    /// `window_ms` is clamped to at least 1 so the rate computation in
    /// [`RpsTracker::get_rps`] can never divide by zero (previously
    /// `RpsTracker::new(0)` made `get_rps` panic).
    pub fn new(window_ms: u64) -> Self {
        Self {
            request_count: AtomicU64::new(0),
            last_update: std::sync::Mutex::new(Instant::now()),
            window_duration: Duration::from_millis(window_ms.max(1)),
        }
    }

    /// Records one request in the current window.
    pub fn record(&self) {
        self.request_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the current requests-per-second estimate.
    ///
    /// When a full window has elapsed, the counter is reset and the completed
    /// window's rate is returned. Otherwise the in-progress count is scaled as
    /// if the whole window had already elapsed — an underestimate early in the
    /// window, kept for compatibility with existing callers.
    pub fn get_rps(&self) -> u64 {
        // Non-zero by construction in `new`, so division below is safe.
        let window_ms = self.window_duration.as_millis() as u64;
        let now = Instant::now();
        let mut last = self.last_update.lock().unwrap();

        if now.duration_since(*last) >= self.window_duration {
            // Window complete: reset and report the finished window's rate.
            let prev = self.request_count.swap(0, Ordering::Relaxed);
            *last = now;
            return prev * 1000 / window_ms;
        }

        self.request_count.load(Ordering::Relaxed) * 1000 / window_ms
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_creation() {
        // A fresh pool starts at the minimum size and is completely idle.
        let pool = DynamicThreadPool::new();
        assert_eq!(pool.current_threads(), MIN_THREADS);
        assert_eq!(pool.utilization(), 0.0);
    }

    #[test]
    fn test_pool_with_custom_limits() {
        // Custom limits seed the pool at the requested minimum.
        let pool = DynamicThreadPool::with_limits(2, 8);
        assert_eq!(pool.current_threads(), 2);
    }

    #[test]
    fn test_request_tracking() {
        let pool = DynamicThreadPool::new();

        pool.record_request();
        assert_eq!(pool.active_workers(), 1);
        assert_eq!(pool.total_requests(), 1);

        pool.release_request();
        assert_eq!(pool.active_workers(), 0);
    }

    #[test]
    fn test_utilization_calculation() {
        let pool = DynamicThreadPool::with_limits(4, 16);
        assert_eq!(pool.utilization(), 0.0);

        // Two busy workers out of four threads -> 50% utilization.
        for _ in 0..2 {
            pool.record_request();
        }
        assert!((pool.utilization() - 0.5).abs() < 0.001);
    }
}