Skip to main content

liminal/pressure/
policy.rs

/// Severity level shown to operators on alerts raised by a pressure policy.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum AlertSeverity {
    /// Pressure notification that is purely informational.
    Info,
    /// Pressure notification raised at warning level.
    Warning,
    /// Pressure notification raised at error level.
    Error,
    /// Pressure notification raised at critical level.
    Critical,
}
13
14/// Action selected by a channel pressure policy when a threshold is active.
15#[derive(Clone, Debug, PartialEq)]
16pub enum PolicyAction {
17    /// Signal producers to reduce their publish rate by the configured factor.
18    SlowProducer {
19        /// Multiplicative rate reduction factor the producer should apply.
20        reduction_factor: f64,
21    },
22    /// Reject messages below the configured priority class while overloaded.
23    ShedLoad {
24        /// Priority threshold below which messages are rejected.
25        priority_threshold: u8,
26    },
27    /// Emit a scale signal for external orchestration systems.
28    ScaleConsumer,
29    /// Emit an observability event at the configured severity.
30    Alert {
31        /// Severity attached to the pressure event.
32        severity: AlertSeverity,
33    },
34}
35
36/// One pressure threshold and the action active at or above that threshold.
37#[derive(Clone, Debug, PartialEq)]
38pub struct PressurePolicy {
39    /// Channel pressure threshold, expected to be between 0.0 and 1.0.
40    pub threshold: f64,
41    /// Action active while channel pressure is at or above the threshold.
42    pub action: PolicyAction,
43}
44
45/// Ordered pressure escalation policy for a channel.
46#[derive(Clone, Debug, Default, PartialEq)]
47pub struct ChannelPolicyConfig {
48    policies: Vec<PressurePolicy>,
49}
50
51impl ChannelPolicyConfig {
52    /// Creates a channel policy configuration ordered by ascending threshold.
53    #[must_use]
54    pub fn new(mut policies: Vec<PressurePolicy>) -> Self {
55        policies.sort_by(|left, right| left.threshold.total_cmp(&right.threshold));
56        Self { policies }
57    }
58
59    /// Returns the configured pressure policies in escalation order.
60    #[must_use]
61    pub fn policies(&self) -> &[PressurePolicy] {
62        &self.policies
63    }
64
65    /// Returns whether this channel has no configured pressure policies.
66    #[must_use]
67    pub fn is_empty(&self) -> bool {
68        self.policies.is_empty()
69    }
70
71    /// Returns the number of configured pressure policies for this channel.
72    #[must_use]
73    pub fn len(&self) -> usize {
74        self.policies.len()
75    }
76
77    /// Returns every action active for the supplied channel pressure score.
78    #[must_use]
79    pub fn actions_for_pressure(&self, pressure: f64) -> Vec<PolicyAction> {
80        let mut actions = Vec::new();
81        for policy in &self.policies {
82            if policy.threshold <= pressure {
83                actions.push(policy.action.clone());
84            } else {
85                break;
86            }
87        }
88        actions
89    }
90}
91
#[cfg(test)]
mod tests {
    use super::{AlertSeverity, ChannelPolicyConfig, PolicyAction, PressurePolicy};

    /// Maps each action variant to a fixed label so tests can assert on the variant
    /// without caring about its payload. The match is exhaustive on purpose: adding a
    /// variant to `PolicyAction` forces this helper to be updated.
    fn action_name(action: &PolicyAction) -> &'static str {
        match action {
            PolicyAction::SlowProducer { .. } => "slow-producer",
            PolicyAction::ShedLoad { .. } => "shed-load",
            PolicyAction::ScaleConsumer => "scale-consumer",
            PolicyAction::Alert { .. } => "alert",
        }
    }

    /// Compares two floats with an absolute tolerance of `f64::EPSILON`.
    fn close_to(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < f64::EPSILON
    }

    #[test]
    fn policy_action_defines_exact_pressure_actions() {
        let slow = PolicyAction::SlowProducer {
            reduction_factor: 0.5,
        };
        let shed = PolicyAction::ShedLoad {
            priority_threshold: 3,
        };
        let scale = PolicyAction::ScaleConsumer;
        let alert = PolicyAction::Alert {
            severity: AlertSeverity::Warning,
        };

        assert_eq!(action_name(&slow), "slow-producer");
        assert_eq!(action_name(&shed), "shed-load");
        assert_eq!(action_name(&scale), "scale-consumer");
        assert_eq!(action_name(&alert), "alert");
    }

    #[test]
    fn pressure_policy_constructs_with_action_and_threshold() {
        let policy = PressurePolicy {
            threshold: 0.7,
            action: PolicyAction::SlowProducer {
                reduction_factor: 0.25,
            },
        };

        assert!(close_to(policy.threshold, 0.7));
        assert!(matches!(
            policy.action,
            PolicyAction::SlowProducer { reduction_factor } if close_to(reduction_factor, 0.25)
        ));
    }

    #[test]
    fn channel_policy_config_stores_policies_in_threshold_order() {
        let high = PressurePolicy {
            threshold: 0.9,
            action: PolicyAction::ShedLoad {
                priority_threshold: 2,
            },
        };
        let low = PressurePolicy {
            threshold: 0.5,
            action: PolicyAction::SlowProducer {
                reduction_factor: 0.5,
            },
        };

        // Supplied high-to-low on purpose: `new` must reorder them ascending.
        let config = ChannelPolicyConfig::new(vec![high, low]);

        assert_eq!(config.len(), 2);
        assert!(!config.is_empty());
        assert!(close_to(config.policies()[0].threshold, 0.5));
        assert!(close_to(config.policies()[1].threshold, 0.9));
    }

    #[test]
    fn policies_compose_in_escalation_sequence() {
        let slow = PressurePolicy {
            threshold: 0.5,
            action: PolicyAction::SlowProducer {
                reduction_factor: 0.5,
            },
        };
        let shed = PressurePolicy {
            threshold: 0.8,
            action: PolicyAction::ShedLoad {
                priority_threshold: 4,
            },
        };
        let config = ChannelPolicyConfig::new(vec![shed, slow]);

        let moderate = config.actions_for_pressure(0.6);
        let high = config.actions_for_pressure(0.9);
        let reduced = config.actions_for_pressure(0.6);
        let clear = config.actions_for_pressure(0.3);

        assert_eq!(moderate.len(), 1);
        assert!(matches!(
            moderate.as_slice(),
            [PolicyAction::SlowProducer { reduction_factor }] if close_to(*reduction_factor, 0.5)
        ));
        assert_eq!(high.len(), 2);
        assert!(matches!(high[0], PolicyAction::SlowProducer { .. }));
        assert!(matches!(high[1], PolicyAction::ShedLoad { .. }));
        assert_eq!(reduced.len(), 1);
        // Querying is stateless (`&self`), so asking again at the moderate pressure
        // after the high-pressure query must return exactly the same actions.
        assert_eq!(reduced, moderate);
        assert!(clear.is_empty());
    }

    #[test]
    fn threshold_is_inclusive_at_exact_pressure() {
        let config = ChannelPolicyConfig::new(vec![PressurePolicy {
            threshold: 0.5,
            action: PolicyAction::ScaleConsumer,
        }]);

        // "At or above": a pressure equal to the threshold activates the policy.
        let at_threshold = config.actions_for_pressure(0.5);
        // The closest probe used here that is strictly below the threshold does not.
        let just_below = config.actions_for_pressure(0.5 - f64::EPSILON);

        assert_eq!(at_threshold.len(), 1);
        assert_eq!(action_name(&at_threshold[0]), "scale-consumer");
        assert!(just_below.is_empty());
    }

    #[test]
    fn empty_config_yields_no_actions() {
        let config = ChannelPolicyConfig::default();

        assert!(config.is_empty());
        assert_eq!(config.len(), 0);
        assert!(config.policies().is_empty());
        assert!(config.actions_for_pressure(1.0).is_empty());
    }

    #[test]
    fn pressure_root_re_exports_policy_types() {
        use crate::pressure::{AlertSeverity as RootSeverity, PolicyAction as RootAction};

        let action = RootAction::Alert {
            severity: RootSeverity::Critical,
        };

        assert!(matches!(
            action,
            RootAction::Alert {
                severity: RootSeverity::Critical
            }
        ));
    }
}