rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::time::Duration;

use super::{
    BreakerCallError, BreakerConfig, BreakerPolicyConfig, BreakerState, CircuitBreaker,
    SharedCircuitBreaker, WindowConfig,
};

#[test]
fn opens_after_threshold() {
    let mut breaker = CircuitBreaker::new(2, Duration::from_secs(1));

    assert!(breaker.allow());
    breaker.record_failure();
    assert_eq!(breaker.state(), BreakerState::Closed);
    breaker.record_failure();

    assert_eq!(breaker.state(), BreakerState::Open);
    assert!(!breaker.allow());
}

#[test]
fn half_opens_after_reset_timeout() {
    let mut breaker = CircuitBreaker::new(1, Duration::from_millis(0));
    breaker.record_failure();

    assert_eq!(breaker.state(), BreakerState::HalfOpen);
    breaker.record_success();
    assert_eq!(breaker.state(), BreakerState::Closed);
}

#[tokio::test]
async fn shared_breaker_rejects_after_failures() {
    let breaker = SharedCircuitBreaker::new(BreakerConfig {
        failure_threshold: 1,
        reset_timeout: Duration::from_secs(5),
    });

    breaker.allow().await.expect("allow").record_failure().await;

    assert_eq!(breaker.state().await, BreakerState::Open);
    assert!(breaker.allow().await.is_err());
}

#[tokio::test]
async fn fallback_runs_when_breaker_is_open() {
    let breaker = SharedCircuitBreaker::new(BreakerConfig {
        failure_threshold: 1,
        reset_timeout: Duration::from_secs(5),
    });
    let _ = breaker.do_request(|| async { Err::<(), _>("down") }).await;

    let result = breaker
        .do_with_fallback(
            || async { Ok::<_, &str>("primary") },
            |_| async { Ok::<_, &str>("fallback") },
        )
        .await;

    assert_eq!(result, Ok("fallback"));
}

#[tokio::test]
async fn acceptable_error_does_not_open_breaker() {
    let breaker = SharedCircuitBreaker::new(BreakerConfig {
        failure_threshold: 1,
        reset_timeout: Duration::from_secs(5),
    });

    let result = breaker
        .do_with_acceptable(
            || async { Err::<(), _>("not-found") },
            |err| *err == "not-found",
        )
        .await;

    assert_eq!(result, Err(BreakerCallError::Inner("not-found")));
    assert_eq!(breaker.state().await, BreakerState::Closed);
}

#[tokio::test]
async fn rolling_policy_can_drop_before_consecutive_threshold() {
    let breaker = SharedCircuitBreaker::with_policy(
        BreakerConfig {
            failure_threshold: 100,
            reset_timeout: Duration::from_secs(5),
        },
        BreakerPolicyConfig {
            window: WindowConfig {
                buckets: 2,
                bucket_duration: Duration::from_secs(10),
            },
            min_request_count: 2,
            failure_ratio_percent: 50,
            drop_ratio_percent: 100,
            half_open_max_calls: 1,
            force_pass_interval: Duration::from_secs(5),
            ..BreakerPolicyConfig::default()
        },
    );

    let _ = breaker.do_request(|| async { Err::<(), _>("fail") }).await;
    let _ = breaker.do_request(|| async { Ok::<_, &str>(()) }).await;

    assert!(breaker.allow().await.is_err());
    assert_eq!(breaker.snapshot().await.window.drops, 1);
}

#[tokio::test]
async fn google_sre_policy_can_drop_when_total_outpaces_successes() {
    let breaker = SharedCircuitBreaker::with_policy(
        BreakerConfig {
            failure_threshold: 100,
            reset_timeout: Duration::from_secs(5),
        },
        BreakerPolicyConfig {
            window: WindowConfig {
                buckets: 2,
                bucket_duration: Duration::from_secs(10),
            },
            sre_rejection_enabled: true,
            sre_k_millis: 1100,
            sre_protection: 1,
            ..BreakerPolicyConfig::default()
        },
    );

    for _ in 0..8 {
        let _ = breaker.do_request(|| async { Err::<(), _>("fail") }).await;
    }
    let _ = breaker.do_request(|| async { Ok::<_, &str>(()) }).await;

    let mut dropped = false;
    for _ in 0..32 {
        if breaker.allow().await.is_err() {
            dropped = true;
            break;
        }
    }

    assert!(dropped);
}