Skip to main content

pjson_rs_domain/value_objects/
backpressure.rs

1use serde::{Deserialize, Serialize};
2
3/// Signal indicating client's receive buffer state for backpressure control
4///
5/// Clients send backpressure signals to inform the server about their processing
6/// capacity. The server uses these signals to throttle or pause frame transmission.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
8#[non_exhaustive]
9pub enum BackpressureSignal {
10    /// Client is ready for more data, no throttling needed
11    #[default]
12    Ok,
13
14    /// Client's buffer is filling up, server should slow down transmission
15    SlowDown,
16
17    /// Client's buffer is full, server must pause transmission
18    Pause,
19}
20
21impl BackpressureSignal {
22    /// Returns true if this signal indicates the server should pause
23    pub fn should_pause(&self) -> bool {
24        matches!(self, BackpressureSignal::Pause)
25    }
26
27    /// Returns true if this signal indicates the server should slow down
28    pub fn should_throttle(&self) -> bool {
29        matches!(
30            self,
31            BackpressureSignal::SlowDown | BackpressureSignal::Pause
32        )
33    }
34
35    /// Get suggested delay in milliseconds based on backpressure signal
36    pub fn suggested_delay_ms(&self) -> u64 {
37        match self {
38            BackpressureSignal::Ok => 0,
39            BackpressureSignal::SlowDown => 100,
40            BackpressureSignal::Pause => u64::MAX, // Indefinite pause until resumed
41        }
42    }
43}
44
45impl std::fmt::Display for BackpressureSignal {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        match self {
48            BackpressureSignal::Ok => write!(f, "OK"),
49            BackpressureSignal::SlowDown => write!(f, "SLOW_DOWN"),
50            BackpressureSignal::Pause => write!(f, "PAUSE"),
51        }
52    }
53}
54
55/// Credit-based flow control state
56///
57/// Tracks available credits for frame transmission. Each frame consumes credits,
58/// and the client replenishes credits by sending backpressure signals.
59#[derive(Debug, Clone)]
60pub struct FlowControlCredits {
61    available: usize,
62    max_credits: usize,
63}
64
65impl FlowControlCredits {
66    /// Create new credit tracker with maximum credits
67    pub fn new(max_credits: usize) -> Self {
68        Self {
69            available: max_credits,
70            max_credits,
71        }
72    }
73
74    /// Check if enough credits are available for a frame
75    pub fn has_credits(&self, required: usize) -> bool {
76        self.available >= required
77    }
78
79    /// Consume credits for frame transmission
80    ///
81    /// Returns an error if not enough credits are available
82    pub fn consume(&mut self, amount: usize) -> Result<(), String> {
83        if self.available < amount {
84            return Err(format!(
85                "Insufficient credits: needed {}, available {}",
86                amount, self.available
87            ));
88        }
89        self.available = self.available.saturating_sub(amount);
90        Ok(())
91    }
92
93    /// Add credits back to the pool
94    ///
95    /// Credits are capped at max_credits
96    pub fn add(&mut self, amount: usize) {
97        self.available = (self.available.saturating_add(amount)).min(self.max_credits);
98    }
99
100    /// Get current available credits
101    pub fn available(&self) -> usize {
102        self.available
103    }
104
105    /// Get maximum allowed credits
106    pub fn max_credits(&self) -> usize {
107        self.max_credits
108    }
109
110    /// Check if credits are exhausted
111    pub fn is_exhausted(&self) -> bool {
112        self.available == 0
113    }
114
115    /// Reset credits to maximum
116    pub fn reset(&mut self) {
117        self.available = self.max_credits;
118    }
119}
120
121impl Default for FlowControlCredits {
122    fn default() -> Self {
123        Self::new(1000) // Default 1000 credits
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use super::*;
130
131    #[test]
132    fn test_backpressure_signal_default() {
133        let signal = BackpressureSignal::default();
134        assert_eq!(signal, BackpressureSignal::Ok);
135        assert!(!signal.should_pause());
136        assert!(!signal.should_throttle());
137    }
138
139    #[test]
140    fn test_backpressure_signal_should_pause() {
141        assert!(BackpressureSignal::Pause.should_pause());
142        assert!(!BackpressureSignal::SlowDown.should_pause());
143        assert!(!BackpressureSignal::Ok.should_pause());
144    }
145
146    #[test]
147    fn test_backpressure_signal_should_throttle() {
148        assert!(BackpressureSignal::Pause.should_throttle());
149        assert!(BackpressureSignal::SlowDown.should_throttle());
150        assert!(!BackpressureSignal::Ok.should_throttle());
151    }
152
153    #[test]
154    fn test_backpressure_signal_suggested_delay() {
155        assert_eq!(BackpressureSignal::Ok.suggested_delay_ms(), 0);
156        assert_eq!(BackpressureSignal::SlowDown.suggested_delay_ms(), 100);
157        assert_eq!(BackpressureSignal::Pause.suggested_delay_ms(), u64::MAX);
158    }
159
160    #[test]
161    fn test_backpressure_signal_display() {
162        assert_eq!(BackpressureSignal::Ok.to_string(), "OK");
163        assert_eq!(BackpressureSignal::SlowDown.to_string(), "SLOW_DOWN");
164        assert_eq!(BackpressureSignal::Pause.to_string(), "PAUSE");
165    }
166
167    #[test]
168    fn test_backpressure_signal_serialization() {
169        let signal = BackpressureSignal::SlowDown;
170        let json = serde_json::to_string(&signal).unwrap();
171        let deserialized: BackpressureSignal = serde_json::from_str(&json).unwrap();
172        assert_eq!(signal, deserialized);
173    }
174
175    #[test]
176    fn test_flow_control_credits_new() {
177        let credits = FlowControlCredits::new(100);
178        assert_eq!(credits.available(), 100);
179        assert_eq!(credits.max_credits(), 100);
180        assert!(!credits.is_exhausted());
181    }
182
183    #[test]
184    fn test_flow_control_credits_default() {
185        let credits = FlowControlCredits::default();
186        assert_eq!(credits.available(), 1000);
187        assert_eq!(credits.max_credits(), 1000);
188    }
189
190    #[test]
191    fn test_flow_control_credits_has_credits() {
192        let credits = FlowControlCredits::new(100);
193        assert!(credits.has_credits(50));
194        assert!(credits.has_credits(100));
195        assert!(!credits.has_credits(101));
196    }
197
198    #[test]
199    fn test_flow_control_credits_consume() {
200        let mut credits = FlowControlCredits::new(100);
201
202        assert!(credits.consume(30).is_ok());
203        assert_eq!(credits.available(), 70);
204
205        assert!(credits.consume(70).is_ok());
206        assert_eq!(credits.available(), 0);
207        assert!(credits.is_exhausted());
208
209        assert!(credits.consume(1).is_err());
210    }
211
212    #[test]
213    fn test_flow_control_credits_add() {
214        let mut credits = FlowControlCredits::new(100);
215
216        credits.consume(50).unwrap();
217        assert_eq!(credits.available(), 50);
218
219        credits.add(30);
220        assert_eq!(credits.available(), 80);
221
222        // Cannot exceed max
223        credits.add(100);
224        assert_eq!(credits.available(), 100);
225    }
226
227    #[test]
228    fn test_flow_control_credits_reset() {
229        let mut credits = FlowControlCredits::new(100);
230
231        credits.consume(80).unwrap();
232        assert_eq!(credits.available(), 20);
233
234        credits.reset();
235        assert_eq!(credits.available(), 100);
236        assert!(!credits.is_exhausted());
237    }
238
239    #[test]
240    fn test_flow_control_credits_saturating_operations() {
241        let mut credits = FlowControlCredits::new(100);
242
243        // Consume all credits
244        credits.consume(100).unwrap();
245
246        // Try to consume more (should fail gracefully)
247        assert!(credits.consume(1).is_err());
248        assert_eq!(credits.available(), 0);
249
250        // Add back more than max
251        credits.add(usize::MAX);
252        assert_eq!(credits.available(), 100); // Should cap at max
253    }
254
255    #[test]
256    fn test_flow_control_credits_is_exhausted() {
257        let mut credits = FlowControlCredits::new(10);
258
259        assert!(!credits.is_exhausted());
260
261        credits.consume(10).unwrap();
262        assert!(credits.is_exhausted());
263
264        credits.add(1);
265        assert!(!credits.is_exhausted());
266    }
267
268    #[test]
269    fn test_flow_control_credits_error_message() {
270        let mut credits = FlowControlCredits::new(50);
271
272        let result = credits.consume(100);
273        assert!(result.is_err());
274
275        let error_msg = result.unwrap_err();
276        assert!(error_msg.contains("Insufficient credits"));
277        assert!(error_msg.contains("needed 100"));
278        assert!(error_msg.contains("available 50"));
279    }
280}