liminal/pressure/
policy.rs1#[derive(Clone, Debug, PartialEq, Eq)]
3pub enum AlertSeverity {
4 Info,
6 Warning,
8 Error,
10 Critical,
12}
13
14#[derive(Clone, Debug, PartialEq)]
16pub enum PolicyAction {
17 SlowProducer {
19 reduction_factor: f64,
21 },
22 ShedLoad {
24 priority_threshold: u8,
26 },
27 ScaleConsumer,
29 Alert {
31 severity: AlertSeverity,
33 },
34}
35
36#[derive(Clone, Debug, PartialEq)]
38pub struct PressurePolicy {
39 pub threshold: f64,
41 pub action: PolicyAction,
43}
44
45#[derive(Clone, Debug, Default, PartialEq)]
47pub struct ChannelPolicyConfig {
48 policies: Vec<PressurePolicy>,
49}
50
51impl ChannelPolicyConfig {
52 #[must_use]
54 pub fn new(mut policies: Vec<PressurePolicy>) -> Self {
55 policies.sort_by(|left, right| left.threshold.total_cmp(&right.threshold));
56 Self { policies }
57 }
58
59 #[must_use]
61 pub fn policies(&self) -> &[PressurePolicy] {
62 &self.policies
63 }
64
65 #[must_use]
67 pub fn is_empty(&self) -> bool {
68 self.policies.is_empty()
69 }
70
71 #[must_use]
73 pub fn len(&self) -> usize {
74 self.policies.len()
75 }
76
77 #[must_use]
79 pub fn actions_for_pressure(&self, pressure: f64) -> Vec<PolicyAction> {
80 let mut actions = Vec::new();
81 for policy in &self.policies {
82 if policy.threshold <= pressure {
83 actions.push(policy.action.clone());
84 } else {
85 break;
86 }
87 }
88 actions
89 }
90}
91
92#[cfg(test)]
93mod tests {
94 use super::{AlertSeverity, ChannelPolicyConfig, PolicyAction, PressurePolicy};
95
96 fn action_name(action: &PolicyAction) -> &'static str {
97 match action {
98 PolicyAction::SlowProducer { .. } => "slow-producer",
99 PolicyAction::ShedLoad { .. } => "shed-load",
100 PolicyAction::ScaleConsumer => "scale-consumer",
101 PolicyAction::Alert { .. } => "alert",
102 }
103 }
104
105 fn close_to(actual: f64, expected: f64) -> bool {
106 (actual - expected).abs() < f64::EPSILON
107 }
108
109 #[test]
110 fn policy_action_defines_exact_pressure_actions() {
111 let slow = PolicyAction::SlowProducer {
112 reduction_factor: 0.5,
113 };
114 let shed = PolicyAction::ShedLoad {
115 priority_threshold: 3,
116 };
117 let scale = PolicyAction::ScaleConsumer;
118 let alert = PolicyAction::Alert {
119 severity: AlertSeverity::Warning,
120 };
121
122 assert_eq!(action_name(&slow), "slow-producer");
123 assert_eq!(action_name(&shed), "shed-load");
124 assert_eq!(action_name(&scale), "scale-consumer");
125 assert_eq!(action_name(&alert), "alert");
126 }
127
128 #[test]
129 fn pressure_policy_constructs_with_action_and_threshold() {
130 let policy = PressurePolicy {
131 threshold: 0.7,
132 action: PolicyAction::SlowProducer {
133 reduction_factor: 0.25,
134 },
135 };
136
137 assert!(close_to(policy.threshold, 0.7));
138 assert!(matches!(
139 policy.action,
140 PolicyAction::SlowProducer { reduction_factor } if close_to(reduction_factor, 0.25)
141 ));
142 }
143
144 #[test]
145 fn channel_policy_config_stores_policies_in_threshold_order() {
146 let high = PressurePolicy {
147 threshold: 0.9,
148 action: PolicyAction::ShedLoad {
149 priority_threshold: 2,
150 },
151 };
152 let low = PressurePolicy {
153 threshold: 0.5,
154 action: PolicyAction::SlowProducer {
155 reduction_factor: 0.5,
156 },
157 };
158
159 let config = ChannelPolicyConfig::new(vec![high, low]);
160
161 assert_eq!(config.len(), 2);
162 assert!(close_to(config.policies()[0].threshold, 0.5));
163 assert!(close_to(config.policies()[1].threshold, 0.9));
164 }
165
166 #[test]
167 fn policies_compose_in_escalation_sequence() {
168 let slow = PressurePolicy {
169 threshold: 0.5,
170 action: PolicyAction::SlowProducer {
171 reduction_factor: 0.5,
172 },
173 };
174 let shed = PressurePolicy {
175 threshold: 0.8,
176 action: PolicyAction::ShedLoad {
177 priority_threshold: 4,
178 },
179 };
180 let config = ChannelPolicyConfig::new(vec![shed, slow]);
181
182 let moderate = config.actions_for_pressure(0.6);
183 let high = config.actions_for_pressure(0.9);
184 let reduced = config.actions_for_pressure(0.6);
185 let clear = config.actions_for_pressure(0.3);
186
187 assert_eq!(moderate.len(), 1);
188 assert!(matches!(
189 moderate.as_slice(),
190 [PolicyAction::SlowProducer { reduction_factor }] if close_to(*reduction_factor, 0.5)
191 ));
192 assert_eq!(high.len(), 2);
193 assert!(matches!(high[0], PolicyAction::SlowProducer { .. }));
194 assert!(matches!(high[1], PolicyAction::ShedLoad { .. }));
195 assert_eq!(reduced.len(), 1);
196 assert!(clear.is_empty());
197 }
198
199 #[test]
200 fn pressure_root_re_exports_policy_types() {
201 use crate::pressure::{AlertSeverity as RootSeverity, PolicyAction as RootAction};
202
203 let action = RootAction::Alert {
204 severity: RootSeverity::Critical,
205 };
206
207 assert!(matches!(
208 action,
209 RootAction::Alert {
210 severity: RootSeverity::Critical
211 }
212 ));
213 }
214}