pjson_rs_domain/value_objects/
backpressure.rs1use serde::{Deserialize, Serialize};
2
3#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
8#[non_exhaustive]
9pub enum BackpressureSignal {
10 #[default]
12 Ok,
13
14 SlowDown,
16
17 Pause,
19}
20
21impl BackpressureSignal {
22 pub fn should_pause(&self) -> bool {
24 matches!(self, BackpressureSignal::Pause)
25 }
26
27 pub fn should_throttle(&self) -> bool {
29 matches!(
30 self,
31 BackpressureSignal::SlowDown | BackpressureSignal::Pause
32 )
33 }
34
35 pub fn suggested_delay_ms(&self) -> u64 {
37 match self {
38 BackpressureSignal::Ok => 0,
39 BackpressureSignal::SlowDown => 100,
40 BackpressureSignal::Pause => u64::MAX, }
42 }
43}
44
45impl std::fmt::Display for BackpressureSignal {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 match self {
48 BackpressureSignal::Ok => write!(f, "OK"),
49 BackpressureSignal::SlowDown => write!(f, "SLOW_DOWN"),
50 BackpressureSignal::Pause => write!(f, "PAUSE"),
51 }
52 }
53}
54
55#[derive(Debug, Clone)]
60pub struct FlowControlCredits {
61 available: usize,
62 max_credits: usize,
63}
64
65impl FlowControlCredits {
66 pub fn new(max_credits: usize) -> Self {
68 Self {
69 available: max_credits,
70 max_credits,
71 }
72 }
73
74 pub fn has_credits(&self, required: usize) -> bool {
76 self.available >= required
77 }
78
79 pub fn consume(&mut self, amount: usize) -> Result<(), String> {
83 if self.available < amount {
84 return Err(format!(
85 "Insufficient credits: needed {}, available {}",
86 amount, self.available
87 ));
88 }
89 self.available = self.available.saturating_sub(amount);
90 Ok(())
91 }
92
93 pub fn add(&mut self, amount: usize) {
97 self.available = (self.available.saturating_add(amount)).min(self.max_credits);
98 }
99
100 pub fn available(&self) -> usize {
102 self.available
103 }
104
105 pub fn max_credits(&self) -> usize {
107 self.max_credits
108 }
109
110 pub fn is_exhausted(&self) -> bool {
112 self.available == 0
113 }
114
115 pub fn reset(&mut self) {
117 self.available = self.max_credits;
118 }
119}
120
121impl Default for FlowControlCredits {
122 fn default() -> Self {
123 Self::new(1000) }
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130
131 #[test]
132 fn test_backpressure_signal_default() {
133 let signal = BackpressureSignal::default();
134 assert_eq!(signal, BackpressureSignal::Ok);
135 assert!(!signal.should_pause());
136 assert!(!signal.should_throttle());
137 }
138
139 #[test]
140 fn test_backpressure_signal_should_pause() {
141 assert!(BackpressureSignal::Pause.should_pause());
142 assert!(!BackpressureSignal::SlowDown.should_pause());
143 assert!(!BackpressureSignal::Ok.should_pause());
144 }
145
146 #[test]
147 fn test_backpressure_signal_should_throttle() {
148 assert!(BackpressureSignal::Pause.should_throttle());
149 assert!(BackpressureSignal::SlowDown.should_throttle());
150 assert!(!BackpressureSignal::Ok.should_throttle());
151 }
152
153 #[test]
154 fn test_backpressure_signal_suggested_delay() {
155 assert_eq!(BackpressureSignal::Ok.suggested_delay_ms(), 0);
156 assert_eq!(BackpressureSignal::SlowDown.suggested_delay_ms(), 100);
157 assert_eq!(BackpressureSignal::Pause.suggested_delay_ms(), u64::MAX);
158 }
159
160 #[test]
161 fn test_backpressure_signal_display() {
162 assert_eq!(BackpressureSignal::Ok.to_string(), "OK");
163 assert_eq!(BackpressureSignal::SlowDown.to_string(), "SLOW_DOWN");
164 assert_eq!(BackpressureSignal::Pause.to_string(), "PAUSE");
165 }
166
167 #[test]
168 fn test_backpressure_signal_serialization() {
169 let signal = BackpressureSignal::SlowDown;
170 let json = serde_json::to_string(&signal).unwrap();
171 let deserialized: BackpressureSignal = serde_json::from_str(&json).unwrap();
172 assert_eq!(signal, deserialized);
173 }
174
175 #[test]
176 fn test_flow_control_credits_new() {
177 let credits = FlowControlCredits::new(100);
178 assert_eq!(credits.available(), 100);
179 assert_eq!(credits.max_credits(), 100);
180 assert!(!credits.is_exhausted());
181 }
182
183 #[test]
184 fn test_flow_control_credits_default() {
185 let credits = FlowControlCredits::default();
186 assert_eq!(credits.available(), 1000);
187 assert_eq!(credits.max_credits(), 1000);
188 }
189
190 #[test]
191 fn test_flow_control_credits_has_credits() {
192 let credits = FlowControlCredits::new(100);
193 assert!(credits.has_credits(50));
194 assert!(credits.has_credits(100));
195 assert!(!credits.has_credits(101));
196 }
197
198 #[test]
199 fn test_flow_control_credits_consume() {
200 let mut credits = FlowControlCredits::new(100);
201
202 assert!(credits.consume(30).is_ok());
203 assert_eq!(credits.available(), 70);
204
205 assert!(credits.consume(70).is_ok());
206 assert_eq!(credits.available(), 0);
207 assert!(credits.is_exhausted());
208
209 assert!(credits.consume(1).is_err());
210 }
211
212 #[test]
213 fn test_flow_control_credits_add() {
214 let mut credits = FlowControlCredits::new(100);
215
216 credits.consume(50).unwrap();
217 assert_eq!(credits.available(), 50);
218
219 credits.add(30);
220 assert_eq!(credits.available(), 80);
221
222 credits.add(100);
224 assert_eq!(credits.available(), 100);
225 }
226
227 #[test]
228 fn test_flow_control_credits_reset() {
229 let mut credits = FlowControlCredits::new(100);
230
231 credits.consume(80).unwrap();
232 assert_eq!(credits.available(), 20);
233
234 credits.reset();
235 assert_eq!(credits.available(), 100);
236 assert!(!credits.is_exhausted());
237 }
238
239 #[test]
240 fn test_flow_control_credits_saturating_operations() {
241 let mut credits = FlowControlCredits::new(100);
242
243 credits.consume(100).unwrap();
245
246 assert!(credits.consume(1).is_err());
248 assert_eq!(credits.available(), 0);
249
250 credits.add(usize::MAX);
252 assert_eq!(credits.available(), 100); }
254
255 #[test]
256 fn test_flow_control_credits_is_exhausted() {
257 let mut credits = FlowControlCredits::new(10);
258
259 assert!(!credits.is_exhausted());
260
261 credits.consume(10).unwrap();
262 assert!(credits.is_exhausted());
263
264 credits.add(1);
265 assert!(!credits.is_exhausted());
266 }
267
268 #[test]
269 fn test_flow_control_credits_error_message() {
270 let mut credits = FlowControlCredits::new(50);
271
272 let result = credits.consume(100);
273 assert!(result.is_err());
274
275 let error_msg = result.unwrap_err();
276 assert!(error_msg.contains("Insufficient credits"));
277 assert!(error_msg.contains("needed 100"));
278 assert!(error_msg.contains("available 50"));
279 }
280}