nodedb_bridge/
backpressure.rs1use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
22
23#[derive(Debug, Clone, Copy)]
25pub struct BackpressureConfig {
26 pub throttle_enter: u8,
28 pub throttle_exit: u8,
30 pub suspend_enter: u8,
32 pub suspend_exit: u8,
34}
35
36impl Default for BackpressureConfig {
37 fn default() -> Self {
38 Self {
39 throttle_enter: 85,
40 throttle_exit: 75,
41 suspend_enter: 95,
42 suspend_exit: 85,
43 }
44 }
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49#[repr(u8)]
50pub enum PressureState {
51 Normal = 0,
53 Throttled = 1,
55 Suspended = 2,
57}
58
59impl PressureState {
60 fn from_u8(v: u8) -> Self {
61 match v {
62 0 => Self::Normal,
63 1 => Self::Throttled,
64 _ => Self::Suspended,
65 }
66 }
67}
68
69pub struct BackpressureController {
74 config: BackpressureConfig,
75
76 state: AtomicU8,
78
79 throttle_count: AtomicU64,
81
82 suspend_count: AtomicU64,
84}
85
86impl BackpressureController {
87 pub fn new(config: BackpressureConfig) -> Self {
89 Self {
90 config,
91 state: AtomicU8::new(PressureState::Normal as u8),
92 throttle_count: AtomicU64::new(0),
93 suspend_count: AtomicU64::new(0),
94 }
95 }
96
97 pub fn update(&self, utilization_percent: u8) -> Option<PressureState> {
102 let current = PressureState::from_u8(self.state.load(Ordering::Relaxed));
103
104 let new_state = match current {
105 PressureState::Normal => {
106 if utilization_percent >= self.config.suspend_enter {
107 PressureState::Suspended
108 } else if utilization_percent >= self.config.throttle_enter {
109 PressureState::Throttled
110 } else {
111 return None;
112 }
113 }
114 PressureState::Throttled => {
115 if utilization_percent >= self.config.suspend_enter {
116 PressureState::Suspended
117 } else if utilization_percent < self.config.throttle_exit {
118 PressureState::Normal
119 } else {
120 return None;
121 }
122 }
123 PressureState::Suspended => {
124 if utilization_percent < self.config.suspend_exit {
125 if utilization_percent < self.config.throttle_exit {
126 PressureState::Normal
127 } else {
128 PressureState::Throttled
129 }
130 } else {
131 return None;
132 }
133 }
134 };
135
136 self.state.store(new_state as u8, Ordering::Release);
137
138 match new_state {
139 PressureState::Throttled => {
140 self.throttle_count.fetch_add(1, Ordering::Relaxed);
141 }
142 PressureState::Suspended => {
143 self.suspend_count.fetch_add(1, Ordering::Relaxed);
144 }
145 PressureState::Normal => {}
146 }
147
148 Some(new_state)
149 }
150
151 pub fn state(&self) -> PressureState {
153 PressureState::from_u8(self.state.load(Ordering::Acquire))
154 }
155
156 pub fn throttle_transitions(&self) -> u64 {
158 self.throttle_count.load(Ordering::Relaxed)
159 }
160
161 pub fn suspend_transitions(&self) -> u64 {
163 self.suspend_count.load(Ordering::Relaxed)
164 }
165}
166
167impl Default for BackpressureController {
168 fn default() -> Self {
169 Self::new(BackpressureConfig::default())
170 }
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176
177 #[test]
178 fn starts_normal() {
179 let bp = BackpressureController::default();
180 assert_eq!(bp.state(), PressureState::Normal);
181 }
182
183 #[test]
184 fn normal_to_throttled_at_85() {
185 let bp = BackpressureController::default();
186 assert!(bp.update(84).is_none());
187 assert_eq!(bp.update(85), Some(PressureState::Throttled));
188 assert_eq!(bp.state(), PressureState::Throttled);
189 assert_eq!(bp.throttle_transitions(), 1);
190 }
191
192 #[test]
193 fn throttled_to_suspended_at_95() {
194 let bp = BackpressureController::default();
195 bp.update(85); assert_eq!(bp.update(95), Some(PressureState::Suspended));
197 assert_eq!(bp.state(), PressureState::Suspended);
198 assert_eq!(bp.suspend_transitions(), 1);
199 }
200
201 #[test]
202 fn hysteresis_prevents_oscillation() {
203 let bp = BackpressureController::default();
204
205 bp.update(86);
207 assert_eq!(bp.state(), PressureState::Throttled);
208
209 assert!(bp.update(80).is_none());
211 assert_eq!(bp.state(), PressureState::Throttled);
212
213 assert_eq!(bp.update(74), Some(PressureState::Normal));
215 assert_eq!(bp.state(), PressureState::Normal);
216 }
217
218 #[test]
219 fn suspended_exits_through_throttled() {
220 let bp = BackpressureController::default();
221
222 bp.update(96);
224 assert_eq!(bp.state(), PressureState::Suspended);
225
226 assert_eq!(bp.update(84), Some(PressureState::Throttled));
228 assert_eq!(bp.state(), PressureState::Throttled);
229 }
230
231 #[test]
232 fn suspended_exits_to_normal_if_low_enough() {
233 let bp = BackpressureController::default();
234
235 bp.update(96); assert_eq!(bp.update(50), Some(PressureState::Normal));
238 assert_eq!(bp.state(), PressureState::Normal);
239 }
240
241 #[test]
242 fn normal_jumps_directly_to_suspended() {
243 let bp = BackpressureController::default();
244 assert_eq!(bp.update(96), Some(PressureState::Suspended));
245 assert_eq!(bp.suspend_transitions(), 1);
246 assert_eq!(bp.throttle_transitions(), 0);
247 }
248
249 #[test]
250 fn no_transition_when_stable() {
251 let bp = BackpressureController::default();
252 assert!(bp.update(50).is_none());
253 assert!(bp.update(60).is_none());
254 assert!(bp.update(70).is_none());
255 }
256}