nodedb_bridge/
backpressure.rs1use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
20
21#[derive(Debug, Clone, Copy)]
23pub struct BackpressureConfig {
24 pub throttle_enter: u8,
26 pub throttle_exit: u8,
28 pub suspend_enter: u8,
30 pub suspend_exit: u8,
32}
33
34impl Default for BackpressureConfig {
35 fn default() -> Self {
36 Self {
37 throttle_enter: 85,
38 throttle_exit: 75,
39 suspend_enter: 95,
40 suspend_exit: 85,
41 }
42 }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47#[repr(u8)]
48pub enum PressureState {
49 Normal = 0,
51 Throttled = 1,
53 Suspended = 2,
55}
56
57impl PressureState {
58 fn from_u8(v: u8) -> Self {
59 match v {
60 0 => Self::Normal,
61 1 => Self::Throttled,
62 _ => Self::Suspended,
63 }
64 }
65}
66
67pub struct BackpressureController {
72 config: BackpressureConfig,
73
74 state: AtomicU8,
76
77 throttle_count: AtomicU64,
79
80 suspend_count: AtomicU64,
82}
83
84impl BackpressureController {
85 pub fn new(config: BackpressureConfig) -> Self {
87 Self {
88 config,
89 state: AtomicU8::new(PressureState::Normal as u8),
90 throttle_count: AtomicU64::new(0),
91 suspend_count: AtomicU64::new(0),
92 }
93 }
94
95 pub fn update(&self, utilization_percent: u8) -> Option<PressureState> {
100 let current = PressureState::from_u8(self.state.load(Ordering::Relaxed));
101
102 let new_state = match current {
103 PressureState::Normal => {
104 if utilization_percent >= self.config.suspend_enter {
105 PressureState::Suspended
106 } else if utilization_percent >= self.config.throttle_enter {
107 PressureState::Throttled
108 } else {
109 return None;
110 }
111 }
112 PressureState::Throttled => {
113 if utilization_percent >= self.config.suspend_enter {
114 PressureState::Suspended
115 } else if utilization_percent < self.config.throttle_exit {
116 PressureState::Normal
117 } else {
118 return None;
119 }
120 }
121 PressureState::Suspended => {
122 if utilization_percent < self.config.suspend_exit {
123 if utilization_percent < self.config.throttle_exit {
124 PressureState::Normal
125 } else {
126 PressureState::Throttled
127 }
128 } else {
129 return None;
130 }
131 }
132 };
133
134 self.state.store(new_state as u8, Ordering::Release);
135
136 match new_state {
137 PressureState::Throttled => {
138 self.throttle_count.fetch_add(1, Ordering::Relaxed);
139 }
140 PressureState::Suspended => {
141 self.suspend_count.fetch_add(1, Ordering::Relaxed);
142 }
143 PressureState::Normal => {}
144 }
145
146 Some(new_state)
147 }
148
149 pub fn state(&self) -> PressureState {
151 PressureState::from_u8(self.state.load(Ordering::Acquire))
152 }
153
154 pub fn throttle_transitions(&self) -> u64 {
156 self.throttle_count.load(Ordering::Relaxed)
157 }
158
159 pub fn suspend_transitions(&self) -> u64 {
161 self.suspend_count.load(Ordering::Relaxed)
162 }
163}
164
165impl Default for BackpressureController {
166 fn default() -> Self {
167 Self::new(BackpressureConfig::default())
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174
175 #[test]
176 fn starts_normal() {
177 let bp = BackpressureController::default();
178 assert_eq!(bp.state(), PressureState::Normal);
179 }
180
181 #[test]
182 fn normal_to_throttled_at_85() {
183 let bp = BackpressureController::default();
184 assert!(bp.update(84).is_none());
185 assert_eq!(bp.update(85), Some(PressureState::Throttled));
186 assert_eq!(bp.state(), PressureState::Throttled);
187 assert_eq!(bp.throttle_transitions(), 1);
188 }
189
190 #[test]
191 fn throttled_to_suspended_at_95() {
192 let bp = BackpressureController::default();
193 bp.update(85); assert_eq!(bp.update(95), Some(PressureState::Suspended));
195 assert_eq!(bp.state(), PressureState::Suspended);
196 assert_eq!(bp.suspend_transitions(), 1);
197 }
198
199 #[test]
200 fn hysteresis_prevents_oscillation() {
201 let bp = BackpressureController::default();
202
203 bp.update(86);
205 assert_eq!(bp.state(), PressureState::Throttled);
206
207 assert!(bp.update(80).is_none());
209 assert_eq!(bp.state(), PressureState::Throttled);
210
211 assert_eq!(bp.update(74), Some(PressureState::Normal));
213 assert_eq!(bp.state(), PressureState::Normal);
214 }
215
216 #[test]
217 fn suspended_exits_through_throttled() {
218 let bp = BackpressureController::default();
219
220 bp.update(96);
222 assert_eq!(bp.state(), PressureState::Suspended);
223
224 assert_eq!(bp.update(84), Some(PressureState::Throttled));
226 assert_eq!(bp.state(), PressureState::Throttled);
227 }
228
229 #[test]
230 fn suspended_exits_to_normal_if_low_enough() {
231 let bp = BackpressureController::default();
232
233 bp.update(96); assert_eq!(bp.update(50), Some(PressureState::Normal));
236 assert_eq!(bp.state(), PressureState::Normal);
237 }
238
239 #[test]
240 fn normal_jumps_directly_to_suspended() {
241 let bp = BackpressureController::default();
242 assert_eq!(bp.update(96), Some(PressureState::Suspended));
243 assert_eq!(bp.suspend_transitions(), 1);
244 assert_eq!(bp.throttle_transitions(), 0);
245 }
246
247 #[test]
248 fn no_transition_when_stable() {
249 let bp = BackpressureController::default();
250 assert!(bp.update(50).is_none());
251 assert!(bp.update(60).is_none());
252 assert!(bp.update(70).is_none());
253 }
254}