rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use crate::resil::{
    BreakerConfig, BreakerError, BreakerPolicyConfig, BreakerSnapshot, BreakerState, RollingWindow,
    WindowOutcome,
};

#[derive(Debug)]
pub(crate) struct CircuitBreakerState {
    config: BreakerConfig,
    policy: BreakerPolicyConfig,
    state: BreakerState,
    failures: u32,
    opened_at: Option<std::time::Instant>,
    last_force_pass_at: Option<std::time::Instant>,
    half_open_in_flight: u32,
    decisions: u64,
    window: RollingWindow,
}

impl CircuitBreakerState {
    pub(crate) fn new(config: BreakerConfig, policy: BreakerPolicyConfig) -> Self {
        Self {
            window: RollingWindow::new(policy.window),
            config: BreakerConfig {
                failure_threshold: config.failure_threshold.max(1),
                reset_timeout: config.reset_timeout,
            },
            policy: BreakerPolicyConfig {
                failure_ratio_percent: policy.failure_ratio_percent.min(100),
                drop_ratio_percent: policy.drop_ratio_percent.min(100),
                half_open_max_calls: policy.half_open_max_calls.max(1),
                sre_k_millis: policy.sre_k_millis.max(1000),
                sre_protection: policy.sre_protection.max(1),
                ..policy
            },
            state: BreakerState::Closed,
            failures: 0,
            opened_at: None,
            last_force_pass_at: None,
            half_open_in_flight: 0,
            decisions: 0,
        }
    }

    pub(crate) fn state(&mut self) -> BreakerState {
        self.refresh_state();
        self.state
    }

    pub(crate) fn allow(&mut self) -> Result<(), BreakerError> {
        self.refresh_state();
        match self.state {
            BreakerState::Closed => self.allow_closed(),
            BreakerState::Open => {
                self.window.record(WindowOutcome::Drop);
                Err(BreakerError::Open)
            }
            BreakerState::HalfOpen => {
                if self.half_open_in_flight < self.policy.half_open_max_calls {
                    self.half_open_in_flight += 1;
                    Ok(())
                } else {
                    self.window.record(WindowOutcome::Drop);
                    Err(BreakerError::Open)
                }
            }
        }
    }

    pub(crate) fn record_success(&mut self) {
        self.window.record(WindowOutcome::Success);
        self.failures = 0;
        if self.state == BreakerState::HalfOpen {
            self.half_open_in_flight = self.half_open_in_flight.saturating_sub(1);
        }
        self.close();
    }

    pub(crate) fn record_failure(&mut self) {
        self.window.record(WindowOutcome::Failure);
        if self.state == BreakerState::HalfOpen {
            self.half_open_in_flight = self.half_open_in_flight.saturating_sub(1);
            self.open();
            return;
        }

        self.failures = self.failures.saturating_add(1);
        if self.failures >= self.config.failure_threshold || self.window_is_unhealthy() {
            self.open();
        }
    }

    pub(crate) fn snapshot(&mut self) -> BreakerSnapshot {
        BreakerSnapshot {
            state: self.state(),
            consecutive_failures: self.failures,
            half_open_in_flight: self.half_open_in_flight,
            window: self.window.snapshot(),
        }
    }

    fn allow_closed(&mut self) -> Result<(), BreakerError> {
        if self.policy.sre_rejection_enabled && self.should_drop_sre() {
            self.window.record(WindowOutcome::Drop);
            return Err(BreakerError::Dropped);
        }
        if self.window_is_unhealthy() && self.should_drop() {
            self.window.record(WindowOutcome::Drop);
            return Err(BreakerError::Dropped);
        }
        Ok(())
    }

    fn refresh_state(&mut self) {
        if self.state != BreakerState::Open {
            return;
        }

        let Some(opened_at) = self.opened_at else {
            self.state = BreakerState::HalfOpen;
            return;
        };

        if opened_at.elapsed() >= self.config.reset_timeout {
            self.state = BreakerState::HalfOpen;
            self.half_open_in_flight = 0;
            self.last_force_pass_at = Some(std::time::Instant::now());
            return;
        }

        if self.policy.force_pass_interval.is_zero() {
            return;
        }

        let force_pass_due = self
            .last_force_pass_at
            .map(|last| last.elapsed() >= self.policy.force_pass_interval)
            .unwrap_or_else(|| opened_at.elapsed() >= self.policy.force_pass_interval);
        if force_pass_due {
            self.state = BreakerState::HalfOpen;
            self.half_open_in_flight = 0;
            self.last_force_pass_at = Some(std::time::Instant::now());
        }
    }

    fn window_is_unhealthy(&self) -> bool {
        let snapshot = self.window.snapshot();
        snapshot.successes + snapshot.failures >= self.policy.min_request_count
            && snapshot.failure_ratio() * 100.0 >= self.policy.failure_ratio_percent as f64
    }

    fn should_drop(&mut self) -> bool {
        self.decisions = self.decisions.wrapping_add(1);
        let bucket = self.decisions.wrapping_mul(37) % 100;
        bucket < u64::from(self.policy.drop_ratio_percent)
    }

    fn should_drop_sre(&mut self) -> bool {
        let snapshot = self.window.snapshot();
        let accepts = snapshot.successes;
        let total = snapshot.successes + snapshot.failures + snapshot.drops;
        if total <= self.policy.sre_protection || accepts == 0 {
            return false;
        }

        let weighted_accepts = self.policy.sre_k_millis as f64 / 1000.0 * accepts as f64;
        let drop_ratio = (total.saturating_sub(self.policy.sre_protection) as f64
            - weighted_accepts)
            / (total + 1) as f64;
        if drop_ratio <= 0.0 {
            return false;
        }

        self.decisions = self.decisions.wrapping_add(1);
        let bucket = (self.decisions.wrapping_mul(37) % 10_000) as f64 / 10_000.0;
        bucket < drop_ratio.min(1.0)
    }

    fn open(&mut self) {
        self.state = BreakerState::Open;
        self.opened_at = Some(std::time::Instant::now());
        self.last_force_pass_at = None;
        self.half_open_in_flight = 0;
    }

    fn close(&mut self) {
        self.state = BreakerState::Closed;
        self.opened_at = None;
        self.last_force_pass_at = None;
        self.half_open_in_flight = 0;
    }
}