Skip to main content

vapor_http/
scheduler.rs

1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2use std::time::{Duration, Instant};
3
4const TARGET_UTILIZATION: f64 = 0.85;
5const MIN_THREADS: usize = 1;
6const MAX_THREADS: usize = 256;
7const SCALE_UP_THRESHOLD: f64 = 0.70;
8const SCALE_DOWN_THRESHOLD: f64 = 0.30;
9const SCALE_COOLDOWN_MS: u64 = 100;
10
11pub struct DynamicThreadPool {
12    min_threads: usize,
13    max_threads: usize,
14    target_utilization: f64,
15    current_threads: AtomicUsize,
16    active_workers: AtomicUsize,
17    total_requests: AtomicU64,
18    current_rps: AtomicU64,
19    last_scale_time: std::sync::Mutex<Instant>,
20}
21
22impl DynamicThreadPool {
23    pub fn new() -> Self {
24        Self {
25            min_threads: MIN_THREADS,
26            max_threads: MAX_THREADS,
27            target_utilization: TARGET_UTILIZATION,
28            current_threads: AtomicUsize::new(MIN_THREADS),
29            active_workers: AtomicUsize::new(0),
30            total_requests: AtomicU64::new(0),
31            current_rps: AtomicU64::new(0),
32            last_scale_time: std::sync::Mutex::new(Instant::now()),
33        }
34    }
35
36    pub fn with_limits(min: usize, max: usize) -> Self {
37        Self {
38            min_threads: min.max(1),
39            max_threads: max.max(min).min(MAX_THREADS),
40            target_utilization: TARGET_UTILIZATION,
41            current_threads: AtomicUsize::new(min),
42            active_workers: AtomicUsize::new(0),
43            total_requests: AtomicU64::new(0),
44            current_rps: AtomicU64::new(0),
45            last_scale_time: std::sync::Mutex::new(Instant::now()),
46        }
47    }
48
49    pub fn current_threads(&self) -> usize {
50        self.current_threads.load(Ordering::Relaxed)
51    }
52
53    pub fn set_thread_count(&self, count: usize) {
54        self.current_threads.store(count, Ordering::Relaxed);
55    }
56
57    pub fn min_threads(&self) -> usize {
58        self.min_threads
59    }
60
61    pub fn max_threads(&self) -> usize {
62        self.max_threads
63    }
64
65    pub fn target_utilization(&self) -> f64 {
66        self.target_utilization
67    }
68
69    pub fn active_workers(&self) -> usize {
70        self.active_workers.load(Ordering::Relaxed)
71    }
72
73    pub fn total_requests(&self) -> u64 {
74        self.total_requests.load(Ordering::Relaxed)
75    }
76
77    pub fn current_rps(&self) -> u64 {
78        self.current_rps.load(Ordering::Relaxed)
79    }
80
81    pub fn utilization(&self) -> f64 {
82        let workers = self.active_workers.load(Ordering::Relaxed);
83        let threads = self.current_threads.load(Ordering::Relaxed);
84        if threads == 0 {
85            return 0.0;
86        }
87        workers as f64 / threads as f64
88    }
89
90    pub fn record_request(&self) {
91        self.total_requests.fetch_add(1, Ordering::Relaxed);
92        self.active_workers.fetch_add(1, Ordering::Relaxed);
93    }
94
95    pub fn release_request(&self) {
96        self.active_workers.fetch_sub(1, Ordering::Relaxed);
97    }
98
99    pub fn should_scale(&self) -> Option<bool> {
100        let utilization = self.utilization();
101        let now = Instant::now();
102        let mut last_scale = self.last_scale_time.lock().unwrap();
103
104        if now.duration_since(*last_scale) < Duration::from_millis(SCALE_COOLDOWN_MS) {
105            return None;
106        }
107
108        if utilization > SCALE_UP_THRESHOLD {
109            let current = self.current_threads.load(Ordering::Relaxed);
110            if current < self.max_threads {
111                *last_scale = now;
112                return Some(true);
113            }
114        }
115
116        if utilization < SCALE_DOWN_THRESHOLD {
117            let current = self.current_threads.load(Ordering::Relaxed);
118            if current > self.min_threads {
119                *last_scale = now;
120                return Some(false);
121            }
122        }
123
124        None
125    }
126}
127
128pub struct RpsTracker {
129    request_count: AtomicU64,
130    last_update: std::sync::Mutex<Instant>,
131    window_duration: Duration,
132}
133
134impl RpsTracker {
135    pub fn new(window_ms: u64) -> Self {
136        Self {
137            request_count: AtomicU64::new(0),
138            last_update: std::sync::Mutex::new(Instant::now()),
139            window_duration: Duration::from_millis(window_ms),
140        }
141    }
142
143    pub fn record(&self) {
144        self.request_count.fetch_add(1, Ordering::Relaxed);
145    }
146
147    pub fn get_rps(&self) -> u64 {
148        let now = Instant::now();
149        let mut last = self.last_update.lock().unwrap();
150
151        if now.duration_since(*last) >= self.window_duration {
152            let prev = self.request_count.swap(0, Ordering::Relaxed);
153            *last = now;
154            return prev * 1000 / self.window_duration.as_millis() as u64;
155        }
156
157        self.request_count.load(Ordering::Relaxed) * 1000 / self.window_duration.as_millis() as u64
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use super::*;
164
165    #[test]
166    fn test_pool_creation() {
167        let pool = DynamicThreadPool::new();
168        assert_eq!(pool.current_threads(), MIN_THREADS);
169        assert_eq!(pool.utilization(), 0.0);
170    }
171
172    #[test]
173    fn test_pool_with_custom_limits() {
174        let pool = DynamicThreadPool::with_limits(2, 8);
175        assert_eq!(pool.current_threads(), 2);
176    }
177
178    #[test]
179    fn test_request_tracking() {
180        let pool = DynamicThreadPool::new();
181        pool.record_request();
182        assert_eq!(pool.active_workers(), 1);
183        assert_eq!(pool.total_requests(), 1);
184        pool.release_request();
185        assert_eq!(pool.active_workers(), 0);
186    }
187
188    #[test]
189    fn test_utilization_calculation() {
190        let pool = DynamicThreadPool::with_limits(4, 16);
191        assert_eq!(pool.utilization(), 0.0);
192        pool.record_request();
193        pool.record_request();
194        assert!((pool.utilization() - 0.5).abs() < 0.001);
195    }
196}