use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum BackpressureSignal {
#[default]
Ok,
SlowDown,
Pause,
}
impl BackpressureSignal {
pub fn should_pause(&self) -> bool {
matches!(self, BackpressureSignal::Pause)
}
pub fn should_throttle(&self) -> bool {
matches!(
self,
BackpressureSignal::SlowDown | BackpressureSignal::Pause
)
}
pub fn suggested_delay_ms(&self) -> u64 {
match self {
BackpressureSignal::Ok => 0,
BackpressureSignal::SlowDown => 100,
BackpressureSignal::Pause => u64::MAX, }
}
}
impl std::fmt::Display for BackpressureSignal {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BackpressureSignal::Ok => write!(f, "OK"),
BackpressureSignal::SlowDown => write!(f, "SLOW_DOWN"),
BackpressureSignal::Pause => write!(f, "PAUSE"),
}
}
}
#[derive(Debug, Clone)]
pub struct FlowControlCredits {
available: usize,
max_credits: usize,
}
impl FlowControlCredits {
pub fn new(max_credits: usize) -> Self {
Self {
available: max_credits,
max_credits,
}
}
pub fn has_credits(&self, required: usize) -> bool {
self.available >= required
}
pub fn consume(&mut self, amount: usize) -> Result<(), String> {
if self.available < amount {
return Err(format!(
"Insufficient credits: needed {}, available {}",
amount, self.available
));
}
self.available = self.available.saturating_sub(amount);
Ok(())
}
pub fn add(&mut self, amount: usize) {
self.available = (self.available.saturating_add(amount)).min(self.max_credits);
}
pub fn available(&self) -> usize {
self.available
}
pub fn max_credits(&self) -> usize {
self.max_credits
}
pub fn is_exhausted(&self) -> bool {
self.available == 0
}
pub fn reset(&mut self) {
self.available = self.max_credits;
}
}
impl Default for FlowControlCredits {
fn default() -> Self {
Self::new(1000) }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_backpressure_signal_default() {
let signal = BackpressureSignal::default();
assert_eq!(signal, BackpressureSignal::Ok);
assert!(!signal.should_pause());
assert!(!signal.should_throttle());
}
#[test]
fn test_backpressure_signal_should_pause() {
assert!(BackpressureSignal::Pause.should_pause());
assert!(!BackpressureSignal::SlowDown.should_pause());
assert!(!BackpressureSignal::Ok.should_pause());
}
#[test]
fn test_backpressure_signal_should_throttle() {
assert!(BackpressureSignal::Pause.should_throttle());
assert!(BackpressureSignal::SlowDown.should_throttle());
assert!(!BackpressureSignal::Ok.should_throttle());
}
#[test]
fn test_backpressure_signal_suggested_delay() {
assert_eq!(BackpressureSignal::Ok.suggested_delay_ms(), 0);
assert_eq!(BackpressureSignal::SlowDown.suggested_delay_ms(), 100);
assert_eq!(BackpressureSignal::Pause.suggested_delay_ms(), u64::MAX);
}
#[test]
fn test_backpressure_signal_display() {
assert_eq!(BackpressureSignal::Ok.to_string(), "OK");
assert_eq!(BackpressureSignal::SlowDown.to_string(), "SLOW_DOWN");
assert_eq!(BackpressureSignal::Pause.to_string(), "PAUSE");
}
#[test]
fn test_backpressure_signal_serialization() {
let signal = BackpressureSignal::SlowDown;
let json = serde_json::to_string(&signal).unwrap();
let deserialized: BackpressureSignal = serde_json::from_str(&json).unwrap();
assert_eq!(signal, deserialized);
}
#[test]
fn test_flow_control_credits_new() {
let credits = FlowControlCredits::new(100);
assert_eq!(credits.available(), 100);
assert_eq!(credits.max_credits(), 100);
assert!(!credits.is_exhausted());
}
#[test]
fn test_flow_control_credits_default() {
let credits = FlowControlCredits::default();
assert_eq!(credits.available(), 1000);
assert_eq!(credits.max_credits(), 1000);
}
#[test]
fn test_flow_control_credits_has_credits() {
let credits = FlowControlCredits::new(100);
assert!(credits.has_credits(50));
assert!(credits.has_credits(100));
assert!(!credits.has_credits(101));
}
#[test]
fn test_flow_control_credits_consume() {
let mut credits = FlowControlCredits::new(100);
assert!(credits.consume(30).is_ok());
assert_eq!(credits.available(), 70);
assert!(credits.consume(70).is_ok());
assert_eq!(credits.available(), 0);
assert!(credits.is_exhausted());
assert!(credits.consume(1).is_err());
}
#[test]
fn test_flow_control_credits_add() {
let mut credits = FlowControlCredits::new(100);
credits.consume(50).unwrap();
assert_eq!(credits.available(), 50);
credits.add(30);
assert_eq!(credits.available(), 80);
credits.add(100);
assert_eq!(credits.available(), 100);
}
#[test]
fn test_flow_control_credits_reset() {
let mut credits = FlowControlCredits::new(100);
credits.consume(80).unwrap();
assert_eq!(credits.available(), 20);
credits.reset();
assert_eq!(credits.available(), 100);
assert!(!credits.is_exhausted());
}
#[test]
fn test_flow_control_credits_saturating_operations() {
let mut credits = FlowControlCredits::new(100);
credits.consume(100).unwrap();
assert!(credits.consume(1).is_err());
assert_eq!(credits.available(), 0);
credits.add(usize::MAX);
assert_eq!(credits.available(), 100); }
#[test]
fn test_flow_control_credits_is_exhausted() {
let mut credits = FlowControlCredits::new(10);
assert!(!credits.is_exhausted());
credits.consume(10).unwrap();
assert!(credits.is_exhausted());
credits.add(1);
assert!(!credits.is_exhausted());
}
#[test]
fn test_flow_control_credits_error_message() {
let mut credits = FlowControlCredits::new(50);
let result = credits.consume(100);
assert!(result.is_err());
let error_msg = result.unwrap_err();
assert!(error_msg.contains("Insufficient credits"));
assert!(error_msg.contains("needed 100"));
assert!(error_msg.contains("available 50"));
}
}