Skip to main content

hyperi_rustlib/deployment/
keda.rs

1// Project:   hyperi-rustlib
2// File:      src/deployment/keda.rs
3// Purpose:   KEDA autoscaling configuration and contract types
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! KEDA autoscaling configuration.
10//!
11//! [`KedaConfig`] lives in the app's config cascade so thresholds are
12//! overridable via env vars (e.g., `DFE_LOADER__KEDA__KAFKA_LAG_THRESHOLD=5000`).
13//!
14//! [`KedaContract`] is the subset validated against Helm `values.yaml`.
15
16use serde::{Deserialize, Serialize};
17
18/// KEDA autoscaling configuration for the app config cascade.
19///
20/// Include this in your app's `Config` struct so KEDA thresholds
21/// participate in the figment cascade and are env-var overridable.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(default)]
24pub struct KedaConfig {
25    /// Whether KEDA scaling is enabled.
26    pub enabled: bool,
27    /// Minimum replica count (0 = scale-to-zero).
28    pub min_replicas: u32,
29    /// Maximum replica count.
30    pub max_replicas: u32,
31    /// Seconds between KEDA polling the scaler.
32    pub polling_interval: u32,
33    /// Seconds before scale-down after load drops.
34    pub cooldown_period: u32,
35    /// Scale when consumer group lag exceeds this per partition.
36    pub kafka_lag_threshold: u64,
37    /// Wake from zero replicas when lag exceeds this.
38    pub activation_lag_threshold: u64,
39    /// Enable CPU-based scaling trigger.
40    pub cpu_enabled: bool,
41    /// CPU utilisation percentage threshold.
42    pub cpu_threshold: u32,
43    /// Enable a Prometheus trigger on the app's `{metric_prefix}_scaling_pressure`
44    /// gauge -- the correlated-composite horizontal-scaling signal (the rustlib
45    /// 2.8.10 `ScalingEngine`). Opt-in: the Prometheus `serverAddress` is
46    /// cluster-specific and must be set in `values.yaml` before enabling.
47    pub scaling_pressure_enabled: bool,
48    /// Per-pod `scaling_pressure` target for the trigger. The gauge is 0-100
49    /// per pod; KEDA adds pods to keep the pod pressure at/below this.
50    pub scaling_pressure_threshold: u32,
51}
52
53impl Default for KedaConfig {
54    fn default() -> Self {
55        Self {
56            enabled: true,
57            min_replicas: 1,
58            max_replicas: 10,
59            polling_interval: 15,
60            cooldown_period: 300,
61            kafka_lag_threshold: 1000,
62            activation_lag_threshold: 0,
63            cpu_enabled: true,
64            cpu_threshold: 80,
65            // Opt-in: the engine gauge is new and the Prometheus serverAddress
66            // is cluster-specific -- emitting a trigger with an empty address
67            // would fail KEDA. Operators enable after wiring serverAddress.
68            scaling_pressure_enabled: false,
69            scaling_pressure_threshold: 70,
70        }
71    }
72}
73
74/// KEDA contract points validated against Helm `values.yaml`.
75///
76/// Built from [`KedaConfig`] defaults. Use [`KedaContract::from_config`]
77/// to convert.
78///
79/// `#[serde(default)]` keeps the contract forward/backward compatible across
80/// versions: an older artefact JSON missing newer trigger fields (e.g. the
81/// 2.8.12 `scaling_pressure_*` pair) deserialises with those fields defaulted
82/// rather than erroring.
83///
84/// `#[non_exhaustive]`: downstream crates MUST NOT construct this via a struct
85/// literal (`KedaContract { .. }`). The contract is a strict subset of
86/// [`KedaConfig`] and [`KedaContract::from_config`] is total, so construct via
87/// one of:
88///   - `KedaConfig { ..real values.., ..Default::default() }.into()`
89///   - [`KedaContract::from_config`]`(&cfg)`
90///   - [`KedaContract::default`]`()` + field mutation
91///
92/// This keeps future contract-field additions non-breaking for consumers: a
93/// new field is filled by `from_config`/`Default` rather than forcing every
94/// downstream literal to add it (the 2.8.12 source break this prevents).
95#[derive(Debug, Clone, Serialize, Deserialize)]
96#[serde(default)]
97#[non_exhaustive]
98pub struct KedaContract {
99    pub min_replicas: u32,
100    pub max_replicas: u32,
101    pub polling_interval: u32,
102    pub cooldown_period: u32,
103    pub kafka_lag_threshold: u64,
104    pub activation_lag_threshold: u64,
105    pub cpu_enabled: bool,
106    pub cpu_threshold: u32,
107    pub scaling_pressure_enabled: bool,
108    pub scaling_pressure_threshold: u32,
109}
110
111impl KedaContract {
112    /// Build a contract from a [`KedaConfig`].
113    #[must_use]
114    pub fn from_config(config: &KedaConfig) -> Self {
115        Self {
116            min_replicas: config.min_replicas,
117            max_replicas: config.max_replicas,
118            polling_interval: config.polling_interval,
119            cooldown_period: config.cooldown_period,
120            kafka_lag_threshold: config.kafka_lag_threshold,
121            activation_lag_threshold: config.activation_lag_threshold,
122            cpu_enabled: config.cpu_enabled,
123            cpu_threshold: config.cpu_threshold,
124            scaling_pressure_enabled: config.scaling_pressure_enabled,
125            scaling_pressure_threshold: config.scaling_pressure_threshold,
126        }
127    }
128}
129
130impl Default for KedaContract {
131    fn default() -> Self {
132        Self::from_config(&KedaConfig::default())
133    }
134}
135
136impl From<&KedaConfig> for KedaContract {
137    fn from(config: &KedaConfig) -> Self {
138        Self::from_config(config)
139    }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config in which every field differs from `KedaConfig::default()`,
    /// so a field that is dropped or silently defaulted shows up as a mismatch.
    fn non_default_config() -> KedaConfig {
        KedaConfig {
            enabled: false,
            min_replicas: 2,
            max_replicas: 20,
            polling_interval: 30,
            cooldown_period: 600,
            kafka_lag_threshold: 5000,
            activation_lag_threshold: 10,
            cpu_enabled: false,
            cpu_threshold: 90,
            scaling_pressure_enabled: true,
            scaling_pressure_threshold: 60,
        }
    }

    #[test]
    fn test_keda_config_defaults() {
        let cfg = KedaConfig::default();
        assert!(cfg.enabled);
        assert_eq!(cfg.min_replicas, 1);
        assert_eq!(cfg.max_replicas, 10);
        assert_eq!(cfg.polling_interval, 15);
        assert_eq!(cfg.cooldown_period, 300);
        assert_eq!(cfg.kafka_lag_threshold, 1000);
        assert_eq!(cfg.activation_lag_threshold, 0);
        assert!(cfg.cpu_enabled);
        assert_eq!(cfg.cpu_threshold, 80);
        // scaling_pressure trigger is opt-in (cluster-specific serverAddress).
        assert!(!cfg.scaling_pressure_enabled);
        assert_eq!(cfg.scaling_pressure_threshold, 70);
    }

    #[test]
    fn test_keda_contract_from_config() {
        // Every contract field must be carried over, not just the overridden ones.
        let cfg = non_default_config();
        let contract = KedaContract::from_config(&cfg);
        assert_eq!(contract.min_replicas, cfg.min_replicas);
        assert_eq!(contract.max_replicas, cfg.max_replicas);
        assert_eq!(contract.polling_interval, cfg.polling_interval);
        assert_eq!(contract.cooldown_period, cfg.cooldown_period);
        assert_eq!(contract.kafka_lag_threshold, cfg.kafka_lag_threshold);
        assert_eq!(
            contract.activation_lag_threshold,
            cfg.activation_lag_threshold
        );
        assert_eq!(contract.cpu_enabled, cfg.cpu_enabled);
        assert_eq!(contract.cpu_threshold, cfg.cpu_threshold);
        assert_eq!(
            contract.scaling_pressure_enabled,
            cfg.scaling_pressure_enabled
        );
        assert_eq!(
            contract.scaling_pressure_threshold,
            cfg.scaling_pressure_threshold
        );
    }

    #[test]
    fn test_keda_contract_from_ref_matches_from_config() {
        // The `From<&KedaConfig>` impl must be a plain alias of `from_config`.
        // Compared via `Debug` output so every field is covered.
        let cfg = non_default_config();
        let via_trait: KedaContract = (&cfg).into();
        let via_ctor = KedaContract::from_config(&cfg);
        assert_eq!(format!("{via_trait:?}"), format!("{via_ctor:?}"));
    }

    #[test]
    fn test_keda_contract_default_mirrors_config_defaults() {
        let contract = KedaContract::default();
        assert_eq!(contract.min_replicas, 1);
        assert_eq!(contract.max_replicas, 10);
        assert_eq!(contract.kafka_lag_threshold, 1000);
        assert!(contract.cpu_enabled);
        assert!(!contract.scaling_pressure_enabled);
        assert_eq!(contract.scaling_pressure_threshold, 70);
    }

    #[test]
    fn test_keda_config_serde_roundtrip() {
        // A non-default config is used on purpose: with `#[serde(default)]`, a
        // field lost during serialisation would otherwise be refilled with the
        // same default value and the loss would go unnoticed.
        let cfg = non_default_config();
        let yaml = serde_yaml_ng::to_string(&cfg).unwrap();
        let parsed: KedaConfig = serde_yaml_ng::from_str(&yaml).unwrap();
        assert_eq!(format!("{parsed:?}"), format!("{cfg:?}"));
    }

    #[test]
    fn test_keda_config_deser_fills_missing_fields() {
        // `#[serde(default)]` lets a partial config override a single field.
        let parsed: KedaConfig = serde_yaml_ng::from_str("kafka_lag_threshold: 5000\n").unwrap();
        assert_eq!(parsed.kafka_lag_threshold, 5000);
        assert!(parsed.enabled);
        assert_eq!(parsed.min_replicas, 1);
        assert_eq!(parsed.max_replicas, 10);
        assert_eq!(parsed.cpu_threshold, 80);
        assert!(!parsed.scaling_pressure_enabled);
    }

    #[test]
    fn test_keda_contract_deser_tolerates_missing_scaling_pressure() {
        // An older contract artefact predates the 2.8.12 scaling_pressure
        // fields. #[serde(default)] must fill them, not error.
        let legacy = r#"{
            "min_replicas": 2,
            "max_replicas": 20,
            "polling_interval": 15,
            "cooldown_period": 300,
            "kafka_lag_threshold": 1000,
            "activation_lag_threshold": 0,
            "cpu_enabled": true,
            "cpu_threshold": 80
        }"#;
        let contract: KedaContract = serde_json::from_str(legacy).unwrap();
        // Fields present in the artefact are taken from it...
        assert_eq!(contract.min_replicas, 2);
        assert_eq!(contract.max_replicas, 20);
        // ...and the absent pair falls back to the defaults.
        assert!(!contract.scaling_pressure_enabled);
        assert_eq!(contract.scaling_pressure_threshold, 70);
    }
}