// Source file: pjson_rs_domain/value_objects/backpressure.rs

1use serde::{Deserialize, Serialize};
2
3/// Signal indicating client's receive buffer state for backpressure control
4///
5/// Clients send backpressure signals to inform the server about their processing
6/// capacity. The server uses these signals to throttle or pause frame transmission.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
8pub enum BackpressureSignal {
9    /// Client is ready for more data, no throttling needed
10    #[default]
11    Ok,
12
13    /// Client's buffer is filling up, server should slow down transmission
14    SlowDown,
15
16    /// Client's buffer is full, server must pause transmission
17    Pause,
18}
19
20impl BackpressureSignal {
21    /// Returns true if this signal indicates the server should pause
22    pub fn should_pause(&self) -> bool {
23        matches!(self, BackpressureSignal::Pause)
24    }
25
26    /// Returns true if this signal indicates the server should slow down
27    pub fn should_throttle(&self) -> bool {
28        matches!(
29            self,
30            BackpressureSignal::SlowDown | BackpressureSignal::Pause
31        )
32    }
33
34    /// Get suggested delay in milliseconds based on backpressure signal
35    pub fn suggested_delay_ms(&self) -> u64 {
36        match self {
37            BackpressureSignal::Ok => 0,
38            BackpressureSignal::SlowDown => 100,
39            BackpressureSignal::Pause => u64::MAX, // Indefinite pause until resumed
40        }
41    }
42}
43
44impl std::fmt::Display for BackpressureSignal {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        match self {
47            BackpressureSignal::Ok => write!(f, "OK"),
48            BackpressureSignal::SlowDown => write!(f, "SLOW_DOWN"),
49            BackpressureSignal::Pause => write!(f, "PAUSE"),
50        }
51    }
52}
53
/// Credit-based flow control state.
///
/// Tracks how many credits are available for frame transmission. Sending a
/// frame consumes credits via [`FlowControlCredits::consume`]; credits are
/// returned to the pool via [`FlowControlCredits::add`] or
/// [`FlowControlCredits::reset`].
#[derive(Debug, Clone)]
pub struct FlowControlCredits {
    /// Credits currently available to spend. Every method in this type keeps
    /// this at or below `max_credits`.
    available: usize,
    /// Upper bound for `available`, fixed at construction.
    max_credits: usize,
}

impl FlowControlCredits {
    /// Credit ceiling used by the `Default` implementation.
    const DEFAULT_MAX_CREDITS: usize = 1000;

    /// Creates a tracker that starts full, with `max_credits` credits available.
    #[must_use]
    pub const fn new(max_credits: usize) -> Self {
        Self {
            available: max_credits,
            max_credits,
        }
    }

    /// Returns `true` if at least `required` credits are available.
    #[must_use]
    pub const fn has_credits(&self, required: usize) -> bool {
        self.available >= required
    }

    /// Consumes `amount` credits for a frame transmission.
    ///
    /// # Errors
    ///
    /// Returns a message of the form
    /// `Insufficient credits: needed {amount}, available {available}` when
    /// fewer than `amount` credits are available. The balance is left
    /// unchanged in that case.
    pub fn consume(&mut self, amount: usize) -> Result<(), String> {
        // `checked_sub` is both the sufficiency check and the subtraction, so
        // the two can never disagree.
        match self.available.checked_sub(amount) {
            Some(remaining) => {
                self.available = remaining;
                Ok(())
            }
            None => Err(format!(
                "Insufficient credits: needed {}, available {}",
                amount, self.available
            )),
        }
    }

    /// Returns `amount` credits to the pool, capped at the maximum.
    pub fn add(&mut self, amount: usize) {
        // Saturate first so a huge `amount` cannot wrap around before capping.
        self.available = self.available.saturating_add(amount).min(self.max_credits);
    }

    /// Returns the number of credits currently available.
    #[must_use]
    pub const fn available(&self) -> usize {
        self.available
    }

    /// Returns the maximum number of credits this tracker can hold.
    #[must_use]
    pub const fn max_credits(&self) -> usize {
        self.max_credits
    }

    /// Returns `true` when no credits are left.
    #[must_use]
    pub const fn is_exhausted(&self) -> bool {
        self.available == 0
    }

    /// Refills the pool to its maximum.
    pub fn reset(&mut self) {
        self.available = self.max_credits;
    }
}

impl Default for FlowControlCredits {
    /// Creates a tracker with a maximum of 1000 credits, all available.
    fn default() -> Self {
        Self::new(Self::DEFAULT_MAX_CREDITS)
    }
}
125
/// Unit tests for [`BackpressureSignal`] and [`FlowControlCredits`].
#[cfg(test)]
mod tests {
    use super::*;

    /// Every variant, for tests that must hold across the whole enum.
    const ALL_SIGNALS: [BackpressureSignal; 3] = [
        BackpressureSignal::Ok,
        BackpressureSignal::SlowDown,
        BackpressureSignal::Pause,
    ];

    #[test]
    fn test_backpressure_signal_default() {
        let signal = BackpressureSignal::default();
        assert_eq!(signal, BackpressureSignal::Ok);
        assert!(!signal.should_pause());
        assert!(!signal.should_throttle());
    }

    #[test]
    fn test_backpressure_signal_should_pause() {
        assert!(BackpressureSignal::Pause.should_pause());
        assert!(!BackpressureSignal::SlowDown.should_pause());
        assert!(!BackpressureSignal::Ok.should_pause());
    }

    #[test]
    fn test_backpressure_signal_should_throttle() {
        assert!(BackpressureSignal::Pause.should_throttle());
        assert!(BackpressureSignal::SlowDown.should_throttle());
        assert!(!BackpressureSignal::Ok.should_throttle());
    }

    #[test]
    fn test_backpressure_signal_suggested_delay() {
        assert_eq!(BackpressureSignal::Ok.suggested_delay_ms(), 0);
        assert_eq!(BackpressureSignal::SlowDown.suggested_delay_ms(), 100);
        assert_eq!(BackpressureSignal::Pause.suggested_delay_ms(), u64::MAX);
    }

    #[test]
    fn test_backpressure_signal_display() {
        assert_eq!(BackpressureSignal::Ok.to_string(), "OK");
        assert_eq!(BackpressureSignal::SlowDown.to_string(), "SLOW_DOWN");
        assert_eq!(BackpressureSignal::Pause.to_string(), "PAUSE");
    }

    #[test]
    fn test_backpressure_signal_serialization() {
        // Round-trip every variant, not just one.
        for signal in ALL_SIGNALS {
            let json = serde_json::to_string(&signal).unwrap();
            let deserialized: BackpressureSignal = serde_json::from_str(&json).unwrap();
            assert_eq!(signal, deserialized);
        }

        // Pin the wire name: serde uses the variant name, not the Display label.
        let json = serde_json::to_string(&BackpressureSignal::SlowDown).unwrap();
        assert_eq!(json, "\"SlowDown\"");
    }

    #[test]
    fn test_flow_control_credits_new() {
        let credits = FlowControlCredits::new(100);
        assert_eq!(credits.available(), 100);
        assert_eq!(credits.max_credits(), 100);
        assert!(!credits.is_exhausted());
    }

    #[test]
    fn test_flow_control_credits_default() {
        let credits = FlowControlCredits::default();
        assert_eq!(credits.available(), 1000);
        assert_eq!(credits.max_credits(), 1000);
    }

    #[test]
    fn test_flow_control_credits_has_credits() {
        let credits = FlowControlCredits::new(100);
        assert!(credits.has_credits(50));
        assert!(credits.has_credits(100));
        assert!(!credits.has_credits(101));
    }

    #[test]
    fn test_flow_control_credits_consume() {
        let mut credits = FlowControlCredits::new(100);

        assert!(credits.consume(30).is_ok());
        assert_eq!(credits.available(), 70);

        assert!(credits.consume(70).is_ok());
        assert_eq!(credits.available(), 0);
        assert!(credits.is_exhausted());

        assert!(credits.consume(1).is_err());
    }

    #[test]
    fn test_flow_control_credits_add() {
        let mut credits = FlowControlCredits::new(100);

        credits.consume(50).unwrap();
        assert_eq!(credits.available(), 50);

        credits.add(30);
        assert_eq!(credits.available(), 80);

        // Cannot exceed max
        credits.add(100);
        assert_eq!(credits.available(), 100);
    }

    #[test]
    fn test_flow_control_credits_reset() {
        let mut credits = FlowControlCredits::new(100);

        credits.consume(80).unwrap();
        assert_eq!(credits.available(), 20);

        credits.reset();
        assert_eq!(credits.available(), 100);
        assert!(!credits.is_exhausted());
    }

    #[test]
    fn test_flow_control_credits_saturating_operations() {
        let mut credits = FlowControlCredits::new(100);

        // Consume all credits
        credits.consume(100).unwrap();

        // Try to consume more (should fail gracefully)
        assert!(credits.consume(1).is_err());
        assert_eq!(credits.available(), 0);

        // Add back more than max
        credits.add(usize::MAX);
        assert_eq!(credits.available(), 100); // Should cap at max
    }

    #[test]
    fn test_flow_control_credits_is_exhausted() {
        let mut credits = FlowControlCredits::new(10);

        assert!(!credits.is_exhausted());

        credits.consume(10).unwrap();
        assert!(credits.is_exhausted());

        credits.add(1);
        assert!(!credits.is_exhausted());
    }

    #[test]
    fn test_flow_control_credits_error_message() {
        let mut credits = FlowControlCredits::new(50);

        let result = credits.consume(100);
        assert!(result.is_err());

        let error_msg = result.unwrap_err();
        assert!(error_msg.contains("Insufficient credits"));
        assert!(error_msg.contains("needed 100"));
        assert!(error_msg.contains("available 50"));

        // A rejected request must not change the balance.
        assert_eq!(credits.available(), 50);
    }

    #[test]
    fn test_flow_control_credits_zero_amounts() {
        let mut credits = FlowControlCredits::new(5);

        // Zero-sized requests always succeed and change nothing.
        assert!(credits.has_credits(0));
        assert!(credits.consume(0).is_ok());
        credits.add(0);
        assert_eq!(credits.available(), 5);

        // ...including when the pool is already empty.
        credits.consume(5).unwrap();
        assert!(credits.has_credits(0));
        assert!(credits.consume(0).is_ok());
        assert!(credits.is_exhausted());
    }
}