Skip to main content

hyperi_rustlib/deployment/
keda.rs

1// Project:   hyperi-rustlib
2// File:      src/deployment/keda.rs
3// Purpose:   KEDA autoscaling configuration and contract types
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! KEDA autoscaling configuration.
10//!
11//! [`KedaConfig`] lives in the app's config cascade so thresholds are
12//! overridable via env vars (e.g., `DFE_LOADER__KEDA__KAFKA_LAG_THRESHOLD=5000`).
13//!
14//! [`KedaContract`] is the subset validated against Helm `values.yaml`.
15
16use serde::{Deserialize, Serialize};
17
18/// KEDA autoscaling configuration for the app config cascade.
19///
20/// Include this in your app's `Config` struct so KEDA thresholds
21/// participate in the figment cascade and are env-var overridable.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(default)]
24pub struct KedaConfig {
25    /// Whether KEDA scaling is enabled.
26    pub enabled: bool,
27    /// Minimum replica count (0 = scale-to-zero).
28    pub min_replicas: u32,
29    /// Maximum replica count.
30    pub max_replicas: u32,
31    /// Seconds between KEDA polling the scaler.
32    pub polling_interval: u32,
33    /// Seconds before scale-down after load drops.
34    pub cooldown_period: u32,
35    /// Scale when consumer group lag exceeds this per partition.
36    pub kafka_lag_threshold: u64,
37    /// Wake from zero replicas when lag exceeds this.
38    pub activation_lag_threshold: u64,
39    /// Enable CPU-based scaling trigger.
40    pub cpu_enabled: bool,
41    /// CPU utilisation percentage threshold.
42    pub cpu_threshold: u32,
43    /// Enable a Prometheus trigger on the app's `{metric_prefix}_scaling_pressure`
44    /// gauge -- the correlated-composite horizontal-scaling signal (the rustlib
45    /// 2.8.10 `ScalingEngine`). Opt-in: the Prometheus `serverAddress` is
46    /// cluster-specific and must be set in `values.yaml` before enabling.
47    pub scaling_pressure_enabled: bool,
48    /// Per-pod `scaling_pressure` target for the trigger. The gauge is 0-100
49    /// per pod; KEDA adds pods to keep the pod pressure at/below this.
50    pub scaling_pressure_threshold: u32,
51}
52
53impl Default for KedaConfig {
54    fn default() -> Self {
55        Self {
56            enabled: true,
57            min_replicas: 1,
58            max_replicas: 10,
59            polling_interval: 15,
60            cooldown_period: 300,
61            kafka_lag_threshold: 1000,
62            activation_lag_threshold: 0,
63            cpu_enabled: true,
64            cpu_threshold: 80,
65            // Opt-in: the engine gauge is new and the Prometheus serverAddress
66            // is cluster-specific -- emitting a trigger with an empty address
67            // would fail KEDA. Operators enable after wiring serverAddress.
68            scaling_pressure_enabled: false,
69            scaling_pressure_threshold: 70,
70        }
71    }
72}
73
74/// KEDA contract points validated against Helm `values.yaml`.
75///
76/// Built from [`KedaConfig`] defaults. Use [`KedaContract::from_config`]
77/// to convert.
78///
79/// `#[serde(default)]` keeps the contract forward/backward compatible across
80/// versions: an older artefact JSON missing newer trigger fields (e.g. the
81/// 2.8.12 `scaling_pressure_*` pair) deserialises with those fields defaulted
82/// rather than erroring.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[serde(default)]
85pub struct KedaContract {
86    pub min_replicas: u32,
87    pub max_replicas: u32,
88    pub polling_interval: u32,
89    pub cooldown_period: u32,
90    pub kafka_lag_threshold: u64,
91    pub activation_lag_threshold: u64,
92    pub cpu_enabled: bool,
93    pub cpu_threshold: u32,
94    pub scaling_pressure_enabled: bool,
95    pub scaling_pressure_threshold: u32,
96}
97
98impl KedaContract {
99    /// Build a contract from a [`KedaConfig`].
100    #[must_use]
101    pub fn from_config(config: &KedaConfig) -> Self {
102        Self {
103            min_replicas: config.min_replicas,
104            max_replicas: config.max_replicas,
105            polling_interval: config.polling_interval,
106            cooldown_period: config.cooldown_period,
107            kafka_lag_threshold: config.kafka_lag_threshold,
108            activation_lag_threshold: config.activation_lag_threshold,
109            cpu_enabled: config.cpu_enabled,
110            cpu_threshold: config.cpu_threshold,
111            scaling_pressure_enabled: config.scaling_pressure_enabled,
112            scaling_pressure_threshold: config.scaling_pressure_threshold,
113        }
114    }
115}
116
117impl Default for KedaContract {
118    fn default() -> Self {
119        Self::from_config(&KedaConfig::default())
120    }
121}
122
123impl From<&KedaConfig> for KedaContract {
124    fn from(config: &KedaConfig) -> Self {
125        Self::from_config(config)
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// Flattens every `KedaConfig` field into one array so two configs can be
    /// compared in a single assertion without requiring `PartialEq`.
    fn config_fields(cfg: &KedaConfig) -> [u64; 11] {
        [
            u64::from(cfg.enabled),
            u64::from(cfg.min_replicas),
            u64::from(cfg.max_replicas),
            u64::from(cfg.polling_interval),
            u64::from(cfg.cooldown_period),
            cfg.kafka_lag_threshold,
            cfg.activation_lag_threshold,
            u64::from(cfg.cpu_enabled),
            u64::from(cfg.cpu_threshold),
            u64::from(cfg.scaling_pressure_enabled),
            u64::from(cfg.scaling_pressure_threshold),
        ]
    }

    /// Same idea as [`config_fields`], for `KedaContract`.
    fn contract_fields(contract: &KedaContract) -> [u64; 10] {
        [
            u64::from(contract.min_replicas),
            u64::from(contract.max_replicas),
            u64::from(contract.polling_interval),
            u64::from(contract.cooldown_period),
            contract.kafka_lag_threshold,
            contract.activation_lag_threshold,
            u64::from(contract.cpu_enabled),
            u64::from(contract.cpu_threshold),
            u64::from(contract.scaling_pressure_enabled),
            u64::from(contract.scaling_pressure_threshold),
        ]
    }

    /// A config in which every field differs from its default and from every
    /// other field of the same type, so a dropped or cross-wired field cannot
    /// go unnoticed.
    fn non_default_config() -> KedaConfig {
        KedaConfig {
            enabled: false,
            min_replicas: 2,
            max_replicas: 20,
            polling_interval: 30,
            cooldown_period: 600,
            kafka_lag_threshold: 5000,
            activation_lag_threshold: 50,
            cpu_enabled: false,
            cpu_threshold: 90,
            scaling_pressure_enabled: true,
            scaling_pressure_threshold: 60,
        }
    }

    /// Pins every default so an accidental change is caught.
    #[test]
    fn test_keda_config_defaults() {
        let cfg = KedaConfig::default();
        assert!(cfg.enabled);
        assert_eq!(cfg.min_replicas, 1);
        assert_eq!(cfg.max_replicas, 10);
        assert_eq!(cfg.polling_interval, 15);
        assert_eq!(cfg.cooldown_period, 300);
        assert_eq!(cfg.kafka_lag_threshold, 1000);
        assert_eq!(cfg.activation_lag_threshold, 0);
        assert!(cfg.cpu_enabled);
        assert_eq!(cfg.cpu_threshold, 80);
        // scaling_pressure trigger is opt-in (cluster-specific serverAddress).
        assert!(!cfg.scaling_pressure_enabled);
        assert_eq!(cfg.scaling_pressure_threshold, 70);
    }

    /// `from_config` must copy each contract field from the same-named
    /// config field.
    #[test]
    fn test_keda_contract_from_config() {
        let cfg = non_default_config();
        let contract = KedaContract::from_config(&cfg);
        assert_eq!(contract.min_replicas, 2);
        assert_eq!(contract.max_replicas, 20);
        assert_eq!(contract.polling_interval, 30);
        assert_eq!(contract.cooldown_period, 600);
        assert_eq!(contract.kafka_lag_threshold, 5000);
        assert_eq!(contract.activation_lag_threshold, 50);
        assert!(!contract.cpu_enabled);
        assert_eq!(contract.cpu_threshold, 90);
        assert!(contract.scaling_pressure_enabled);
        assert_eq!(contract.scaling_pressure_threshold, 60);
    }

    /// The `From<&KedaConfig>` impl and `Default` must agree with
    /// `from_config`.
    #[test]
    fn test_keda_contract_conversions_agree() {
        let cfg = non_default_config();
        let via_from = KedaContract::from(&cfg);
        let via_method = KedaContract::from_config(&cfg);
        assert_eq!(contract_fields(&via_from), contract_fields(&via_method));

        let default_contract = KedaContract::default();
        let from_default_cfg = KedaContract::from_config(&KedaConfig::default());
        assert_eq!(
            contract_fields(&default_contract),
            contract_fields(&from_default_cfg)
        );
    }

    /// Round-trips through YAML and checks every field.
    #[test]
    fn test_keda_config_serde_roundtrip() {
        // The all-default value must survive a round trip...
        let cfg = KedaConfig::default();
        let yaml = serde_yaml_ng::to_string(&cfg).unwrap();
        let parsed: KedaConfig = serde_yaml_ng::from_str(&yaml).unwrap();
        assert_eq!(config_fields(&parsed), config_fields(&cfg));

        // ...and so must a value where nothing equals its default; with
        // defaults alone, `#[serde(default)]` would silently restore a field
        // that was lost on the way.
        let cfg = non_default_config();
        let yaml = serde_yaml_ng::to_string(&cfg).unwrap();
        let parsed: KedaConfig = serde_yaml_ng::from_str(&yaml).unwrap();
        assert_eq!(config_fields(&parsed), config_fields(&cfg));
    }

    /// A config that sets a single key keeps defaults for everything else.
    #[test]
    fn test_keda_config_partial_input_uses_defaults() {
        let parsed: KedaConfig = serde_yaml_ng::from_str("kafka_lag_threshold: 5000\n").unwrap();
        let expected = KedaConfig {
            kafka_lag_threshold: 5000,
            ..Default::default()
        };
        assert_eq!(config_fields(&parsed), config_fields(&expected));
    }

    #[test]
    fn test_keda_contract_deser_tolerates_missing_scaling_pressure() {
        // An older contract artefact predates the 2.8.12 scaling_pressure
        // fields. #[serde(default)] must fill them, not error.
        let legacy = r#"{
            "min_replicas": 2,
            "max_replicas": 20,
            "polling_interval": 15,
            "cooldown_period": 300,
            "kafka_lag_threshold": 1000,
            "activation_lag_threshold": 0,
            "cpu_enabled": true,
            "cpu_threshold": 80
        }"#;
        let contract: KedaContract = serde_json::from_str(legacy).unwrap();
        // Fields present in the artefact are taken as-is...
        assert_eq!(contract.min_replicas, 2);
        assert_eq!(contract.max_replicas, 20);
        // ...and the missing pair falls back to the defaults.
        assert!(!contract.scaling_pressure_enabled);
        assert_eq!(contract.scaling_pressure_threshold, 70);
    }
}