Skip to main content

nodedb_bridge/
backpressure.rs

1//! Adaptive backpressure controller.
2//!
3//! Monitors SPSC queue utilization and drives state transitions:
4//!
5//! ```text
6//! ┌──────────┐   >85%   ┌──────────────┐  >95%  ┌───────────┐
7//! │  Normal  │ ───────→ │  Throttled   │ ──────→│ Suspended │
8//! └──────────┘          └──────────────┘        └───────────┘
9//!      ↑    <75%              ↑    <85%               │
10//!      └────────────────────  └───────────────────────┘
11//! ```
12//!
13//! - **Normal**: Data Plane processes all I/O at full speed.
14//! - **Throttled**: Data Plane reduces read depth (fewer concurrent io_uring reads).
15//! - **Suspended**: Data Plane suspends new read submissions (except replay-critical I/O).
16//!
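//! Hysteresis (10% gap) prevents oscillation at threshold boundaries: a state is
//! entered as soon as utilization reaches its enter threshold (`>=`) and is left
//! only once utilization falls strictly below the lower exit threshold (`<`).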
18
19use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
20
/// Backpressure thresholds, expressed as a percentage of queue capacity.
///
/// Enter thresholds are inclusive (a state is entered when utilization is
/// `>=` the threshold); exit thresholds are strict (a state is left only when
/// utilization is `<` the threshold). Keeping each exit threshold below its
/// matching enter threshold is what provides hysteresis.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackpressureConfig {
    /// Enter Throttled once utilization reaches this value (default: 85%).
    pub throttle_enter: u8,
    /// Leave Throttled for Normal once utilization drops strictly below this
    /// value (default: 75%).
    pub throttle_exit: u8,
    /// Enter Suspended once utilization reaches this value (default: 95%).
    pub suspend_enter: u8,
    /// Leave Suspended once utilization drops strictly below this value
    /// (default: 85%). The controller then lands in Throttled, or directly in
    /// Normal if utilization is also below `throttle_exit`.
    pub suspend_exit: u8,
}

impl Default for BackpressureConfig {
    /// Returns the stock thresholds: throttle at 85% / release at 75%,
    /// suspend at 95% / release at 85%.
    fn default() -> Self {
        Self {
            throttle_enter: 85,
            throttle_exit: 75,
            suspend_enter: 95,
            suspend_exit: 85,
        }
    }
}
44
/// Backpressure level the Data Plane is currently operating under.
///
/// The discriminants are fixed (`repr(u8)`) so a state can be stored in an
/// `AtomicU8` and decoded again with `from_u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PressureState {
    /// Full speed — all I/O permitted.
    Normal = 0,
    /// Reduced read depth — Data Plane should limit concurrent io_uring reads.
    Throttled = 1,
    /// New reads suspended — only replay-critical I/O permitted.
    Suspended = 2,
}

impl PressureState {
    /// Decodes a raw discriminant back into a state.
    ///
    /// `0` is `Normal` and `1` is `Throttled`; every other value — including
    /// ones that match no variant — decodes to `Suspended`, the most
    /// restrictive state.
    fn from_u8(v: u8) -> Self {
        if v == Self::Normal as u8 {
            Self::Normal
        } else if v == Self::Throttled as u8 {
            Self::Throttled
        } else {
            Self::Suspended
        }
    }
}
66
67/// Tracks backpressure state and transition counts.
68///
69/// Called by the bridge on every push/pop to update state. The Data Plane
70/// reads the current state to decide its I/O behavior.
71pub struct BackpressureController {
72    config: BackpressureConfig,
73
74    /// Current state (atomic for cross-thread reads).
75    state: AtomicU8,
76
77    /// Number of transitions into Throttled.
78    throttle_count: AtomicU64,
79
80    /// Number of transitions into Suspended.
81    suspend_count: AtomicU64,
82}
83
84impl BackpressureController {
85    /// Create a new controller with the given config.
86    pub fn new(config: BackpressureConfig) -> Self {
87        Self {
88            config,
89            state: AtomicU8::new(PressureState::Normal as u8),
90            throttle_count: AtomicU64::new(0),
91            suspend_count: AtomicU64::new(0),
92        }
93    }
94
95    /// Update the backpressure state based on current queue utilization.
96    ///
97    /// Returns the new state if a transition occurred, or `None` if unchanged.
98    /// The caller SHOULD emit a tracing event on transitions.
99    pub fn update(&self, utilization_percent: u8) -> Option<PressureState> {
100        let current = PressureState::from_u8(self.state.load(Ordering::Relaxed));
101
102        let new_state = match current {
103            PressureState::Normal => {
104                if utilization_percent >= self.config.suspend_enter {
105                    PressureState::Suspended
106                } else if utilization_percent >= self.config.throttle_enter {
107                    PressureState::Throttled
108                } else {
109                    return None;
110                }
111            }
112            PressureState::Throttled => {
113                if utilization_percent >= self.config.suspend_enter {
114                    PressureState::Suspended
115                } else if utilization_percent < self.config.throttle_exit {
116                    PressureState::Normal
117                } else {
118                    return None;
119                }
120            }
121            PressureState::Suspended => {
122                if utilization_percent < self.config.suspend_exit {
123                    if utilization_percent < self.config.throttle_exit {
124                        PressureState::Normal
125                    } else {
126                        PressureState::Throttled
127                    }
128                } else {
129                    return None;
130                }
131            }
132        };
133
134        self.state.store(new_state as u8, Ordering::Release);
135
136        match new_state {
137            PressureState::Throttled => {
138                self.throttle_count.fetch_add(1, Ordering::Relaxed);
139            }
140            PressureState::Suspended => {
141                self.suspend_count.fetch_add(1, Ordering::Relaxed);
142            }
143            PressureState::Normal => {}
144        }
145
146        Some(new_state)
147    }
148
149    /// Current backpressure state.
150    pub fn state(&self) -> PressureState {
151        PressureState::from_u8(self.state.load(Ordering::Acquire))
152    }
153
154    /// Number of times the controller transitioned into Throttled.
155    pub fn throttle_transitions(&self) -> u64 {
156        self.throttle_count.load(Ordering::Relaxed)
157    }
158
159    /// Number of times the controller transitioned into Suspended.
160    pub fn suspend_transitions(&self) -> u64 {
161        self.suspend_count.load(Ordering::Relaxed)
162    }
163}
164
165impl Default for BackpressureController {
166    fn default() -> Self {
167        Self::new(BackpressureConfig::default())
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh controller with the default thresholds (85 / 75 / 95 / 85).
    fn controller() -> BackpressureController {
        BackpressureController::default()
    }

    #[test]
    fn starts_normal() {
        assert_eq!(controller().state(), PressureState::Normal);
    }

    #[test]
    fn normal_to_throttled_at_85() {
        let ctrl = controller();

        // One point below the enter threshold: nothing happens.
        assert_eq!(ctrl.update(84), None);

        // Exactly at the threshold: the transition fires.
        assert_eq!(ctrl.update(85), Some(PressureState::Throttled));
        assert_eq!(ctrl.state(), PressureState::Throttled);
        assert_eq!(ctrl.throttle_transitions(), 1);
    }

    #[test]
    fn throttled_to_suspended_at_95() {
        let ctrl = controller();
        let _ = ctrl.update(85); // Normal → Throttled

        assert_eq!(ctrl.update(95), Some(PressureState::Suspended));
        assert_eq!(ctrl.state(), PressureState::Suspended);
        assert_eq!(ctrl.suspend_transitions(), 1);
    }

    #[test]
    fn hysteresis_prevents_oscillation() {
        let ctrl = controller();

        // Cross the enter threshold.
        let _ = ctrl.update(86);
        assert_eq!(ctrl.state(), PressureState::Throttled);

        // 80% is under the enter threshold but not under the 75% exit
        // threshold, so the controller must stay put.
        assert_eq!(ctrl.update(80), None);
        assert_eq!(ctrl.state(), PressureState::Throttled);

        // 74% is under the exit threshold: back to Normal.
        assert_eq!(ctrl.update(74), Some(PressureState::Normal));
        assert_eq!(ctrl.state(), PressureState::Normal);
    }

    #[test]
    fn suspended_exits_through_throttled() {
        let ctrl = controller();

        // Jump straight from Normal to Suspended.
        let _ = ctrl.update(96);
        assert_eq!(ctrl.state(), PressureState::Suspended);

        // 84% is under suspend_exit yet still at or above throttle_exit.
        assert_eq!(ctrl.update(84), Some(PressureState::Throttled));
        assert_eq!(ctrl.state(), PressureState::Throttled);
    }

    #[test]
    fn suspended_exits_to_normal_if_low_enough() {
        let ctrl = controller();
        let _ = ctrl.update(96); // Normal → Suspended

        // Far below throttle_exit: Throttled is skipped.
        assert_eq!(ctrl.update(50), Some(PressureState::Normal));
        assert_eq!(ctrl.state(), PressureState::Normal);
    }

    #[test]
    fn normal_jumps_directly_to_suspended() {
        let ctrl = controller();

        assert_eq!(ctrl.update(96), Some(PressureState::Suspended));
        assert_eq!(ctrl.suspend_transitions(), 1);
        assert_eq!(ctrl.throttle_transitions(), 0);
    }

    #[test]
    fn no_transition_when_stable() {
        let ctrl = controller();

        assert_eq!(ctrl.update(50), None);
        assert_eq!(ctrl.update(60), None);
        assert_eq!(ctrl.update(70), None);
    }
}