use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Scales a worker pool up or down based on queue pressure, enforcing a
/// cooldown between consecutive scaling actions.
pub struct AutoScaler {
    config: ScalingConfig,
    last_scale: Arc<std::sync::Mutex<Instant>>,
    current_threads: Arc<AtomicUsize>,
}

/// Tuning parameters for [`AutoScaler`]. Utilization is computed as
/// `queue_length / target_queue_length` and compared against the two thresholds.
#[derive(Clone, Debug)]
pub struct ScalingConfig {
    pub min_threads: usize,
    pub max_threads: usize,
    pub target_queue_length: usize,
    pub scale_up_threshold: f64,
    pub scale_down_threshold: f64,
    pub cooldown_period: Duration,
}

impl Default for ScalingConfig {
    fn default() -> Self {
        Self {
            min_threads: 2,
            max_threads: std::thread::available_parallelism()
                .map(|n| n.get() * 2)
                .unwrap_or(16),
            target_queue_length: 100,
            scale_up_threshold: 0.8,
            scale_down_threshold: 0.3,
            cooldown_period: Duration::from_secs(30),
        }
    }
}

impl AutoScaler {
    pub fn new(config: ScalingConfig) -> Self {
        let initial_threads = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        Self {
            config,
            last_scale: Arc::new(std::sync::Mutex::new(Instant::now())),
            current_threads: Arc::new(AtomicUsize::new(initial_threads)),
        }
    }

    /// Computes a scaling decision from the current queue length. Returns
    /// `NoAction` while the cooldown since the last scaling event has not elapsed.
    pub fn evaluate(&self, queue_length: usize, _active_tasks: usize) -> ScalingDecision {
        let last_scale = self.last_scale.lock().unwrap();
        if last_scale.elapsed() < self.config.cooldown_period {
            return ScalingDecision::NoAction;
        }
        drop(last_scale);

        let current_threads = self.current_threads.load(Ordering::Relaxed);
        let utilization = queue_length as f64 / self.config.target_queue_length as f64;

        if utilization > self.config.scale_up_threshold && current_threads < self.config.max_threads {
            let new_threads = (current_threads + 1).min(self.config.max_threads);
            ScalingDecision::ScaleUp { from: current_threads, to: new_threads }
        } else if utilization < self.config.scale_down_threshold && current_threads > self.config.min_threads {
            let new_threads = current_threads.saturating_sub(1).max(self.config.min_threads);
            ScalingDecision::ScaleDown { from: current_threads, to: new_threads }
        } else {
            ScalingDecision::NoAction
        }
    }

    /// Records the new thread count and restarts the cooldown timer.
    pub fn apply_decision(&self, decision: &ScalingDecision) {
        match decision {
            ScalingDecision::ScaleUp { to, .. } | ScalingDecision::ScaleDown { to, .. } => {
                self.current_threads.store(*to, Ordering::Relaxed);
                let mut last_scale = self.last_scale.lock().unwrap();
                *last_scale = Instant::now();
            }
            ScalingDecision::NoAction => {}
        }
    }

    pub fn current_threads(&self) -> usize {
        self.current_threads.load(Ordering::Relaxed)
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalingDecision {
    ScaleUp { from: usize, to: usize },
    ScaleDown { from: usize, to: usize },
    NoAction,
}

impl std::fmt::Display for ScalingDecision {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ScalingDecision::ScaleUp { from, to } => write!(f, "Scale UP: {} → {} threads", from, to),
            ScalingDecision::ScaleDown { from, to } => write!(f, "Scale DOWN: {} → {} threads", from, to),
            ScalingDecision::NoAction => write!(f, "No scaling action needed"),
        }
    }
}
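
// A minimal, self-contained sketch of how the scaler is meant to be driven:
// sample the queue, call `evaluate`, then `apply_decision`. The test-module
// name, config values, and expected counts below are illustrative assumptions,
// not behaviour specified elsewhere in this file.
#[cfg(test)]
mod auto_scaler_tests {
    use super::*;

    #[test]
    fn no_action_while_cooldown_is_active() {
        // A new scaler starts its cooldown at construction time, so even a
        // saturated queue yields no decision under the default 30s cooldown.
        let scaler = AutoScaler::new(ScalingConfig::default());
        assert_eq!(scaler.evaluate(1_000, 0), ScalingDecision::NoAction);
    }

    #[test]
    fn scales_up_by_one_thread_when_queue_is_saturated() {
        let config = ScalingConfig {
            // Zero cooldown lets the scaler act immediately after construction.
            cooldown_period: Duration::from_secs(0),
            // A very high ceiling keeps the test independent of host core count.
            max_threads: usize::MAX,
            ..ScalingConfig::default()
        };
        let scaler = AutoScaler::new(config);
        let before = scaler.current_threads();

        // 150 queued items against a target of 100 gives utilization 1.5 > 0.8.
        let decision = scaler.evaluate(150, 0);
        assert_eq!(decision, ScalingDecision::ScaleUp { from: before, to: before + 1 });

        scaler.apply_decision(&decision);
        assert_eq!(scaler.current_threads(), before + 1);
    }
}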

/// Hard resource limits enforced independently of auto-scaling. `None` means
/// the corresponding limit is not checked.
#[derive(Clone, Debug)]
pub struct ResourceLimits {
    pub max_memory_mb: Option<usize>,
    pub max_cpu_percent: Option<f64>,
    pub max_queue_size: Option<usize>,
    pub max_task_duration: Option<Duration>,
}

impl Default for ResourceLimits {
    fn default() -> Self {
        Self {
            max_memory_mb: None,
            max_cpu_percent: None,
            max_queue_size: Some(10000),
            max_task_duration: Some(Duration::from_secs(300)),
        }
    }
}

impl ResourceLimits {
    pub fn is_queue_size_exceeded(&self, queue_size: usize) -> bool {
        self.max_queue_size.map_or(false, |max| queue_size > max)
    }

    pub fn is_task_duration_exceeded(&self, duration: Duration) -> bool {
        self.max_task_duration.map_or(false, |max| duration > max)
    }
}
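
// A small sketch of how the limit checks behave with the defaults above
// (queue cap 10_000 items, task cap 300 s). The module name and the specific
// probe values are illustrative assumptions.
#[cfg(test)]
mod resource_limit_tests {
    use super::*;

    #[test]
    fn default_limits_flag_only_strictly_exceeded_values() {
        let limits = ResourceLimits::default();

        // Both checks use a strict `>`, so the boundary value itself is allowed.
        assert!(!limits.is_queue_size_exceeded(10_000));
        assert!(limits.is_queue_size_exceeded(10_001));
        assert!(!limits.is_task_duration_exceeded(Duration::from_secs(300)));
        assert!(limits.is_task_duration_exceeded(Duration::from_secs(301)));
    }

    #[test]
    fn unset_limits_never_trip() {
        let unlimited = ResourceLimits {
            max_memory_mb: None,
            max_cpu_percent: None,
            max_queue_size: None,
            max_task_duration: None,
        };
        assert!(!unlimited.is_queue_size_exceeded(usize::MAX));
        assert!(!unlimited.is_task_duration_exceeded(Duration::from_secs(3_600)));
    }
}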