// nodedb_bridge/backpressure.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Adaptive backpressure controller.
4//!
5//! Monitors SPSC queue utilization and drives state transitions:
6//!
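//! ```text
//! ┌──────────┐   ≥85%   ┌──────────────┐  ≥95%  ┌───────────┐
//! │  Normal  │ ───────→ │  Throttled   │ ──────→│ Suspended │
//! └──────────┘          └──────────────┘        └───────────┘
//!      ↑    <75%           │  ↑    <85%               │
//!      └───────────────────┘  └───────────────────────┘
//! ```
//!
//! Entry thresholds are inclusive (`>=`) and exit thresholds are exclusive (`<`).
//! Not shown in the diagram: Normal jumps straight to Suspended when utilization
//! is already at or above 95%, and Suspended drops straight to Normal when
//! utilization is already below 75%.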
14//!
15//! - **Normal**: Data Plane processes all I/O at full speed.
16//! - **Throttled**: Data Plane reduces read depth (fewer concurrent io_uring reads).
17//! - **Suspended**: Data Plane suspends new read submissions (except replay-critical I/O).
18//!
//! Hysteresis (a 10-point gap between each enter/exit threshold pair in the
//! default configuration) prevents oscillation at threshold boundaries.
20
21use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
22
/// Backpressure thresholds, expressed as a percentage of queue capacity.
///
/// Entry thresholds are inclusive and exit thresholds are exclusive, matching
/// the comparisons made by `BackpressureController::update`: a state is
/// entered when utilization is `>=` the `*_enter` value and left when
/// utilization is `<` the `*_exit` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackpressureConfig {
    /// Enter Throttled when utilization reaches or exceeds this value (default: 85).
    pub throttle_enter: u8,
    /// Leave Throttled for Normal when utilization drops strictly below this
    /// value (default: 75). A Suspended controller that observes utilization
    /// below this value also returns directly to Normal.
    pub throttle_exit: u8,
    /// Enter Suspended when utilization reaches or exceeds this value (default: 95).
    pub suspend_enter: u8,
    /// Leave Suspended when utilization drops strictly below this value (default: 85).
    pub suspend_exit: u8,
}
35
36impl Default for BackpressureConfig {
37    fn default() -> Self {
38        Self {
39            throttle_enter: 85,
40            throttle_exit: 75,
41            suspend_enter: 95,
42            suspend_exit: 85,
43        }
44    }
45}
46
/// The three backpressure levels the controller can be in.
///
/// The explicit `u8` discriminants are the values stored in the controller's
/// atomic state byte, so they must stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PressureState {
    /// No pressure: all I/O is permitted at full speed.
    Normal = 0,
    /// Moderate pressure: the Data Plane should limit concurrent io_uring reads.
    Throttled = 1,
    /// Severe pressure: new reads are suspended; only replay-critical I/O is permitted.
    Suspended = 2,
}
58
59impl PressureState {
60    fn from_u8(v: u8) -> Self {
61        match v {
62            0 => Self::Normal,
63            1 => Self::Throttled,
64            _ => Self::Suspended,
65        }
66    }
67}
68
69/// Tracks backpressure state and transition counts.
70///
71/// Called by the bridge on every push/pop to update state. The Data Plane
72/// reads the current state to decide its I/O behavior.
73pub struct BackpressureController {
74    config: BackpressureConfig,
75
76    /// Current state (atomic for cross-thread reads).
77    state: AtomicU8,
78
79    /// Number of transitions into Throttled.
80    throttle_count: AtomicU64,
81
82    /// Number of transitions into Suspended.
83    suspend_count: AtomicU64,
84}
85
86impl BackpressureController {
87    /// Create a new controller with the given config.
88    pub fn new(config: BackpressureConfig) -> Self {
89        Self {
90            config,
91            state: AtomicU8::new(PressureState::Normal as u8),
92            throttle_count: AtomicU64::new(0),
93            suspend_count: AtomicU64::new(0),
94        }
95    }
96
97    /// Update the backpressure state based on current queue utilization.
98    ///
99    /// Returns the new state if a transition occurred, or `None` if unchanged.
100    /// The caller SHOULD emit a tracing event on transitions.
101    pub fn update(&self, utilization_percent: u8) -> Option<PressureState> {
102        let current = PressureState::from_u8(self.state.load(Ordering::Relaxed));
103
104        let new_state = match current {
105            PressureState::Normal => {
106                if utilization_percent >= self.config.suspend_enter {
107                    PressureState::Suspended
108                } else if utilization_percent >= self.config.throttle_enter {
109                    PressureState::Throttled
110                } else {
111                    return None;
112                }
113            }
114            PressureState::Throttled => {
115                if utilization_percent >= self.config.suspend_enter {
116                    PressureState::Suspended
117                } else if utilization_percent < self.config.throttle_exit {
118                    PressureState::Normal
119                } else {
120                    return None;
121                }
122            }
123            PressureState::Suspended => {
124                if utilization_percent < self.config.suspend_exit {
125                    if utilization_percent < self.config.throttle_exit {
126                        PressureState::Normal
127                    } else {
128                        PressureState::Throttled
129                    }
130                } else {
131                    return None;
132                }
133            }
134        };
135
136        self.state.store(new_state as u8, Ordering::Release);
137
138        match new_state {
139            PressureState::Throttled => {
140                self.throttle_count.fetch_add(1, Ordering::Relaxed);
141            }
142            PressureState::Suspended => {
143                self.suspend_count.fetch_add(1, Ordering::Relaxed);
144            }
145            PressureState::Normal => {}
146        }
147
148        Some(new_state)
149    }
150
151    /// Current backpressure state.
152    pub fn state(&self) -> PressureState {
153        PressureState::from_u8(self.state.load(Ordering::Acquire))
154    }
155
156    /// Number of times the controller transitioned into Throttled.
157    pub fn throttle_transitions(&self) -> u64 {
158        self.throttle_count.load(Ordering::Relaxed)
159    }
160
161    /// Number of times the controller transitioned into Suspended.
162    pub fn suspend_transitions(&self) -> u64 {
163        self.suspend_count.load(Ordering::Relaxed)
164    }
165}
166
167impl Default for BackpressureController {
168    fn default() -> Self {
169        Self::new(BackpressureConfig::default())
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Controller built from the default thresholds (85/75 and 95/85).
    fn controller() -> BackpressureController {
        BackpressureController::default()
    }

    #[test]
    fn starts_normal() {
        let bp = controller();
        assert_eq!(bp.state(), PressureState::Normal);
    }

    #[test]
    fn normal_to_throttled_at_85() {
        let bp = controller();
        assert!(bp.update(84).is_none());
        assert_eq!(bp.update(85), Some(PressureState::Throttled));
        assert_eq!(bp.state(), PressureState::Throttled);
        assert_eq!(bp.throttle_transitions(), 1);
    }

    #[test]
    fn throttled_to_suspended_at_95() {
        let bp = controller();
        bp.update(85); // → Throttled
        assert_eq!(bp.update(95), Some(PressureState::Suspended));
        assert_eq!(bp.state(), PressureState::Suspended);
        assert_eq!(bp.suspend_transitions(), 1);
    }

    #[test]
    fn hysteresis_prevents_oscillation() {
        let bp = controller();

        // Enter throttled.
        bp.update(86);
        assert_eq!(bp.state(), PressureState::Throttled);

        // Drop to 80% — still above exit threshold (75%), stays throttled.
        assert!(bp.update(80).is_none());
        assert_eq!(bp.state(), PressureState::Throttled);

        // Drop to 74% — below exit threshold, returns to normal.
        assert_eq!(bp.update(74), Some(PressureState::Normal));
        assert_eq!(bp.state(), PressureState::Normal);
    }

    #[test]
    fn throttled_holds_at_exact_exit_threshold() {
        let bp = controller();
        bp.update(85); // → Throttled

        // The exit comparison is strict: exactly 75% is not "below" 75%.
        assert!(bp.update(75).is_none());
        assert_eq!(bp.state(), PressureState::Throttled);
    }

    #[test]
    fn suspended_exits_through_throttled() {
        let bp = controller();

        // Normal → Suspended (jump past 95%).
        bp.update(96);
        assert_eq!(bp.state(), PressureState::Suspended);

        // Drop to 84% — below suspend_exit but above throttle_exit.
        assert_eq!(bp.update(84), Some(PressureState::Throttled));
        assert_eq!(bp.state(), PressureState::Throttled);
    }

    #[test]
    fn suspended_holds_at_exact_exit_threshold() {
        let bp = controller();
        bp.update(95); // → Suspended

        // The exit comparison is strict: exactly 85% is not "below" 85%.
        assert!(bp.update(85).is_none());
        assert_eq!(bp.state(), PressureState::Suspended);
    }

    #[test]
    fn suspended_exits_to_normal_if_low_enough() {
        let bp = controller();

        bp.update(96); // → Suspended
        // Drop dramatically below throttle_exit.
        assert_eq!(bp.update(50), Some(PressureState::Normal));
        assert_eq!(bp.state(), PressureState::Normal);
    }

    #[test]
    fn normal_jumps_directly_to_suspended() {
        let bp = controller();
        assert_eq!(bp.update(96), Some(PressureState::Suspended));
        assert_eq!(bp.suspend_transitions(), 1);
        assert_eq!(bp.throttle_transitions(), 0);
    }

    #[test]
    fn stepping_down_from_suspended_counts_as_throttle_transition() {
        let bp = controller();
        bp.update(96); // → Suspended, skipping Throttled
        assert_eq!(bp.throttle_transitions(), 0);

        bp.update(84); // → Throttled
        assert_eq!(bp.throttle_transitions(), 1);
        assert_eq!(bp.suspend_transitions(), 1);
    }

    #[test]
    fn returning_to_normal_leaves_counters_untouched() {
        let bp = controller();
        bp.update(85); // → Throttled
        bp.update(95); // → Suspended
        assert_eq!(bp.update(10), Some(PressureState::Normal));

        assert_eq!(bp.throttle_transitions(), 1);
        assert_eq!(bp.suspend_transitions(), 1);
    }

    #[test]
    fn no_transition_when_stable() {
        let bp = controller();
        assert!(bp.update(50).is_none());
        assert!(bp.update(60).is_none());
        assert!(bp.update(70).is_none());
    }
}