// hojicha_runtime/queue_scaling.rs

1//! Auto-scaling functionality for dynamic queue resizing
2//!
3//! This module provides automatic queue scaling based on load patterns,
4//! allowing the queue to grow during high load and shrink during low load
5//! to optimize memory usage.
6
7use crate::priority_queue::{PriorityEventQueue, QueueStats};
8use hojicha_core::core::Message;
9use std::collections::VecDeque;
10use std::time::{Duration, Instant};
11
/// Configuration for automatic queue scaling
///
/// Consumed by `QueueAutoScaler` to bound resize decisions and control how
/// often and how aggressively scaling is evaluated.
#[derive(Debug, Clone)]
pub struct AutoScaleConfig {
    /// Minimum queue size (never shrink below)
    pub min_size: usize,

    /// Maximum queue size (never grow above)
    pub max_size: usize,

    /// Target utilization (0.0 to 1.0)
    ///
    /// NOTE(review): not read by any strategy in this module — confirm
    /// whether it is consumed elsewhere or reserved for future use.
    pub target_utilization: f64,

    /// How often to evaluate scaling (in number of events)
    pub evaluation_interval: usize,

    /// Scaling strategy
    pub strategy: ScalingStrategy,

    /// Cooldown period between scaling operations
    ///
    /// No new resize is applied until this much time has elapsed since the
    /// previous successful resize.
    pub cooldown: Duration,

    /// Enable debug logging
    pub debug: bool,
}
36
37impl Default for AutoScaleConfig {
38    fn default() -> Self {
39        Self {
40            min_size: 100,
41            max_size: 10_000,
42            target_utilization: 0.5,
43            evaluation_interval: 100,
44            strategy: ScalingStrategy::Conservative,
45            cooldown: Duration::from_secs(5),
46            debug: false,
47        }
48    }
49}
50
/// Scaling strategies with different aggressiveness levels
// `Eq` added: all variants are unit variants, so the `PartialEq` derive is
// total and deriving `Eq` as well is free (clippy: derive_partial_eq_without_eq).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScalingStrategy {
    /// Conservative: Small incremental changes
    Conservative,

    /// Aggressive: Large rapid changes
    Aggressive,

    /// Predictive: Based on historical patterns
    Predictive,

    /// Adaptive: Adjusts strategy based on success rate
    Adaptive,
}
66
/// Decision made by the auto-scaler
// `Eq` added: `usize` payloads are `Eq`, so equality is total and the derive
// is free (clippy: derive_partial_eq_without_eq).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScalingDecision {
    /// Grow the queue by specified amount
    Grow(usize),

    /// Shrink the queue by specified amount
    Shrink(usize),

    /// No change needed
    NoChange,
}
79
/// Auto-scaler that manages dynamic queue resizing
///
/// Call [`QueueAutoScaler::on_event_processed`] after each handled event;
/// every `evaluation_interval` events the scaler inspects queue statistics
/// and may resize the queue according to the configured strategy.
pub struct QueueAutoScaler {
    /// Scaling configuration (bounds, strategy, cadence, cooldown)
    config: AutoScaleConfig,

    /// History of utilization measurements (bounded to the last 100 samples)
    utilization_history: VecDeque<f64>,

    /// History of scaling decisions and their outcomes (bounded to 50 entries)
    scaling_history: VecDeque<ScalingOutcome>,

    /// Events processed since last evaluation
    events_since_evaluation: usize,

    /// Last time a scaling operation was performed (`None` until first resize)
    last_scaling_time: Option<Instant>,

    /// Running average of event rate
    event_rate: EventRateTracker,

    /// Peak utilization seen (monotone: never decays over the scaler's life)
    peak_utilization: f64,
}
102
/// Tracks the outcome of a scaling decision
///
/// Consumed by the adaptive strategy to judge whether past resizes helped
/// (utilization moved the right way without new dropped events).
#[derive(Debug, Clone)]
struct ScalingOutcome {
    /// The decision that was applied
    decision: ScalingDecision,
    /// When the resize happened (currently unread; kept for debugging)
    #[allow(dead_code)]
    timestamp: Instant,
    /// Queue utilization just before the resize
    utilization_before: f64,
    /// Queue utilization just after the resize
    utilization_after: f64,
    /// Dropped-event count before the resize
    dropped_events_before: usize,
    /// Dropped-event count after the resize
    dropped_events_after: usize,
}
114
/// Tracks event processing rate over time
///
/// Events are grouped into one-second buckets; buckets older than `window`
/// are pruned as new events arrive.
struct EventRateTracker {
    /// (bucket start time, events counted in that bucket), oldest first
    buckets: VecDeque<(Instant, usize)>,
    /// How far back to retain buckets
    window: Duration,
}

impl EventRateTracker {
    /// Create a tracker retaining roughly `window` worth of history.
    fn new(window: Duration) -> Self {
        Self {
            window,
            buckets: VecDeque::new(),
        }
    }

    /// Record a single event, pruning expired buckets first.
    fn record_event(&mut self) {
        let now = Instant::now();

        // Drop buckets that have aged out of the retention window.
        while self
            .buckets
            .front()
            .map_or(false, |(start, _)| now.duration_since(*start) > self.window)
        {
            self.buckets.pop_front();
        }

        // Count into the newest bucket while it is still "current" (< 1s old);
        // otherwise open a fresh bucket.
        match self.buckets.back_mut() {
            Some((start, count)) if now.duration_since(*start) < Duration::from_secs(1) => {
                *count += 1;
            }
            _ => self.buckets.push_back((now, 1)),
        }
    }

    /// Average events per second across the retained buckets.
    ///
    /// Returns 0.0 with no history. With a single bucket the span is zero, so
    /// the raw event count is returned instead of dividing.
    fn events_per_second(&self) -> f64 {
        let (first, last) = match (self.buckets.front(), self.buckets.back()) {
            (Some(first), Some(last)) => (first, last),
            _ => return 0.0,
        };

        let total: usize = self.buckets.iter().map(|(_, count)| count).sum();
        let span = last.0.duration_since(first.0).as_secs_f64();

        if span > 0.0 {
            total as f64 / span
        } else {
            total as f64
        }
    }

    /// Heuristic trend check: true when the three most recent bucket counts
    /// are non-decreasing over time (oldest <= middle <= newest).
    fn is_increasing(&self) -> bool {
        let len = self.buckets.len();
        if len < 3 {
            return false;
        }

        // Walk the last three buckets oldest-to-newest, requiring each count
        // to be at least its predecessor's.
        let mut last_three = self.buckets.iter().skip(len - 3).map(|(_, count)| *count);
        let mut prev = last_three.next().expect("len checked above");
        for count in last_three {
            if count < prev {
                return false;
            }
            prev = count;
        }
        true
    }
}
182
183impl QueueAutoScaler {
184    /// Create a new auto-scaler with the given configuration
185    pub fn new(config: AutoScaleConfig) -> Self {
186        Self {
187            config,
188            utilization_history: VecDeque::with_capacity(100),
189            scaling_history: VecDeque::with_capacity(50),
190            events_since_evaluation: 0,
191            last_scaling_time: None,
192            event_rate: EventRateTracker::new(Duration::from_secs(60)),
193            peak_utilization: 0.0,
194        }
195    }
196
197    /// Process an event and potentially trigger scaling
198    pub fn on_event_processed<M: Message>(
199        &mut self,
200        queue: &mut PriorityEventQueue<M>,
201    ) -> Option<ScalingDecision> {
202        self.events_since_evaluation += 1;
203        self.event_rate.record_event();
204
205        // Check if it's time to evaluate scaling
206        if self.events_since_evaluation >= self.config.evaluation_interval {
207            self.events_since_evaluation = 0;
208            return self.evaluate_scaling(queue);
209        }
210
211        None
212    }
213
214    /// Evaluate whether scaling is needed
215    pub fn evaluate_scaling<M: Message>(
216        &mut self,
217        queue: &mut PriorityEventQueue<M>,
218    ) -> Option<ScalingDecision> {
219        let stats = queue.stats();
220
221        // Update history
222        self.utilization_history.push_back(stats.utilization);
223        if self.utilization_history.len() > 100 {
224            self.utilization_history.pop_front();
225        }
226
227        self.peak_utilization = self.peak_utilization.max(stats.utilization);
228
229        // Check cooldown
230        if let Some(last_time) = self.last_scaling_time {
231            if Instant::now().duration_since(last_time) < self.config.cooldown {
232                return None;
233            }
234        }
235
236        // Make scaling decision based on strategy
237        let decision = match self.config.strategy {
238            ScalingStrategy::Conservative => self.conservative_scaling(&stats),
239            ScalingStrategy::Aggressive => self.aggressive_scaling(&stats),
240            ScalingStrategy::Predictive => self.predictive_scaling(&stats),
241            ScalingStrategy::Adaptive => self.adaptive_scaling(&stats),
242        };
243
244        // Apply the decision if there is one
245        if decision != ScalingDecision::NoChange {
246            let utilization_before = stats.utilization;
247            let dropped_before = stats.dropped_events;
248
249            let result = match decision {
250                ScalingDecision::Grow(amount) => {
251                    let new_size = (stats.max_size + amount).min(self.config.max_size);
252                    queue.resize(new_size)
253                }
254                ScalingDecision::Shrink(amount) => {
255                    let new_size =
256                        (stats.max_size.saturating_sub(amount)).max(self.config.min_size);
257                    queue.resize(new_size)
258                }
259                ScalingDecision::NoChange => Ok(()),
260            };
261
262            if result.is_ok() {
263                self.last_scaling_time = Some(Instant::now());
264
265                let new_stats = queue.stats();
266                self.scaling_history.push_back(ScalingOutcome {
267                    decision,
268                    timestamp: Instant::now(),
269                    utilization_before,
270                    utilization_after: new_stats.utilization,
271                    dropped_events_before: dropped_before,
272                    dropped_events_after: new_stats.dropped_events,
273                });
274
275                if self.scaling_history.len() > 50 {
276                    self.scaling_history.pop_front();
277                }
278
279                if self.config.debug {
280                    log::debug!(
281                        "Queue scaling: {:?} (size: {} -> {}, util: {:.1}% -> {:.1}%)",
282                        decision,
283                        stats.max_size,
284                        new_stats.max_size,
285                        utilization_before * 100.0,
286                        new_stats.utilization * 100.0
287                    );
288                }
289
290                return Some(decision);
291            }
292        }
293
294        None
295    }
296
297    /// Conservative scaling strategy - small incremental changes
298    fn conservative_scaling(&self, stats: &QueueStats) -> ScalingDecision {
299        let avg_utilization = self.average_utilization();
300
301        if stats.utilization > 0.9 || stats.backpressure_active {
302            // High load - grow by 20%
303            let growth = (stats.max_size as f64 * 0.2) as usize;
304            ScalingDecision::Grow(growth.max(10))
305        } else if avg_utilization < 0.2 && stats.max_size > self.config.min_size {
306            // Very low load for sustained period - shrink by 10%
307            let shrink = (stats.max_size as f64 * 0.1) as usize;
308            ScalingDecision::Shrink(shrink.max(10))
309        } else {
310            ScalingDecision::NoChange
311        }
312    }
313
314    /// Aggressive scaling strategy - large rapid changes
315    fn aggressive_scaling(&self, stats: &QueueStats) -> ScalingDecision {
316        if stats.utilization > 0.8 {
317            // High load - double the size
318            let growth = stats.max_size;
319            ScalingDecision::Grow(growth)
320        } else if stats.utilization < 0.1 && stats.max_size > self.config.min_size {
321            // Very low load - halve the size
322            let shrink = stats.max_size / 2;
323            ScalingDecision::Shrink(shrink)
324        } else if stats.utilization > 0.6 {
325            // Moderate load - grow by 50%
326            let growth = (stats.max_size as f64 * 0.5) as usize;
327            ScalingDecision::Grow(growth)
328        } else {
329            ScalingDecision::NoChange
330        }
331    }
332
333    /// Predictive scaling strategy - based on historical patterns
334    fn predictive_scaling(&self, stats: &QueueStats) -> ScalingDecision {
335        let event_rate = self.event_rate.events_per_second();
336        let is_rate_increasing = self.event_rate.is_increasing();
337
338        // Predict future needs based on event rate trend
339        if is_rate_increasing && stats.utilization > 0.5 {
340            // Rate is increasing and we're above 50% - proactively grow
341            let predicted_need = (event_rate * 10.0) as usize; // Predict 10 seconds ahead
342            let growth = predicted_need.saturating_sub(stats.current_size);
343            if growth > 0 {
344                return ScalingDecision::Grow(growth);
345            }
346        }
347
348        // Use peak utilization for decisions
349        if self.peak_utilization > 0.95 && stats.utilization > 0.7 {
350            // We've hit peak before and are climbing - grow early
351            let growth = (stats.max_size as f64 * 0.3) as usize;
352            ScalingDecision::Grow(growth)
353        } else if stats.utilization < 0.15 && !is_rate_increasing {
354            // Low utilization and rate not increasing - safe to shrink
355            let shrink = (stats.max_size as f64 * 0.2) as usize;
356            ScalingDecision::Shrink(shrink)
357        } else {
358            ScalingDecision::NoChange
359        }
360    }
361
362    /// Adaptive scaling strategy - adjusts based on past success
363    fn adaptive_scaling(&self, stats: &QueueStats) -> ScalingDecision {
364        // Analyze recent scaling outcomes
365        let recent_successes = self
366            .scaling_history
367            .iter()
368            .rev()
369            .take(5)
370            .filter(|outcome| {
371                // Consider it successful if utilization improved without dropping events
372                let util_improved = match outcome.decision {
373                    ScalingDecision::Grow(_) => {
374                        outcome.utilization_after < outcome.utilization_before
375                    }
376                    ScalingDecision::Shrink(_) => outcome.utilization_after < 0.8,
377                    ScalingDecision::NoChange => true,
378                };
379                let no_new_drops = outcome.dropped_events_after == outcome.dropped_events_before;
380                util_improved && no_new_drops
381            })
382            .count();
383
384        let success_rate = if self.scaling_history.len() >= 5 {
385            recent_successes as f64 / 5.0
386        } else {
387            0.5 // Assume neutral if not enough history
388        };
389
390        // Adjust aggressiveness based on success rate
391        if success_rate > 0.8 {
392            // High success - be more aggressive
393            self.aggressive_scaling(stats)
394        } else if success_rate < 0.4 {
395            // Low success - be more conservative
396            self.conservative_scaling(stats)
397        } else {
398            // Medium success - use predictive
399            self.predictive_scaling(stats)
400        }
401    }
402
403    /// Get the average utilization over recent history
404    fn average_utilization(&self) -> f64 {
405        if self.utilization_history.is_empty() {
406            return 0.0;
407        }
408
409        let sum: f64 = self.utilization_history.iter().sum();
410        sum / self.utilization_history.len() as f64
411    }
412
413    /// Get current scaling metrics for monitoring
414    pub fn metrics(&self) -> ScalingMetrics {
415        ScalingMetrics {
416            average_utilization: self.average_utilization(),
417            peak_utilization: self.peak_utilization,
418            events_per_second: self.event_rate.events_per_second(),
419            scaling_operations: self.scaling_history.len(),
420            last_scaling: self.last_scaling_time,
421        }
422    }
423}
424
/// Metrics about the auto-scaling behavior
///
/// Snapshot produced by `QueueAutoScaler::metrics`. `scaling_operations`
/// reflects the scaler's bounded history (last 50 outcomes), not a lifetime
/// total.
#[derive(Debug, Clone)]
pub struct ScalingMetrics {
    /// Average queue utilization over the monitoring period
    pub average_utilization: f64,
    /// Peak queue utilization observed
    pub peak_utilization: f64,
    /// Current event processing rate in events per second
    pub events_per_second: f64,
    /// Total number of scaling operations performed (capped by history size)
    pub scaling_operations: usize,
    /// Timestamp of the last scaling operation
    pub last_scaling: Option<Instant>,
}
439
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_event_rate_tracker() {
        let mut tracker = EventRateTracker::new(Duration::from_secs(10));

        // After a burst of events, the measured rate must be positive.
        (0..10).for_each(|_| tracker.record_event());
        assert!(tracker.events_per_second() > 0.0);
    }

    #[test]
    fn test_scaling_strategies() {
        let scaler = QueueAutoScaler::new(AutoScaleConfig::default());

        // A nearly full queue with backpressure engaged.
        let stats = QueueStats {
            current_size: 90,
            max_size: 100,
            utilization: 0.9,
            backpressure_active: true,
            dropped_events: 0,
        };

        // Conservative must grow under this load.
        let conservative = scaler.conservative_scaling(&stats);
        assert!(matches!(conservative, ScalingDecision::Grow(_)));

        // Aggressive must grow by strictly more than conservative.
        let aggressive = scaler.aggressive_scaling(&stats);
        if let (ScalingDecision::Grow(small), ScalingDecision::Grow(big)) =
            (conservative, aggressive)
        {
            assert!(big > small);
        }
    }
}