pjson_rs_domain/value_objects/
backpressure.rs1use serde::{Deserialize, Serialize};
2
3#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
8pub enum BackpressureSignal {
9 #[default]
11 Ok,
12
13 SlowDown,
15
16 Pause,
18}
19
20impl BackpressureSignal {
21 pub fn should_pause(&self) -> bool {
23 matches!(self, BackpressureSignal::Pause)
24 }
25
26 pub fn should_throttle(&self) -> bool {
28 matches!(
29 self,
30 BackpressureSignal::SlowDown | BackpressureSignal::Pause
31 )
32 }
33
34 pub fn suggested_delay_ms(&self) -> u64 {
36 match self {
37 BackpressureSignal::Ok => 0,
38 BackpressureSignal::SlowDown => 100,
39 BackpressureSignal::Pause => u64::MAX, }
41 }
42}
43
44impl std::fmt::Display for BackpressureSignal {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 match self {
47 BackpressureSignal::Ok => write!(f, "OK"),
48 BackpressureSignal::SlowDown => write!(f, "SLOW_DOWN"),
49 BackpressureSignal::Pause => write!(f, "PAUSE"),
50 }
51 }
52}
53
/// Credit counter with a fixed upper bound.
///
/// `available` is the current balance and `max_credits` is the ceiling that
/// replenishing is clamped to. Both fields are private; the balance changes
/// only through the type's methods.
///
/// Two values compare equal when both their balance and their ceiling match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlowControlCredits {
    /// Credits that can currently be consumed.
    available: usize,
    /// Ceiling for `available`.
    max_credits: usize,
}
63
64impl FlowControlCredits {
65 pub fn new(max_credits: usize) -> Self {
67 Self {
68 available: max_credits,
69 max_credits,
70 }
71 }
72
73 pub fn has_credits(&self, required: usize) -> bool {
75 self.available >= required
76 }
77
78 pub fn consume(&mut self, amount: usize) -> Result<(), String> {
82 if self.available < amount {
83 return Err(format!(
84 "Insufficient credits: needed {}, available {}",
85 amount, self.available
86 ));
87 }
88 self.available = self.available.saturating_sub(amount);
89 Ok(())
90 }
91
92 pub fn add(&mut self, amount: usize) {
96 self.available = (self.available.saturating_add(amount)).min(self.max_credits);
97 }
98
99 pub fn available(&self) -> usize {
101 self.available
102 }
103
104 pub fn max_credits(&self) -> usize {
106 self.max_credits
107 }
108
109 pub fn is_exhausted(&self) -> bool {
111 self.available == 0
112 }
113
114 pub fn reset(&mut self) {
116 self.available = self.max_credits;
117 }
118}
119
120impl Default for FlowControlCredits {
121 fn default() -> Self {
122 Self::new(1000) }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_backpressure_signal_default() {
        // `Ok` carries `#[default]`, so the default must neither pause nor throttle.
        let default_signal: BackpressureSignal = Default::default();
        assert_eq!(default_signal, BackpressureSignal::Ok);
        assert!(!default_signal.should_pause());
        assert!(!default_signal.should_throttle());
    }

    #[test]
    fn test_backpressure_signal_should_pause() {
        let expectations = [
            (BackpressureSignal::Pause, true),
            (BackpressureSignal::SlowDown, false),
            (BackpressureSignal::Ok, false),
        ];
        for (signal, expected) in expectations {
            assert_eq!(signal.should_pause(), expected, "should_pause() for {signal}");
        }
    }

    #[test]
    fn test_backpressure_signal_should_throttle() {
        let expectations = [
            (BackpressureSignal::Pause, true),
            (BackpressureSignal::SlowDown, true),
            (BackpressureSignal::Ok, false),
        ];
        for (signal, expected) in expectations {
            assert_eq!(
                signal.should_throttle(),
                expected,
                "should_throttle() for {signal}"
            );
        }
    }

    #[test]
    fn test_backpressure_signal_suggested_delay() {
        let expectations = [
            (BackpressureSignal::Ok, 0_u64),
            (BackpressureSignal::SlowDown, 100),
            (BackpressureSignal::Pause, u64::MAX),
        ];
        for (signal, expected_ms) in expectations {
            assert_eq!(signal.suggested_delay_ms(), expected_ms);
        }
    }

    #[test]
    fn test_backpressure_signal_display() {
        let expectations = [
            (BackpressureSignal::Ok, "OK"),
            (BackpressureSignal::SlowDown, "SLOW_DOWN"),
            (BackpressureSignal::Pause, "PAUSE"),
        ];
        for (signal, rendered) in expectations {
            assert_eq!(signal.to_string(), rendered);
        }
    }

    #[test]
    fn test_backpressure_signal_serialization() {
        // Round-trip through JSON and expect the same variant back.
        let original = BackpressureSignal::SlowDown;
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: BackpressureSignal = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_flow_control_credits_new() {
        let pool = FlowControlCredits::new(100);
        assert_eq!(pool.available(), 100);
        assert_eq!(pool.max_credits(), 100);
        assert!(!pool.is_exhausted());
    }

    #[test]
    fn test_flow_control_credits_default() {
        let pool = FlowControlCredits::default();
        assert_eq!(pool.available(), 1000);
        assert_eq!(pool.max_credits(), 1000);
    }

    #[test]
    fn test_flow_control_credits_has_credits() {
        let pool = FlowControlCredits::new(100);
        for (required, expected) in [(50, true), (100, true), (101, false)] {
            assert_eq!(pool.has_credits(required), expected);
        }
    }

    #[test]
    fn test_flow_control_credits_consume() {
        let mut pool = FlowControlCredits::new(100);

        assert!(pool.consume(30).is_ok());
        assert_eq!(pool.available(), 70);

        // Draining the remainder exhausts the pool.
        assert!(pool.consume(70).is_ok());
        assert_eq!(pool.available(), 0);
        assert!(pool.is_exhausted());

        // Nothing left to take.
        assert!(pool.consume(1).is_err());
    }

    #[test]
    fn test_flow_control_credits_add() {
        let mut pool = FlowControlCredits::new(100);

        pool.consume(50).unwrap();
        assert_eq!(pool.available(), 50);

        pool.add(30);
        assert_eq!(pool.available(), 80);

        // 80 + 100 exceeds the ceiling, so the balance is clamped to 100.
        pool.add(100);
        assert_eq!(pool.available(), 100);
    }

    #[test]
    fn test_flow_control_credits_reset() {
        let mut pool = FlowControlCredits::new(100);

        pool.consume(80).unwrap();
        assert_eq!(pool.available(), 20);

        pool.reset();
        assert_eq!(pool.available(), 100);
        assert!(!pool.is_exhausted());
    }

    #[test]
    fn test_flow_control_credits_saturating_operations() {
        let mut pool = FlowControlCredits::new(100);

        pool.consume(100).unwrap();

        // A rejected consume must leave the balance untouched.
        assert!(pool.consume(1).is_err());
        assert_eq!(pool.available(), 0);

        // Adding `usize::MAX` must not overflow; the balance is clamped to the ceiling.
        pool.add(usize::MAX);
        assert_eq!(pool.available(), 100);
    }

    #[test]
    fn test_flow_control_credits_is_exhausted() {
        let mut pool = FlowControlCredits::new(10);

        assert!(!pool.is_exhausted());

        pool.consume(10).unwrap();
        assert!(pool.is_exhausted());

        // A single credit is enough to leave the exhausted state.
        pool.add(1);
        assert!(!pool.is_exhausted());
    }

    #[test]
    fn test_flow_control_credits_error_message() {
        let mut pool = FlowControlCredits::new(50);

        let outcome = pool.consume(100);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err();
        for fragment in ["Insufficient credits", "needed 100", "available 50"] {
            assert!(message.contains(fragment));
        }
    }
}