1use crate::priority_queue::{PriorityEventQueue, QueueStats};
8use hojicha_core::core::Message;
9use std::collections::VecDeque;
10use std::time::{Duration, Instant};
11
12#[derive(Debug, Clone)]
14pub struct AutoScaleConfig {
15 pub min_size: usize,
17
18 pub max_size: usize,
20
21 pub target_utilization: f64,
23
24 pub evaluation_interval: usize,
26
27 pub strategy: ScalingStrategy,
29
30 pub cooldown: Duration,
32
33 pub debug: bool,
35}
36
37impl Default for AutoScaleConfig {
38 fn default() -> Self {
39 Self {
40 min_size: 100,
41 max_size: 10_000,
42 target_utilization: 0.5,
43 evaluation_interval: 100,
44 strategy: ScalingStrategy::Conservative,
45 cooldown: Duration::from_secs(5),
46 debug: false,
47 }
48 }
49}
50
51#[derive(Debug, Clone, Copy, PartialEq)]
53pub enum ScalingStrategy {
54 Conservative,
56
57 Aggressive,
59
60 Predictive,
62
63 Adaptive,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq)]
69pub enum ScalingDecision {
70 Grow(usize),
72
73 Shrink(usize),
75
76 NoChange,
78}
79
80pub struct QueueAutoScaler {
82 config: AutoScaleConfig,
83
84 utilization_history: VecDeque<f64>,
86
87 scaling_history: VecDeque<ScalingOutcome>,
89
90 events_since_evaluation: usize,
92
93 last_scaling_time: Option<Instant>,
95
96 event_rate: EventRateTracker,
98
99 peak_utilization: f64,
101}
102
103#[derive(Debug, Clone)]
105struct ScalingOutcome {
106 decision: ScalingDecision,
107 #[allow(dead_code)]
108 timestamp: Instant,
109 utilization_before: f64,
110 utilization_after: f64,
111 dropped_events_before: usize,
112 dropped_events_after: usize,
113}
114
115struct EventRateTracker {
117 buckets: VecDeque<(Instant, usize)>,
118 window: Duration,
119}
120
121impl EventRateTracker {
122 fn new(window: Duration) -> Self {
123 Self {
124 buckets: VecDeque::new(),
125 window,
126 }
127 }
128
129 fn record_event(&mut self) {
130 let now = Instant::now();
131
132 while let Some((time, _)) = self.buckets.front() {
134 if now.duration_since(*time) > self.window {
135 self.buckets.pop_front();
136 } else {
137 break;
138 }
139 }
140
141 if let Some((time, count)) = self.buckets.back_mut() {
143 if now.duration_since(*time) < Duration::from_secs(1) {
144 *count += 1;
145 } else {
146 self.buckets.push_back((now, 1));
147 }
148 } else {
149 self.buckets.push_back((now, 1));
150 }
151 }
152
153 fn events_per_second(&self) -> f64 {
154 if self.buckets.is_empty() {
155 return 0.0;
156 }
157
158 let total_events: usize = self.buckets.iter().map(|(_, c)| c).sum();
159 let duration =
160 if let (Some(first), Some(last)) = (self.buckets.front(), self.buckets.back()) {
161 last.0.duration_since(first.0).as_secs_f64()
162 } else {
163 1.0
164 };
165
166 if duration > 0.0 {
167 total_events as f64 / duration
168 } else {
169 total_events as f64
170 }
171 }
172
173 fn is_increasing(&self) -> bool {
174 if self.buckets.len() < 3 {
175 return false;
176 }
177
178 let recent: Vec<_> = self.buckets.iter().rev().take(3).map(|(_, c)| *c).collect();
179 recent.windows(2).all(|w| w[0] >= w[1])
180 }
181}
182
183impl QueueAutoScaler {
184 pub fn new(config: AutoScaleConfig) -> Self {
186 Self {
187 config,
188 utilization_history: VecDeque::with_capacity(100),
189 scaling_history: VecDeque::with_capacity(50),
190 events_since_evaluation: 0,
191 last_scaling_time: None,
192 event_rate: EventRateTracker::new(Duration::from_secs(60)),
193 peak_utilization: 0.0,
194 }
195 }
196
197 pub fn on_event_processed<M: Message>(
199 &mut self,
200 queue: &mut PriorityEventQueue<M>,
201 ) -> Option<ScalingDecision> {
202 self.events_since_evaluation += 1;
203 self.event_rate.record_event();
204
205 if self.events_since_evaluation >= self.config.evaluation_interval {
207 self.events_since_evaluation = 0;
208 return self.evaluate_scaling(queue);
209 }
210
211 None
212 }
213
214 pub fn evaluate_scaling<M: Message>(
216 &mut self,
217 queue: &mut PriorityEventQueue<M>,
218 ) -> Option<ScalingDecision> {
219 let stats = queue.stats();
220
221 self.utilization_history.push_back(stats.utilization);
223 if self.utilization_history.len() > 100 {
224 self.utilization_history.pop_front();
225 }
226
227 self.peak_utilization = self.peak_utilization.max(stats.utilization);
228
229 if let Some(last_time) = self.last_scaling_time {
231 if Instant::now().duration_since(last_time) < self.config.cooldown {
232 return None;
233 }
234 }
235
236 let decision = match self.config.strategy {
238 ScalingStrategy::Conservative => self.conservative_scaling(&stats),
239 ScalingStrategy::Aggressive => self.aggressive_scaling(&stats),
240 ScalingStrategy::Predictive => self.predictive_scaling(&stats),
241 ScalingStrategy::Adaptive => self.adaptive_scaling(&stats),
242 };
243
244 if decision != ScalingDecision::NoChange {
246 let utilization_before = stats.utilization;
247 let dropped_before = stats.dropped_events;
248
249 let result = match decision {
250 ScalingDecision::Grow(amount) => {
251 let new_size = (stats.max_size + amount).min(self.config.max_size);
252 queue.resize(new_size)
253 }
254 ScalingDecision::Shrink(amount) => {
255 let new_size =
256 (stats.max_size.saturating_sub(amount)).max(self.config.min_size);
257 queue.resize(new_size)
258 }
259 ScalingDecision::NoChange => Ok(()),
260 };
261
262 if result.is_ok() {
263 self.last_scaling_time = Some(Instant::now());
264
265 let new_stats = queue.stats();
266 self.scaling_history.push_back(ScalingOutcome {
267 decision,
268 timestamp: Instant::now(),
269 utilization_before,
270 utilization_after: new_stats.utilization,
271 dropped_events_before: dropped_before,
272 dropped_events_after: new_stats.dropped_events,
273 });
274
275 if self.scaling_history.len() > 50 {
276 self.scaling_history.pop_front();
277 }
278
279 if self.config.debug {
280 log::debug!(
281 "Queue scaling: {:?} (size: {} -> {}, util: {:.1}% -> {:.1}%)",
282 decision,
283 stats.max_size,
284 new_stats.max_size,
285 utilization_before * 100.0,
286 new_stats.utilization * 100.0
287 );
288 }
289
290 return Some(decision);
291 }
292 }
293
294 None
295 }
296
297 fn conservative_scaling(&self, stats: &QueueStats) -> ScalingDecision {
299 let avg_utilization = self.average_utilization();
300
301 if stats.utilization > 0.9 || stats.backpressure_active {
302 let growth = (stats.max_size as f64 * 0.2) as usize;
304 ScalingDecision::Grow(growth.max(10))
305 } else if avg_utilization < 0.2 && stats.max_size > self.config.min_size {
306 let shrink = (stats.max_size as f64 * 0.1) as usize;
308 ScalingDecision::Shrink(shrink.max(10))
309 } else {
310 ScalingDecision::NoChange
311 }
312 }
313
314 fn aggressive_scaling(&self, stats: &QueueStats) -> ScalingDecision {
316 if stats.utilization > 0.8 {
317 let growth = stats.max_size;
319 ScalingDecision::Grow(growth)
320 } else if stats.utilization < 0.1 && stats.max_size > self.config.min_size {
321 let shrink = stats.max_size / 2;
323 ScalingDecision::Shrink(shrink)
324 } else if stats.utilization > 0.6 {
325 let growth = (stats.max_size as f64 * 0.5) as usize;
327 ScalingDecision::Grow(growth)
328 } else {
329 ScalingDecision::NoChange
330 }
331 }
332
333 fn predictive_scaling(&self, stats: &QueueStats) -> ScalingDecision {
335 let event_rate = self.event_rate.events_per_second();
336 let is_rate_increasing = self.event_rate.is_increasing();
337
338 if is_rate_increasing && stats.utilization > 0.5 {
340 let predicted_need = (event_rate * 10.0) as usize; let growth = predicted_need.saturating_sub(stats.current_size);
343 if growth > 0 {
344 return ScalingDecision::Grow(growth);
345 }
346 }
347
348 if self.peak_utilization > 0.95 && stats.utilization > 0.7 {
350 let growth = (stats.max_size as f64 * 0.3) as usize;
352 ScalingDecision::Grow(growth)
353 } else if stats.utilization < 0.15 && !is_rate_increasing {
354 let shrink = (stats.max_size as f64 * 0.2) as usize;
356 ScalingDecision::Shrink(shrink)
357 } else {
358 ScalingDecision::NoChange
359 }
360 }
361
362 fn adaptive_scaling(&self, stats: &QueueStats) -> ScalingDecision {
364 let recent_successes = self
366 .scaling_history
367 .iter()
368 .rev()
369 .take(5)
370 .filter(|outcome| {
371 let util_improved = match outcome.decision {
373 ScalingDecision::Grow(_) => {
374 outcome.utilization_after < outcome.utilization_before
375 }
376 ScalingDecision::Shrink(_) => outcome.utilization_after < 0.8,
377 ScalingDecision::NoChange => true,
378 };
379 let no_new_drops = outcome.dropped_events_after == outcome.dropped_events_before;
380 util_improved && no_new_drops
381 })
382 .count();
383
384 let success_rate = if self.scaling_history.len() >= 5 {
385 recent_successes as f64 / 5.0
386 } else {
387 0.5 };
389
390 if success_rate > 0.8 {
392 self.aggressive_scaling(stats)
394 } else if success_rate < 0.4 {
395 self.conservative_scaling(stats)
397 } else {
398 self.predictive_scaling(stats)
400 }
401 }
402
403 fn average_utilization(&self) -> f64 {
405 if self.utilization_history.is_empty() {
406 return 0.0;
407 }
408
409 let sum: f64 = self.utilization_history.iter().sum();
410 sum / self.utilization_history.len() as f64
411 }
412
413 pub fn metrics(&self) -> ScalingMetrics {
415 ScalingMetrics {
416 average_utilization: self.average_utilization(),
417 peak_utilization: self.peak_utilization,
418 events_per_second: self.event_rate.events_per_second(),
419 scaling_operations: self.scaling_history.len(),
420 last_scaling: self.last_scaling_time,
421 }
422 }
423}
424
425#[derive(Debug, Clone)]
427pub struct ScalingMetrics {
428 pub average_utilization: f64,
430 pub peak_utilization: f64,
432 pub events_per_second: f64,
434 pub scaling_operations: usize,
436 pub last_scaling: Option<Instant>,
438}
439
440#[cfg(test)]
441mod tests {
442 use super::*;
443
444 #[test]
445 fn test_event_rate_tracker() {
446 let mut tracker = EventRateTracker::new(Duration::from_secs(10));
447
448 for _ in 0..10 {
450 tracker.record_event();
451 }
452
453 assert!(tracker.events_per_second() > 0.0);
455 }
456
457 #[test]
458 fn test_scaling_strategies() {
459 let config = AutoScaleConfig::default();
460 let scaler = QueueAutoScaler::new(config);
461
462 let stats = QueueStats {
463 current_size: 90,
464 max_size: 100,
465 utilization: 0.9,
466 backpressure_active: true,
467 dropped_events: 0,
468 };
469
470 let decision = scaler.conservative_scaling(&stats);
472 assert!(matches!(decision, ScalingDecision::Grow(_)));
473
474 let aggressive = scaler.aggressive_scaling(&stats);
476 if let (ScalingDecision::Grow(c), ScalingDecision::Grow(a)) = (decision, aggressive) {
477 assert!(a > c);
478 }
479 }
480}