1use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
2use std::time::{Duration, Instant};
3
/// Fraction of threads we would ideally keep busy at steady state.
const TARGET_UTILIZATION: f64 = 0.85;
/// Hard lower bound on pool size.
const MIN_THREADS: usize = 1;
/// Hard upper bound on pool size.
const MAX_THREADS: usize = 256;
/// Utilization above which the pool votes to add a thread.
const SCALE_UP_THRESHOLD: f64 = 0.70;
/// Utilization below which the pool votes to remove a thread.
const SCALE_DOWN_THRESHOLD: f64 = 0.30;
/// Minimum time between two scaling decisions, to avoid flapping.
const SCALE_COOLDOWN_MS: u64 = 100;

/// Bookkeeping and scaling policy for a thread pool that grows and
/// shrinks with load.
///
/// This type only tracks counters and makes scaling *decisions*
/// ([`DynamicThreadPool::should_scale`]); it does not spawn or stop
/// worker threads itself — that is the caller's job.
pub struct DynamicThreadPool {
    /// Smallest size the pool may shrink to (always >= 1).
    min_threads: usize,
    /// Largest size the pool may grow to (capped at `MAX_THREADS`).
    max_threads: usize,
    /// Desired busy fraction; informational only — `should_scale` uses
    /// the SCALE_UP/DOWN thresholds, not this value.
    target_utilization: f64,
    /// Pool size as seen by the scaler.
    current_threads: AtomicUsize,
    /// Requests currently in flight (record/release pairs).
    active_workers: AtomicUsize,
    /// Monotonic total of requests ever recorded.
    total_requests: AtomicU64,
    /// Last published requests-per-second figure (never written here;
    /// presumably updated by an external sampler — TODO confirm).
    current_rps: AtomicU64,
    /// Timestamp of the last scaling decision; the mutex serializes the
    /// cooldown check in `should_scale`.
    last_scale_time: std::sync::Mutex<Instant>,
}

impl DynamicThreadPool {
    /// Creates a pool tracker with the default limits
    /// (`MIN_THREADS..=MAX_THREADS`), starting at `MIN_THREADS`.
    pub fn new() -> Self {
        Self::with_limits(MIN_THREADS, MAX_THREADS)
    }

    /// Creates a pool tracker with custom limits.
    ///
    /// `min` is clamped up to 1 and `max` is clamped into
    /// `[min, MAX_THREADS]`, so the invariant
    /// `1 <= min_threads <= max_threads <= MAX_THREADS` always holds.
    /// The current thread count starts at the (clamped) minimum.
    pub fn with_limits(min: usize, max: usize) -> Self {
        // Clamp first, then use the clamped values everywhere: the
        // original initialized `current_threads` from the raw `min`,
        // so `with_limits(0, 8)` reported 0 current threads while
        // claiming a minimum of 1.
        let min = min.max(1);
        let max = max.max(min).min(MAX_THREADS);
        Self {
            min_threads: min,
            max_threads: max,
            target_utilization: TARGET_UTILIZATION,
            current_threads: AtomicUsize::new(min),
            active_workers: AtomicUsize::new(0),
            total_requests: AtomicU64::new(0),
            current_rps: AtomicU64::new(0),
            last_scale_time: std::sync::Mutex::new(Instant::now()),
        }
    }

    /// Current pool size.
    pub fn current_threads(&self) -> usize {
        self.current_threads.load(Ordering::Relaxed)
    }

    /// Records a new pool size, clamped into `[min_threads, max_threads]`
    /// so the pool invariant cannot be violated by a stray caller.
    pub fn set_thread_count(&self, count: usize) {
        let clamped = count.clamp(self.min_threads, self.max_threads);
        self.current_threads.store(clamped, Ordering::Relaxed);
    }

    /// Configured lower bound on pool size.
    pub fn min_threads(&self) -> usize {
        self.min_threads
    }

    /// Configured upper bound on pool size.
    pub fn max_threads(&self) -> usize {
        self.max_threads
    }

    /// Configured target busy fraction.
    pub fn target_utilization(&self) -> f64 {
        self.target_utilization
    }

    /// Number of requests currently in flight.
    pub fn active_workers(&self) -> usize {
        self.active_workers.load(Ordering::Relaxed)
    }

    /// Total requests recorded over the pool's lifetime.
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::Relaxed)
    }

    /// Last published requests-per-second value.
    pub fn current_rps(&self) -> u64 {
        self.current_rps.load(Ordering::Relaxed)
    }

    /// Fraction of threads currently busy (`active_workers / current_threads`).
    ///
    /// Returns 0.0 when the pool has no threads, avoiding a division by
    /// zero. Note the two loads are not atomic together, so the value is
    /// a best-effort snapshot.
    pub fn utilization(&self) -> f64 {
        let workers = self.active_workers.load(Ordering::Relaxed);
        let threads = self.current_threads.load(Ordering::Relaxed);
        if threads == 0 {
            return 0.0;
        }
        workers as f64 / threads as f64
    }

    /// Marks the start of a request: bumps the lifetime total and the
    /// in-flight count. Pair every call with [`DynamicThreadPool::release_request`].
    pub fn record_request(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.active_workers.fetch_add(1, Ordering::Relaxed);
    }

    /// Marks the end of a request. Saturates at zero instead of wrapping:
    /// the original `fetch_sub(1)` would underflow to `usize::MAX` on an
    /// unmatched release, poisoning every later utilization reading.
    pub fn release_request(&self) {
        let _ = self
            .active_workers
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |w| w.checked_sub(1));
    }

    /// Decides whether the pool should change size.
    ///
    /// Returns `Some(true)` to scale up, `Some(false)` to scale down, or
    /// `None` when no change is warranted (inside the cooldown window, at
    /// a limit, or utilization is within the hysteresis band).
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned (a prior panic while
    /// holding it — a bug, not a recoverable condition).
    pub fn should_scale(&self) -> Option<bool> {
        let now = Instant::now();
        let mut last_scale = self.last_scale_time.lock().unwrap();

        // Enforce the cooldown before doing any other work so rapid
        // callers cannot cause decision flapping.
        if now.duration_since(*last_scale) < Duration::from_millis(SCALE_COOLDOWN_MS) {
            return None;
        }

        let utilization = self.utilization();
        let current = self.current_threads.load(Ordering::Relaxed);

        if utilization > SCALE_UP_THRESHOLD && current < self.max_threads {
            *last_scale = now;
            return Some(true);
        }

        if utilization < SCALE_DOWN_THRESHOLD && current > self.min_threads {
            *last_scale = now;
            return Some(false);
        }

        None
    }
}

impl Default for DynamicThreadPool {
    /// Equivalent to [`DynamicThreadPool::new`].
    fn default() -> Self {
        Self::new()
    }
}
127
/// Windowed requests-per-second estimator.
///
/// Counts requests via [`RpsTracker::record`] and converts the count to
/// a per-second rate in [`RpsTracker::get_rps`], resetting the counter
/// each time a full window elapses.
pub struct RpsTracker {
    /// Requests seen since the current window started.
    request_count: AtomicU64,
    /// Start of the current window; the mutex serializes window rollover.
    last_update: std::sync::Mutex<Instant>,
    /// Length of the measurement window.
    window_duration: Duration,
}

impl RpsTracker {
    /// Creates a tracker with a window of `window_ms` milliseconds.
    ///
    /// A `window_ms` of 0 is tolerated: rate computation treats it as
    /// 1 ms instead of dividing by zero.
    pub fn new(window_ms: u64) -> Self {
        Self {
            request_count: AtomicU64::new(0),
            last_update: std::sync::Mutex::new(Instant::now()),
            window_duration: Duration::from_millis(window_ms),
        }
    }

    /// Counts one request against the current window.
    pub fn record(&self) {
        self.request_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the current requests-per-second estimate.
    ///
    /// If a full window has elapsed, the counter is reset and the rate of
    /// the completed window is returned. Mid-window, the partial count is
    /// divided by the *full* window length, so the value ramps up as the
    /// window fills rather than extrapolating.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn get_rps(&self) -> u64 {
        // Guard the divisor: the original divided by `as_millis()`
        // directly, panicking with a division by zero for `new(0)`.
        // The `as u64` cast is lossless here because the duration was
        // built from a u64 millisecond count.
        let window_ms = (self.window_duration.as_millis() as u64).max(1);
        let now = Instant::now();
        let mut last = self.last_update.lock().unwrap();

        if now.duration_since(*last) >= self.window_duration {
            // Window complete: harvest and reset the counter atomically,
            // then start a new window at `now`.
            let prev = self.request_count.swap(0, Ordering::Relaxed);
            *last = now;
            return prev * 1000 / window_ms;
        }

        self.request_count.load(Ordering::Relaxed) * 1000 / window_ms
    }
}
160
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh pool starts at the minimum size and idle.
    #[test]
    fn test_pool_creation() {
        let p = DynamicThreadPool::new();
        assert_eq!(MIN_THREADS, p.current_threads());
        assert_eq!(0.0, p.utilization());
    }

    // Custom limits seed the starting size from the minimum.
    #[test]
    fn test_pool_with_custom_limits() {
        assert_eq!(DynamicThreadPool::with_limits(2, 8).current_threads(), 2);
    }

    // record/release move the in-flight gauge; the lifetime total only grows.
    #[test]
    fn test_request_tracking() {
        let p = DynamicThreadPool::new();
        p.record_request();
        assert_eq!(1, p.active_workers());
        assert_eq!(1, p.total_requests());
        p.release_request();
        assert_eq!(0, p.active_workers());
    }

    // Two busy workers out of four threads is 50% utilization.
    #[test]
    fn test_utilization_calculation() {
        let p = DynamicThreadPool::with_limits(4, 16);
        assert_eq!(0.0, p.utilization());
        for _ in 0..2 {
            p.record_request();
        }
        let delta = (p.utilization() - 0.5).abs();
        assert!(delta < 0.001);
    }
}