1use serde::{Deserialize, Serialize};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
23pub enum BackpressureStrategy {
24 Drop,
26 Block,
28 Throttle {
30 rate_hz: f64,
32 },
33 SpillToDisk {
35 path: String,
37 },
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct BackpressureConfig {
47 pub strategy: BackpressureStrategy,
49 pub high_watermark: usize,
51 pub low_watermark: usize,
53 pub window_ms: u64,
55}
56
57impl Default for BackpressureConfig {
58 fn default() -> Self {
59 Self {
60 strategy: BackpressureStrategy::Drop,
61 high_watermark: 1000,
62 low_watermark: 500,
63 window_ms: 1000,
64 }
65 }
66}
67
68#[derive(Debug, Clone, Default, Serialize, Deserialize)]
74pub struct BackpressureStats {
75 pub items_dropped: u64,
77 pub items_throttled: u64,
79 pub current_queue_depth: usize,
81 pub peak_queue_depth: usize,
83 pub backpressure_events: u64,
85}
86
87#[derive(Debug, Clone, PartialEq)]
93pub enum BackpressureDecision {
94 Accept,
96 Drop,
98 ThrottleDelay(u64),
100}
101
102#[derive(Debug)]
108pub struct BackpressureController {
109 config: BackpressureConfig,
110 queue_depth: usize,
112 stats: BackpressureStats,
113 throttle_tokens: f64,
115 last_tick_ms: u64,
117 in_backpressure: bool,
119}
120
121impl BackpressureController {
122 pub fn new(config: BackpressureConfig) -> Self {
124 let initial_tokens = if let BackpressureStrategy::Throttle { rate_hz } = config.strategy {
125 rate_hz.max(0.0)
126 } else {
127 0.0
128 };
129 Self {
130 config,
131 queue_depth: 0,
132 stats: BackpressureStats::default(),
133 throttle_tokens: initial_tokens,
134 last_tick_ms: 0,
135 in_backpressure: false,
136 }
137 }
138
139 pub fn try_accept(&mut self, now_ms: u64) -> BackpressureDecision {
143 self.replenish_tokens(now_ms);
145
146 let above_high = self.is_above_high_watermark();
147 let was_in_backpressure = self.in_backpressure;
148
149 if above_high && !was_in_backpressure {
150 self.in_backpressure = true;
151 self.stats.backpressure_events += 1;
152 } else if self.is_below_low_watermark() {
153 self.in_backpressure = false;
154 }
155
156 if !self.in_backpressure {
157 self.queue_depth += 1;
159 if self.queue_depth > self.stats.peak_queue_depth {
160 self.stats.peak_queue_depth = self.queue_depth;
161 }
162 self.stats.current_queue_depth = self.queue_depth;
163 return BackpressureDecision::Accept;
164 }
165
166 match &self.config.strategy {
168 BackpressureStrategy::Drop => {
169 self.stats.items_dropped += 1;
170 BackpressureDecision::Drop
171 }
172 BackpressureStrategy::Block => {
173 BackpressureDecision::ThrottleDelay(u64::MAX)
175 }
176 BackpressureStrategy::Throttle { rate_hz } => {
177 if self.throttle_tokens >= 1.0 {
178 self.throttle_tokens -= 1.0;
179 self.queue_depth += 1;
180 if self.queue_depth > self.stats.peak_queue_depth {
181 self.stats.peak_queue_depth = self.queue_depth;
182 }
183 self.stats.current_queue_depth = self.queue_depth;
184 BackpressureDecision::Accept
185 } else {
186 let delay_ms = if *rate_hz > 0.0 {
188 ((1.0 - self.throttle_tokens) / rate_hz * 1000.0).ceil() as u64
189 } else {
190 u64::MAX
191 };
192 self.stats.items_throttled += 1;
193 BackpressureDecision::ThrottleDelay(delay_ms)
194 }
195 }
196 BackpressureStrategy::SpillToDisk { .. } => {
197 self.stats.items_dropped += 1;
199 BackpressureDecision::Drop
200 }
201 }
202 }
203
204 pub fn record_dequeue(&mut self) {
206 self.queue_depth = self.queue_depth.saturating_sub(1);
207 self.stats.current_queue_depth = self.queue_depth;
208 }
209
210 pub fn stats(&self) -> &BackpressureStats {
212 &self.stats
213 }
214
215 pub fn reset_stats(&mut self) {
218 let current = self.queue_depth;
219 self.stats = BackpressureStats {
220 current_queue_depth: current,
221 peak_queue_depth: current,
222 ..Default::default()
223 };
224 }
225
226 pub fn is_above_high_watermark(&self) -> bool {
228 self.queue_depth >= self.config.high_watermark
229 }
230
231 pub fn is_below_low_watermark(&self) -> bool {
233 self.queue_depth <= self.config.low_watermark
234 }
235
236 pub fn replenish_tokens(&mut self, now_ms: u64) {
240 if let BackpressureStrategy::Throttle { rate_hz } = self.config.strategy {
241 if self.last_tick_ms == 0 {
242 self.last_tick_ms = now_ms;
243 return;
244 }
245 let elapsed_ms = now_ms.saturating_sub(self.last_tick_ms);
246 let new_tokens = rate_hz * (elapsed_ms as f64 / 1000.0);
247 self.throttle_tokens = (self.throttle_tokens + new_tokens).min(rate_hz);
248 self.last_tick_ms = now_ms;
249 }
250 }
251
252 pub fn current_depth(&self) -> usize {
254 self.queue_depth
255 }
256}
257
258#[cfg(test)]
263mod tests {
264 use super::*;
265
266 fn drop_config(high: usize, low: usize) -> BackpressureConfig {
267 BackpressureConfig {
268 strategy: BackpressureStrategy::Drop,
269 high_watermark: high,
270 low_watermark: low,
271 window_ms: 1000,
272 }
273 }
274
275 fn throttle_config(rate_hz: f64, high: usize, low: usize) -> BackpressureConfig {
276 BackpressureConfig {
277 strategy: BackpressureStrategy::Throttle { rate_hz },
278 high_watermark: high,
279 low_watermark: low,
280 window_ms: 1000,
281 }
282 }
283
284 #[test]
287 fn test_new_default_config() {
288 let ctrl = BackpressureController::new(BackpressureConfig::default());
289 assert_eq!(ctrl.current_depth(), 0);
290 assert_eq!(ctrl.stats().items_dropped, 0);
291 assert!(!ctrl.is_above_high_watermark());
292 assert!(ctrl.is_below_low_watermark());
293 }
294
295 #[test]
296 fn test_new_throttle_sets_initial_tokens() {
297 let ctrl = BackpressureController::new(throttle_config(100.0, 50, 25));
298 assert!((ctrl.throttle_tokens - 100.0).abs() < 1e-9);
299 }
300
301 #[test]
304 fn test_accept_below_high_watermark() {
305 let mut ctrl = BackpressureController::new(drop_config(10, 5));
306 for i in 0..9 {
307 let decision = ctrl.try_accept(i as u64 * 10);
308 assert_eq!(decision, BackpressureDecision::Accept, "step {i}");
309 }
310 assert_eq!(ctrl.current_depth(), 9);
311 }
312
313 #[test]
316 fn test_drop_at_high_watermark() {
317 let mut ctrl = BackpressureController::new(drop_config(3, 1));
318 ctrl.try_accept(0);
319 ctrl.try_accept(1);
320 ctrl.try_accept(2); let decision = ctrl.try_accept(3);
322 assert_eq!(decision, BackpressureDecision::Drop);
323 assert_eq!(ctrl.stats().items_dropped, 1);
324 }
325
326 #[test]
327 fn test_drop_increments_counter() {
328 let mut ctrl = BackpressureController::new(drop_config(2, 1));
329 ctrl.try_accept(0);
330 ctrl.try_accept(1); ctrl.try_accept(2);
332 ctrl.try_accept(3);
333 assert_eq!(ctrl.stats().items_dropped, 2);
334 }
335
336 #[test]
337 fn test_backpressure_event_counted() {
338 let mut ctrl = BackpressureController::new(drop_config(2, 1));
339 ctrl.try_accept(0);
340 ctrl.try_accept(1);
341 ctrl.try_accept(2); ctrl.try_accept(3); assert_eq!(ctrl.stats().backpressure_events, 1);
344 }
345
346 #[test]
349 fn test_dequeue_decrements_depth() {
350 let mut ctrl = BackpressureController::new(drop_config(10, 2));
351 ctrl.try_accept(0);
352 ctrl.try_accept(1);
353 ctrl.try_accept(2);
354 ctrl.record_dequeue();
355 assert_eq!(ctrl.current_depth(), 2);
356 assert_eq!(ctrl.stats().current_queue_depth, 2);
357 }
358
359 #[test]
360 fn test_dequeue_saturates_at_zero() {
361 let mut ctrl = BackpressureController::new(drop_config(10, 2));
362 ctrl.record_dequeue(); assert_eq!(ctrl.current_depth(), 0);
364 }
365
366 #[test]
367 fn test_recovery_after_dequeue() {
368 let mut ctrl = BackpressureController::new(drop_config(3, 1));
369 ctrl.try_accept(0);
371 ctrl.try_accept(1);
372 ctrl.try_accept(2); ctrl.record_dequeue(); ctrl.record_dequeue(); let decision = ctrl.try_accept(100);
378 assert_eq!(decision, BackpressureDecision::Accept);
379 }
380
381 #[test]
384 fn test_peak_depth_tracked() {
385 let mut ctrl = BackpressureController::new(drop_config(20, 5));
386 for i in 0..10u64 {
387 ctrl.try_accept(i);
388 }
389 for _ in 0..5 {
390 ctrl.record_dequeue();
391 }
392 assert_eq!(ctrl.stats().peak_queue_depth, 10);
393 assert_eq!(ctrl.current_depth(), 5);
394 }
395
396 #[test]
399 fn test_reset_stats_clears_counters() {
400 let mut ctrl = BackpressureController::new(drop_config(2, 1));
401 ctrl.try_accept(0);
402 ctrl.try_accept(1);
403 ctrl.try_accept(2); ctrl.try_accept(3);
405 ctrl.reset_stats();
406 assert_eq!(ctrl.stats().items_dropped, 0);
407 assert_eq!(ctrl.stats().backpressure_events, 0);
408 assert_eq!(ctrl.stats().items_throttled, 0);
409 assert_eq!(ctrl.stats().current_queue_depth, ctrl.current_depth());
411 }
412
413 #[test]
414 fn test_reset_stats_preserves_depth() {
415 let mut ctrl = BackpressureController::new(drop_config(10, 2));
416 ctrl.try_accept(0);
417 ctrl.try_accept(1);
418 ctrl.try_accept(2);
419 ctrl.reset_stats();
420 assert_eq!(ctrl.current_depth(), 3);
421 }
422
423 #[test]
426 fn test_above_high_watermark() {
427 let mut ctrl = BackpressureController::new(drop_config(5, 2));
428 for i in 0..5u64 {
429 ctrl.try_accept(i);
430 }
431 assert!(ctrl.is_above_high_watermark());
432 }
433
434 #[test]
435 fn test_below_low_watermark() {
436 let ctrl = BackpressureController::new(drop_config(5, 2));
437 assert!(ctrl.is_below_low_watermark());
438 }
439
440 #[test]
441 fn test_between_watermarks() {
442 let mut ctrl = BackpressureController::new(drop_config(10, 2));
443 for i in 0..5u64 {
444 ctrl.try_accept(i);
445 }
446 assert!(!ctrl.is_above_high_watermark());
447 assert!(!ctrl.is_below_low_watermark());
448 }
449
450 #[test]
453 fn test_throttle_accept_when_tokens_available() {
454 let mut ctrl = BackpressureController::new(throttle_config(10.0, 2, 1));
455 let d = ctrl.try_accept(0);
457 assert_eq!(d, BackpressureDecision::Accept);
458 }
459
460 #[test]
461 fn test_throttle_delay_when_no_tokens() {
462 let mut ctrl = BackpressureController::new(throttle_config(1.0, 2, 1));
463 ctrl.try_accept(0); ctrl.try_accept(0); ctrl.try_accept(0);
469 let decision = ctrl.try_accept(0);
471 assert!(
472 matches!(decision, BackpressureDecision::ThrottleDelay(_)),
473 "expected ThrottleDelay, got {decision:?}"
474 );
475 assert!(ctrl.stats().items_throttled > 0);
476 }
477
478 #[test]
479 fn test_throttle_replenish_over_time() {
480 let mut ctrl = BackpressureController::new(throttle_config(10.0, 2, 1));
481 ctrl.try_accept(0); ctrl.try_accept(0); ctrl.try_accept(0); ctrl.try_accept(1000);
488 let decision = ctrl.try_accept(1000);
490 assert_ne!(decision, BackpressureDecision::Drop);
491 }
492
493 #[test]
494 fn test_replenish_tokens_noop_without_throttle() {
495 let mut ctrl = BackpressureController::new(drop_config(10, 2));
496 let tokens_before = ctrl.throttle_tokens;
497 ctrl.replenish_tokens(5000);
498 assert!((ctrl.throttle_tokens - tokens_before).abs() < 1e-9);
499 }
500
501 #[test]
502 fn test_replenish_tokens_first_tick() {
503 let mut ctrl = BackpressureController::new(throttle_config(10.0, 10, 5));
504 ctrl.replenish_tokens(1000); assert_eq!(ctrl.last_tick_ms, 1000);
506 }
507
508 #[test]
509 fn test_replenish_tokens_capped_at_rate_hz() {
510 let mut ctrl = BackpressureController::new(throttle_config(5.0, 10, 5));
511 ctrl.last_tick_ms = 1;
512 ctrl.replenish_tokens(100_000); assert!(ctrl.throttle_tokens <= 5.0 + 1e-9);
514 }
515
516 #[test]
519 fn test_block_strategy_returns_max_delay() {
520 let config = BackpressureConfig {
521 strategy: BackpressureStrategy::Block,
522 high_watermark: 2,
523 low_watermark: 1,
524 window_ms: 100,
525 };
526 let mut ctrl = BackpressureController::new(config);
527 ctrl.try_accept(0); ctrl.try_accept(0); let d = ctrl.try_accept(0);
530 assert_eq!(d, BackpressureDecision::ThrottleDelay(u64::MAX));
531 }
532
533 #[test]
536 fn test_spill_to_disk_records_as_dropped() {
537 let config = BackpressureConfig {
538 strategy: BackpressureStrategy::SpillToDisk {
539 path: "/tmp/spill.bin".to_string(),
540 },
541 high_watermark: 2,
542 low_watermark: 1,
543 window_ms: 100,
544 };
545 let mut ctrl = BackpressureController::new(config);
546 ctrl.try_accept(0);
547 ctrl.try_accept(0); let d = ctrl.try_accept(0);
549 assert_eq!(d, BackpressureDecision::Drop);
550 assert_eq!(ctrl.stats().items_dropped, 1);
551 }
552
553 #[test]
556 fn test_current_depth_tracks_accept_and_dequeue() {
557 let mut ctrl = BackpressureController::new(drop_config(100, 50));
558 assert_eq!(ctrl.current_depth(), 0);
559 ctrl.try_accept(0);
560 ctrl.try_accept(1);
561 assert_eq!(ctrl.current_depth(), 2);
562 ctrl.record_dequeue();
563 assert_eq!(ctrl.current_depth(), 1);
564 }
565
566 #[test]
569 fn test_zero_rate_throttle_returns_max_delay() {
570 let mut ctrl = BackpressureController::new(throttle_config(0.0, 2, 1));
571 ctrl.try_accept(0); ctrl.try_accept(0); ctrl.try_accept(0); let d = ctrl.try_accept(0);
576 assert!(matches!(d, BackpressureDecision::ThrottleDelay(_)));
577 }
578
579 #[test]
580 fn test_high_watermark_equals_one() {
581 let mut ctrl = BackpressureController::new(drop_config(1, 0));
582 let d = ctrl.try_accept(0); assert_eq!(d, BackpressureDecision::Accept);
584 let d2 = ctrl.try_accept(0);
586 assert_eq!(d2, BackpressureDecision::Drop);
587 }
588
589 #[test]
590 fn test_stats_ref_is_consistent() {
591 let mut ctrl = BackpressureController::new(drop_config(5, 2));
592 ctrl.try_accept(0);
593 ctrl.try_accept(1);
594 let s = ctrl.stats();
595 assert_eq!(s.current_queue_depth, 2);
596 assert_eq!(s.peak_queue_depth, 2);
597 assert_eq!(s.items_dropped, 0);
598 }
599
600 #[test]
601 fn test_multiple_backpressure_cycles() {
602 let mut ctrl = BackpressureController::new(drop_config(3, 1));
604 ctrl.try_accept(0); ctrl.try_accept(0); ctrl.try_accept(0); ctrl.try_accept(0); ctrl.record_dequeue(); ctrl.record_dequeue(); ctrl.try_accept(0); ctrl.try_accept(0); ctrl.try_accept(0); assert_eq!(ctrl.stats().backpressure_events, 2);
617 }
618}