use parking_lot::RwLock;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::Duration;
use crate::types::Time;
/// Largest number of concurrent half-open probes a policy may request.
///
/// The half-open probe counter occupies 16 bits of the packed breaker state.
pub const MAX_HALF_OPEN_PROBES: u32 = 0xFFFF;
/// Smallest number of concurrent half-open probes a policy may request.
pub const MIN_HALF_OPEN_PROBES: u32 = 1;

/// Brings a requested probe limit into the supported range.
///
/// Zero becomes [`MIN_HALF_OPEN_PROBES`], anything above [`MAX_HALF_OPEN_PROBES`] is
/// capped to it, and every other value is returned untouched.
const fn normalize_half_open_max_probes(max_probes: u32) -> u32 {
    match max_probes {
        0 => MIN_HALF_OPEN_PROBES,
        requested if requested > MAX_HALF_OPEN_PROBES => MAX_HALF_OPEN_PROBES,
        requested => requested,
    }
}
/// Configuration for a [`CircuitBreaker`].
///
/// `CircuitBreaker::new` sanitises a policy before using it: `half_open_max_probes` is
/// brought into `MIN_HALF_OPEN_PROBES..=MAX_HALF_OPEN_PROBES`, `success_threshold` is
/// clamped to `1..=0xFFFF`, and a zero `failure_threshold` is raised to 1.
#[derive(Clone)]
pub struct CircuitBreakerPolicy {
/// Name returned by `CircuitBreaker::name` and shown in `Debug` output.
pub name: String,
/// Closed-state failure count at which the breaker opens.
pub failure_threshold: u32,
/// Number of successful probes in one half-open period required to close the breaker.
pub success_threshold: u32,
/// How long the breaker stays open before a caller may move it to half-open.
pub open_duration: Duration,
/// Maximum number of probes that may be in flight at once while half-open.
pub half_open_max_probes: u32,
/// Decides which errors count as failures; other errors are only tallied as ignored.
pub failure_predicate: FailurePredicate,
/// Optional failure-rate window that can open the breaker in addition to the
/// failure-count threshold.
pub sliding_window: Option<SlidingWindowConfig>,
/// Optional callback invoked after a state transition with the old state, the new
/// state and a metrics snapshot.
pub on_state_change: Option<StateChangeCallback>,
}
/// Manual `Debug` because the state-change callback is an opaque closure; it is
/// rendered as a boolean telling whether one is installed.
impl fmt::Debug for CircuitBreakerPolicy {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_callback = self.on_state_change.is_some();
        let mut out = f.debug_struct("CircuitBreakerPolicy");
        out.field("name", &self.name);
        out.field("failure_threshold", &self.failure_threshold);
        out.field("success_threshold", &self.success_threshold);
        out.field("open_duration", &self.open_duration);
        out.field("half_open_max_probes", &self.half_open_max_probes);
        out.field("failure_predicate", &self.failure_predicate);
        out.field("sliding_window", &self.sliding_window);
        out.field("on_state_change", &has_callback);
        out.finish()
    }
}
/// Rule deciding whether an error (given as text) counts as a breaker failure.
#[derive(Clone)]
pub enum FailurePredicate {
    /// Every error counts as a failure.
    AllErrors,
    /// A plain function pointer classifies the error text.
    ByType(fn(&str) -> bool),
    /// A shared closure classifies the error text.
    Custom(Arc<dyn Fn(&str) -> bool + Send + Sync>),
}

/// Manual `Debug`: the function pointer and the closure are printed as `...`.
impl fmt::Debug for FailurePredicate {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::AllErrors => "AllErrors",
            Self::ByType(_) => "ByType(...)",
            Self::Custom(_) => "Custom(...)",
        };
        f.write_str(label)
    }
}

impl FailurePredicate {
    /// Returns `true` when `error` should be counted as a failure.
    fn is_failure(&self, error: &str) -> bool {
        match self {
            Self::AllErrors => true,
            Self::ByType(classify) => classify(error),
            Self::Custom(classify) => (classify.as_ref())(error),
        }
    }

    /// Returns `true` for the [`FailurePredicate::AllErrors`] variant only.
    fn is_all_errors(&self) -> bool {
        match self {
            Self::AllErrors => true,
            Self::ByType(_) | Self::Custom(_) => false,
        }
    }
}
/// Settings for the optional failure-rate window.
#[derive(Clone, Debug)]
pub struct SlidingWindowConfig {
/// Length of the window; outcomes older than this are evicted before a new one is recorded.
pub window_duration: Duration,
/// Minimum number of recorded outcomes in the window before the failure rate is considered.
pub minimum_calls: u32,
/// Failure fraction (failures / total outcomes in the window) at or above which the
/// window asks for the breaker to open.
pub failure_rate_threshold: f64,
}
/// Shared callback invoked after a state transition with `(old_state, new_state, metrics_snapshot)`.
pub type StateChangeCallback = Arc<dyn Fn(State, State, &CircuitBreakerMetrics) + Send + Sync>;
/// Default policy: opens after 5 failures, stays open for 30 seconds, admits one probe
/// at a time and closes again after 2 successful probes. All errors count as failures;
/// no sliding window and no callback are configured.
impl Default for CircuitBreakerPolicy {
    fn default() -> Self {
        let name = String::from("default");
        let open_duration = Duration::from_secs(30);
        Self {
            name,
            failure_threshold: 5,
            success_threshold: 2,
            open_duration,
            half_open_max_probes: 1,
            failure_predicate: FailurePredicate::AllErrors,
            sliding_window: None,
            on_state_change: None,
        }
    }
}
/// Breaker state, small enough to be packed into a single `u64` (see `to_bits`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// Calls flow normally; `failures` is the current failure count.
    Closed {
        failures: u32,
    },
    /// Calls are rejected; `since_millis` is when the breaker opened.
    Open {
        since_millis: u64,
    },
    /// A limited number of probe calls is admitted.
    HalfOpen {
        /// Identifies this half-open period so stale permits can be told apart.
        epoch: u32,
        /// Probes currently in flight.
        probes_active: u32,
        /// Successful probes so far in this period.
        successes: u32,
    },
}

/// A fresh breaker is closed with no failures.
impl Default for State {
    fn default() -> Self {
        Self::Closed { failures: 0 }
    }
}

impl State {
    /// Discriminant stored in the low byte for `Open`.
    const TAG_OPEN: u64 = 1;
    /// Discriminant stored in the low byte for `HalfOpen`.
    const TAG_HALF_OPEN: u64 = 2;
    /// Mask selecting the discriminant byte.
    const TAG_MASK: u64 = 0xFF;
    /// The payload starts right above the discriminant byte.
    const PAYLOAD_SHIFT: u32 = 8;
    /// `HalfOpen::epoch` keeps its low 24 bits.
    const EPOCH_MASK: u64 = 0xFF_FFFF;
    /// `HalfOpen::probes_active` and `HalfOpen::successes` keep their low 16 bits each.
    const COUNTER_MASK: u64 = 0xFFFF;
    /// Bit offset of `probes_active` within the packed word.
    const PROBES_SHIFT: u32 = 32;
    /// Bit offset of `successes` within the packed word.
    const SUCCESSES_SHIFT: u32 = 48;

    /// Packs the state into a `u64`.
    ///
    /// Layout: low byte = discriminant (0 closed, 1 open, 2 half-open), payload above it.
    /// For `Open` the top 8 bits of `since_millis` are shifted out; for `HalfOpen` the
    /// fields are masked to 24/16/16 bits.
    fn to_bits(self) -> u64 {
        match self {
            Self::Closed { failures } => u64::from(failures) << Self::PAYLOAD_SHIFT,
            Self::Open { since_millis } => {
                Self::TAG_OPEN | (since_millis << Self::PAYLOAD_SHIFT)
            }
            Self::HalfOpen {
                epoch,
                probes_active,
                successes,
            } => {
                let epoch_bits = (u64::from(epoch) & Self::EPOCH_MASK) << Self::PAYLOAD_SHIFT;
                let probe_bits =
                    (u64::from(probes_active) & Self::COUNTER_MASK) << Self::PROBES_SHIFT;
                let success_bits =
                    (u64::from(successes) & Self::COUNTER_MASK) << Self::SUCCESSES_SHIFT;
                Self::TAG_HALF_OPEN | epoch_bits | probe_bits | success_bits
            }
        }
    }

    /// Inverse of `to_bits`. Any discriminant other than 1 or 2 decodes as `Closed`.
    fn from_bits(bits: u64) -> Self {
        let tag = bits & Self::TAG_MASK;
        if tag == Self::TAG_OPEN {
            return Self::Open {
                since_millis: bits >> Self::PAYLOAD_SHIFT,
            };
        }
        if tag == Self::TAG_HALF_OPEN {
            return Self::HalfOpen {
                epoch: ((bits >> Self::PAYLOAD_SHIFT) & Self::EPOCH_MASK) as u32,
                probes_active: ((bits >> Self::PROBES_SHIFT) & Self::COUNTER_MASK) as u32,
                successes: ((bits >> Self::SUCCESSES_SHIFT) & Self::COUNTER_MASK) as u32,
            };
        }
        Self::Closed {
            failures: (bits >> Self::PAYLOAD_SHIFT) as u32,
        }
    }
}
/// Time-bounded log of call outcomes used to compute a failure rate.
///
/// `entries` holds `(timestamp_millis, is_failure)` pairs in insertion order; the two
/// counters mirror how many successes and failures are currently in `entries`.
struct SlidingWindow {
    config: SlidingWindowConfig,
    entries: VecDeque<(u64, bool)>,
    success_count: u32,
    failure_count: u32,
}

impl SlidingWindow {
    /// Creates an empty window with room for 1024 entries pre-allocated.
    fn new(config: SlidingWindowConfig) -> Self {
        let entries = VecDeque::with_capacity(1024);
        Self {
            config,
            entries,
            success_count: 0,
            failure_count: 0,
        }
    }

    /// Evicts entries from the front that are older than `now_millis - window_duration`,
    /// keeping the counters in step. Stops at the first entry still inside the window.
    fn cleanup(&mut self, now_millis: u64) {
        // A window longer than u64::MAX milliseconds is treated as unbounded.
        let span = u64::try_from(self.config.window_duration.as_millis()).unwrap_or(u64::MAX);
        let cutoff = now_millis.saturating_sub(span);
        while let Some(&(stamp, failed)) = self.entries.front() {
            if stamp >= cutoff {
                break;
            }
            let counter = if failed {
                &mut self.failure_count
            } else {
                &mut self.success_count
            };
            *counter = counter.saturating_sub(1);
            self.entries.pop_front();
        }
    }

    /// Evicts expired entries, then appends one outcome stamped `now_millis`.
    fn push_outcome(&mut self, now_millis: u64, is_failure: bool) {
        self.cleanup(now_millis);
        self.entries.push_back((now_millis, is_failure));
        let counter = if is_failure {
            &mut self.failure_count
        } else {
            &mut self.success_count
        };
        *counter = counter.saturating_add(1);
    }

    /// Records a successful call at `now_millis`.
    fn record_success(&mut self, now_millis: u64) {
        self.push_outcome(now_millis, false);
    }

    /// Records a failed call at `now_millis`.
    fn record_failure(&mut self, now_millis: u64) {
        self.push_outcome(now_millis, true);
    }

    /// Fraction of recorded outcomes that are failures; `0.0` when nothing is recorded.
    /// Does not evict expired entries itself.
    fn failure_rate(&self) -> f64 {
        match self.success_count.saturating_add(self.failure_count) {
            0 => 0.0,
            total => f64::from(self.failure_count) / f64::from(total),
        }
    }

    /// `true` once at least `minimum_calls` outcomes are recorded and the failure rate
    /// has reached `failure_rate_threshold`.
    fn should_open(&self) -> bool {
        let observed = self.success_count.saturating_add(self.failure_count);
        observed >= self.config.minimum_calls
            && self.failure_rate() >= self.config.failure_rate_threshold
    }

    /// Forgets every recorded outcome.
    fn reset(&mut self) {
        self.failure_count = 0;
        self.success_count = 0;
        self.entries.clear();
    }
}
/// Point-in-time snapshot of a breaker's counters and state.
///
/// The fields are read one by one from independent atomics, so under concurrent use a
/// snapshot is not guaranteed to be mutually consistent.
#[derive(Clone, Debug, Default)]
pub struct CircuitBreakerMetrics {
/// Number of `record_success` calls.
pub total_success: u64,
/// Number of recorded errors that the failure predicate counted as failures.
pub total_failure: u64,
/// Number of calls rejected by `should_allow` (open, or half-open with all probe slots taken).
pub total_rejected: u64,
/// Number of recorded errors that the failure predicate did not count.
pub total_ignored_errors: u64,
/// Number of transitions into the open state.
pub times_opened: u64,
/// Number of half-open to closed transitions.
pub times_closed: u64,
/// Most recently stored closed-state failure count.
pub current_failure_streak: u32,
/// Breaker state at the time of the snapshot.
pub current_state: State,
/// Failure rate of the sliding window, or `None` when no window is configured.
pub sliding_window_failure_rate: Option<f64>,
}
/// Circuit breaker whose state machine lives in a single atomic word.
///
/// State transitions are performed with compare-exchange on `state_bits`; counters are
/// separate atomics updated with relaxed ordering.
pub struct CircuitBreaker {
// Sanitised policy (see `new`).
policy: CircuitBreakerPolicy,
// Current `State`, packed with `State::to_bits`.
state_bits: AtomicU64,
// Scratch snapshot refreshed on state transitions and cloned for the callback.
metrics: RwLock<CircuitBreakerMetrics>,
// Present only when the policy configures a sliding window.
sliding_window: Option<RwLock<SlidingWindow>>,
// Live counters backing `metrics()`.
total_success: AtomicU64,
total_failure: AtomicU64,
total_rejected: AtomicU64,
total_ignored_errors: AtomicU64,
current_failure_streak: AtomicU32,
// Also the source of the half-open epoch (low 24 bits).
times_opened: AtomicU64,
times_closed: AtomicU64,
}
impl CircuitBreaker {
/// Builds a closed breaker from `policy` after sanitising it.
///
/// A zero `failure_threshold` becomes 1, `success_threshold` is clamped to `1..=0xFFFF`
/// and `half_open_max_probes` is normalised into its supported range. A sliding window
/// is created only when the policy configures one.
#[must_use]
pub fn new(mut policy: CircuitBreakerPolicy) -> Self {
    policy.half_open_max_probes = normalize_half_open_max_probes(policy.half_open_max_probes);
    policy.success_threshold = policy.success_threshold.clamp(1, 0xFFFF);
    policy.failure_threshold = policy.failure_threshold.max(1);

    let sliding_window = policy
        .sliding_window
        .clone()
        .map(SlidingWindow::new)
        .map(RwLock::new);
    let initial_bits = State::default().to_bits();
    let zeroed = || AtomicU64::new(0);

    Self {
        policy,
        state_bits: AtomicU64::new(initial_bits),
        metrics: RwLock::new(CircuitBreakerMetrics::default()),
        sliding_window,
        total_success: zeroed(),
        total_failure: zeroed(),
        total_rejected: zeroed(),
        total_ignored_errors: zeroed(),
        current_failure_streak: AtomicU32::new(0),
        times_opened: zeroed(),
        times_closed: zeroed(),
    }
}
/// Returns the breaker's current state, decoded from the packed atomic word.
#[must_use]
pub fn state(&self) -> State {
    let raw = self.state_bits.load(Ordering::Acquire);
    State::from_bits(raw)
}
/// Takes a snapshot of the live counters, the current state and, when a sliding
/// window is configured, its failure rate.
///
/// Each value is loaded separately, so the snapshot is not atomic as a whole.
#[must_use]
pub fn metrics(&self) -> CircuitBreakerMetrics {
    let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

    let total_success = read(&self.total_success);
    let total_failure = read(&self.total_failure);
    let total_rejected = read(&self.total_rejected);
    let total_ignored_errors = read(&self.total_ignored_errors);
    let times_opened = read(&self.times_opened);
    let times_closed = read(&self.times_closed);
    let current_failure_streak = self.current_failure_streak.load(Ordering::Relaxed);
    let current_state = self.state();
    let sliding_window_failure_rate = match self.sliding_window.as_ref() {
        Some(window) => Some(window.read().failure_rate()),
        None => None,
    };

    CircuitBreakerMetrics {
        total_success,
        total_failure,
        total_rejected,
        total_ignored_errors,
        times_opened,
        times_closed,
        current_failure_streak,
        current_state,
        sliding_window_failure_rate,
    }
}
/// Returns the name given in the policy.
#[must_use]
pub fn name(&self) -> &str {
    self.policy.name.as_str()
}
/// Decides whether a call may proceed at time `now`, handing out a [`Permit`] if so.
///
/// * Closed: always returns `Permit::Normal`.
/// * Open: once `open_duration` has elapsed since the breaker opened, the caller that
///   wins the compare-exchange moves the breaker to half-open, triggers
///   `on_state_change`, and receives the first `Permit::Probe`.
/// * Half-open: returns `Permit::Probe` while fewer than `half_open_max_probes` probes
///   are in flight.
///
/// # Errors
///
/// * `CircuitBreakerError::Open { remaining }` while the open period has not elapsed.
/// * `CircuitBreakerError::HalfOpenFull` when every half-open probe slot is taken.
///
/// Each rejection increments `total_rejected`.
pub fn should_allow(&self, now: Time) -> Result<Permit, CircuitBreakerError<()>> {
let now_millis = now.as_millis();
let mut current_bits = self.state_bits.load(Ordering::Acquire);
// Retry loop: a failed compare-exchange re-evaluates the freshly observed state.
loop {
let state = State::from_bits(current_bits);
match state {
State::Closed { .. } => {
return Ok(Permit::Normal);
}
State::Open { since_millis } => {
let elapsed = Duration::from_millis(now_millis.saturating_sub(since_millis));
if elapsed >= self.policy.open_duration {
// The epoch is the low 24 bits of the open counter; permits carry it so that
// results reported for an earlier half-open period can be recognised.
let epoch = (self.times_opened.load(Ordering::Relaxed) & 0xFF_FFFF) as u32;
// The caller performing the transition is itself the first active probe.
let new_state = State::HalfOpen {
epoch,
probes_active: 1,
successes: 0,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
// The metrics lock is released inside `update_state_metrics`, before the
// callback runs.
let callback_metrics = self.update_state_metrics(state, new_state);
if let Some(ref cb) = self.policy.on_state_change {
cb(state, new_state, &callback_metrics);
}
return Ok(Permit::Probe { epoch });
}
Err(actual) => {
current_bits = actual;
continue;
}
}
}
self.total_rejected.fetch_add(1, Ordering::Relaxed);
let remaining = self
.policy
.open_duration
.checked_sub(elapsed)
.unwrap_or(Duration::ZERO);
return Err(CircuitBreakerError::Open { remaining });
}
State::HalfOpen {
epoch,
probes_active,
successes,
} => {
if probes_active < self.policy.half_open_max_probes {
// Claim one more probe slot.
let new_state = State::HalfOpen {
epoch,
probes_active: probes_active + 1,
successes,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
return Ok(Permit::Probe { epoch });
}
Err(actual) => {
current_bits = actual;
continue;
}
}
}
self.total_rejected.fetch_add(1, Ordering::Relaxed);
return Err(CircuitBreakerError::HalfOpenFull);
}
}
}
}
/// Records that a call made under `permit` succeeded at time `now`.
///
/// Always increments `total_success`.
///
/// * `Permit::Normal`: if the breaker is closed with a non-zero failure count, the count
///   (and `current_failure_streak`) is reset to zero; in any other state the breaker
///   state is left alone.
/// * `Permit::Probe`: only acts when the breaker is half-open with the permit's epoch.
///   The success is counted; once `success_threshold` is reached the breaker closes
///   (`times_closed` is incremented and `on_state_change` fires), otherwise the probe
///   slot is released. A permit from another epoch or state is ignored.
///
/// Finally the success is recorded in the sliding window, if one is configured; that
/// may open a closed breaker when the window's criteria are met.
#[allow(clippy::significant_drop_tightening, clippy::too_many_lines)]
pub fn record_success(&self, permit: Permit, now: Time) {
let now_millis = now.as_millis();
self.total_success.fetch_add(1, Ordering::Relaxed);
// Fast path: a normal success on an already-clean closed breaker needs no state change.
if permit == Permit::Normal {
let current_bits = self.state_bits.load(Ordering::Acquire);
if State::from_bits(current_bits) == (State::Closed { failures: 0 }) {
self.check_sliding_window_success(now_millis);
return;
}
}
let callback_event = match permit {
Permit::Normal => {
let mut current_bits = self.state_bits.load(Ordering::Acquire);
loop {
let state = State::from_bits(current_bits);
match state {
State::Closed { failures } if failures > 0 => {
let new_state = State::Closed { failures: 0 };
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
self.current_failure_streak.store(0, Ordering::Relaxed);
break;
}
Err(actual) => current_bits = actual,
}
}
_ => break,
}
}
None
}
Permit::Probe {
epoch: permit_epoch,
} => {
let mut event = None;
let mut current_bits = self.state_bits.load(Ordering::Acquire);
loop {
let state = State::from_bits(current_bits);
match state {
State::HalfOpen {
epoch,
probes_active,
successes,
} if epoch == permit_epoch => {
let new_successes = successes.saturating_add(1);
if new_successes >= self.policy.success_threshold {
// Enough successful probes: close the breaker.
let new_state = State::Closed { failures: 0 };
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
self.current_failure_streak.store(0, Ordering::Relaxed);
self.times_closed.fetch_add(1, Ordering::Relaxed);
let mut m = self.metrics.write();
m.current_state = new_state;
// The snapshot is only built when a callback will consume it.
if self.policy.on_state_change.is_some() {
self.populate_metrics_snapshot(&mut m);
event = Some((state, new_state, m.clone()));
}
break;
}
Err(actual) => current_bits = actual,
}
} else {
// Not enough successes yet: count this one and free the probe slot.
let new_state = State::HalfOpen {
epoch,
probes_active: probes_active.saturating_sub(1),
successes: new_successes,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => break,
Err(actual) => current_bits = actual,
}
}
}
_ => break,
}
}
event
}
};
// The callback runs here, after the metrics guard taken above has gone out of scope.
if let Some((from, to, m)) = callback_event {
if let Some(ref cb) = self.policy.on_state_change {
cb(from, to, &m);
}
}
self.check_sliding_window_success(now_millis);
}
fn update_state_metrics(&self, _old: State, new: State) -> CircuitBreakerMetrics {
let mut m = self.metrics.write();
m.current_state = new;
self.populate_metrics_snapshot(&mut m);
m.clone()
}
/// Copies every live counter into `m` using relaxed loads.
///
/// `current_state` and `sliding_window_failure_rate` are left untouched.
fn populate_metrics_snapshot(&self, m: &mut CircuitBreakerMetrics) {
    let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

    m.total_success = read(&self.total_success);
    m.total_failure = read(&self.total_failure);
    m.total_rejected = read(&self.total_rejected);
    m.total_ignored_errors = read(&self.total_ignored_errors);
    m.current_failure_streak = self.current_failure_streak.load(Ordering::Relaxed);
    m.times_opened = read(&self.times_opened);
    m.times_closed = read(&self.times_closed);
}
/// Records a success in the sliding window (if configured) and opens the breaker when
/// the window's criteria are met afterwards.
fn check_sliding_window_success(&self, now_millis: u64) {
    let Some(window) = self.sliding_window.as_ref() else {
        return;
    };
    // Scope the write guard: `trigger_open_from_window` locks the window again.
    let should_trip = {
        let mut guard = window.write();
        guard.record_success(now_millis);
        guard.should_open()
    };
    if should_trip {
        self.trigger_open_from_window(now_millis);
    }
}
/// Opens the breaker at `now_millis` on behalf of the sliding window.
///
/// Only acts while the breaker is closed; if it is observed in any other state the
/// function returns without changes. On a successful transition it increments
/// `times_opened`, resets the sliding window and, when a callback is configured,
/// invokes it with a metrics snapshot after the metrics lock has been released.
fn trigger_open_from_window(&self, now_millis: u64) {
let mut event = None;
let mut current_bits = self.state_bits.load(Ordering::Acquire);
loop {
let state = State::from_bits(current_bits);
if !matches!(state, State::Closed { .. }) {
break;
}
let new_state = State::Open {
since_millis: now_millis,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
self.times_opened.fetch_add(1, Ordering::Relaxed);
// Lock order: metrics first, then the sliding window.
let mut m = self.metrics.write();
m.current_state = new_state;
if let Some(ref w) = self.sliding_window {
w.write().reset();
}
if self.policy.on_state_change.is_some() {
self.populate_metrics_snapshot(&mut m);
event = Some((state, new_state, m.clone()));
}
// Release the metrics lock before the callback below runs.
drop(m);
break;
}
Err(actual) => current_bits = actual,
}
}
if let Some((from, to, m)) = event {
if let Some(ref cb) = self.policy.on_state_change {
cb(from, to, &m);
}
}
}
/// Records that a call made under `permit` failed with `error` at time `now`.
///
/// If the failure predicate does not count `error`, only `total_ignored_errors` is
/// incremented and, for a probe permit of the current half-open epoch, the probe slot
/// is released; nothing else changes.
///
/// Otherwise `total_failure` is incremented, the failure is recorded in the sliding
/// window (if configured), and then:
///
/// * `Permit::Normal` while closed: the failure count is incremented; the breaker opens
///   when the count reaches `failure_threshold` or the sliding window asked to open.
/// * `Permit::Probe` matching the current half-open epoch: the breaker re-opens.
/// * Any other combination of permit and state leaves the breaker state unchanged.
///
/// Every transition to open increments `times_opened`, resets the sliding window and
/// invokes `on_state_change` (after the metrics lock is released).
#[allow(clippy::significant_drop_tightening, clippy::too_many_lines)]
pub fn record_failure(&self, permit: Permit, error: &str, now: Time) {
let now_millis = now.as_millis();
let counts_as_failure = self.policy.failure_predicate.is_all_errors()
|| self.policy.failure_predicate.is_failure(error);
if !counts_as_failure {
self.total_ignored_errors.fetch_add(1, Ordering::Relaxed);
// An ignored error still ends the probe, so give its slot back.
if let Permit::Probe {
epoch: permit_epoch,
} = permit
{
let mut current_bits = self.state_bits.load(Ordering::Acquire);
loop {
let state = State::from_bits(current_bits);
match state {
State::HalfOpen {
epoch,
probes_active,
successes,
} if epoch == permit_epoch => {
let new_state = State::HalfOpen {
epoch,
probes_active: probes_active.saturating_sub(1),
successes,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => break,
Err(actual) => current_bits = actual,
}
}
_ => break,
}
}
}
return;
}
self.total_failure.fetch_add(1, Ordering::Relaxed);
// The window guard is dropped at the end of the closure, before any state transition.
let window_triggered = self.sliding_window.as_ref().is_some_and(|window| {
let mut w = window.write();
w.record_failure(now_millis);
w.should_open()
});
let mut event = None;
match permit {
Permit::Normal => {
let mut current_bits = self.state_bits.load(Ordering::Acquire);
loop {
let state = State::from_bits(current_bits);
match state {
State::Closed { failures } => {
let new_failures = failures.saturating_add(1);
if new_failures >= self.policy.failure_threshold || window_triggered {
let new_state = State::Open {
since_millis: now_millis,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
self.current_failure_streak
.store(new_failures, Ordering::Relaxed);
self.times_opened.fetch_add(1, Ordering::Relaxed);
// Lock order: metrics first, then the sliding window.
let mut m = self.metrics.write();
m.current_state = new_state;
if let Some(ref w) = self.sliding_window {
w.write().reset();
}
if self.policy.on_state_change.is_some() {
self.populate_metrics_snapshot(&mut m);
event = Some((state, new_state, m.clone()));
}
break;
}
Err(actual) => current_bits = actual,
}
} else {
// Below the threshold: stay closed with the incremented count.
let new_state = State::Closed {
failures: new_failures,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
self.current_failure_streak
.store(new_failures, Ordering::Relaxed);
break;
}
Err(actual) => current_bits = actual,
}
}
}
// A normal permit reported while open or half-open does not alter the state.
_ => break,
}
}
}
Permit::Probe {
epoch: permit_epoch,
} => {
let mut current_bits = self.state_bits.load(Ordering::Acquire);
loop {
let state = State::from_bits(current_bits);
match state {
// A single counted probe failure re-opens the breaker.
State::HalfOpen { epoch, .. } if epoch == permit_epoch => {
let new_state = State::Open {
since_millis: now_millis,
};
match self.state_bits.compare_exchange_weak(
current_bits,
new_state.to_bits(),
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => {
self.times_opened.fetch_add(1, Ordering::Relaxed);
let mut m = self.metrics.write();
m.current_state = new_state;
if let Some(ref w) = self.sliding_window {
w.write().reset();
}
if self.policy.on_state_change.is_some() {
self.populate_metrics_snapshot(&mut m);
event = Some((state, new_state, m.clone()));
}
break;
}
Err(actual) => current_bits = actual,
}
}
_ => break,
}
}
}
}
// Invoked after the match, once the metrics guards above have gone out of scope.
if let Some((from, to, m)) = event {
if let Some(ref cb) = self.policy.on_state_change {
cb(from, to, &m);
}
}
}
/// Runs `op` under the breaker's protection and records its outcome.
///
/// A permit is requested first; if the breaker rejects the call, `op` is never invoked.
/// `Ok` results are recorded as successes and `Err` results as failures (subject to
/// the failure predicate). If `op` panics, a failure with the error text `"Panic"` is
/// recorded while unwinding.
///
/// # Errors
///
/// * `CircuitBreakerError::Open` / `CircuitBreakerError::HalfOpenFull` when rejected.
/// * `CircuitBreakerError::Inner(e)` when `op` returned `Err(e)`.
pub fn call<T, E, F>(&self, now: Time, op: F) -> Result<T, CircuitBreakerError<E>>
where
    F: FnOnce() -> Result<T, E>,
    E: fmt::Display,
{
    /// Owns the permit while `op` runs; if it is still armed when dropped (that is,
    /// during unwinding) the call is recorded as a failure.
    struct PanicGuard<'a> {
        breaker: &'a CircuitBreaker,
        pending: Option<Permit>,
        started: Time,
    }

    impl Drop for PanicGuard<'_> {
        fn drop(&mut self) {
            let Some(permit) = self.pending.take() else {
                return;
            };
            self.breaker.record_failure(permit, "Panic", self.started);
        }
    }

    // Re-wrap the rejection so it carries the caller's error type `E`.
    let permit = match self.should_allow(now) {
        Ok(permit) => permit,
        Err(CircuitBreakerError::Open { remaining }) => {
            return Err(CircuitBreakerError::Open { remaining });
        }
        Err(CircuitBreakerError::HalfOpenFull) => {
            return Err(CircuitBreakerError::HalfOpenFull);
        }
        Err(CircuitBreakerError::Inner(())) => unreachable!(),
    };

    let mut guard = PanicGuard {
        breaker: self,
        pending: Some(permit),
        started: now,
    };

    let failure = match op() {
        Ok(value) => {
            if let Some(granted) = guard.pending.take() {
                self.record_success(granted, now);
            }
            return Ok(value);
        }
        Err(failure) => failure,
    };

    // Stringify before disarming the guard, so a panic inside `Display` is still
    // recorded. The text is skipped when every error counts anyway.
    let error_text = if self.policy.failure_predicate.is_all_errors() {
        String::new()
    } else {
        failure.to_string()
    };
    if let Some(granted) = guard.pending.take() {
        self.record_failure(granted, &error_text, now);
    }
    Err(CircuitBreakerError::Inner(failure))
}
pub fn reset(&self) {
let new_bits = State::Closed { failures: 0 }.to_bits();
let mut current = self.state_bits.load(Ordering::Acquire);
loop {
match self.state_bits.compare_exchange_weak(
current,
new_bits,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
self.current_failure_streak.store(0, Ordering::Relaxed);
if let Some(ref window) = self.sliding_window {
window.write().reset();
}
}
}
/// Compact `Debug` output: only the name and the current state are shown; the
/// remaining fields are elided.
impl fmt::Debug for CircuitBreaker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let current = self.state();
        let mut out = f.debug_struct("CircuitBreaker");
        out.field("name", &self.policy.name);
        out.field("state", &current);
        out.finish_non_exhaustive()
    }
}
/// Error produced by the breaker: either a rejection or the wrapped operation's own error.
#[derive(Debug, Clone)]
pub enum CircuitBreakerError<E> {
    /// The breaker is open; `remaining` is how long until a probe may be attempted.
    Open {
        remaining: Duration,
    },
    /// The breaker is half-open and every probe slot is in use.
    HalfOpenFull,
    /// The protected operation itself failed with this error.
    Inner(E),
}

/// Human-readable messages; an inner error is displayed as-is.
impl<E> fmt::Display for CircuitBreakerError<E>
where
    E: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::HalfOpenFull => f.write_str("circuit half-open, max probes active"),
            Self::Open { remaining } => {
                write!(f, "circuit open, retry after {remaining:?}")
            }
            Self::Inner(inner) => write!(f, "{inner}"),
        }
    }
}

impl<E> std::error::Error for CircuitBreakerError<E> where E: fmt::Debug + fmt::Display {}
/// Token returned by `CircuitBreaker::should_allow` and handed back to
/// `record_success` / `record_failure` to report the outcome of the call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Permit {
/// Issued while the breaker is closed.
Normal,
/// Issued for a probe call while the breaker is half-open.
Probe {
/// Epoch of the half-open period that issued the permit; outcomes reported for a
/// different epoch do not change the breaker state.
epoch: u32,
},
}
/// Fluent builder for [`CircuitBreakerPolicy`], starting from the default policy.
#[derive(Default)]
pub struct CircuitBreakerPolicyBuilder {
policy: CircuitBreakerPolicy,
}
impl CircuitBreakerPolicyBuilder {
/// Creates a builder holding `CircuitBreakerPolicy::default()`.
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Sets the policy name.
#[must_use]
pub fn name(mut self, name: impl Into<String>) -> Self {
self.policy.name = name.into();
self
}
/// Sets the failure count at which the breaker opens (stored as given).
#[must_use]
pub const fn failure_threshold(mut self, threshold: u32) -> Self {
self.policy.failure_threshold = threshold;
self
}
/// Sets the number of successful probes needed to close the breaker (stored as given).
#[must_use]
pub const fn success_threshold(mut self, threshold: u32) -> Self {
self.policy.success_threshold = threshold;
self
}
/// Sets how long the breaker stays open before probing is allowed.
#[must_use]
pub const fn open_duration(mut self, duration: Duration) -> Self {
self.policy.open_duration = duration;
self
}
/// Sets the maximum number of concurrent half-open probes, normalised into
/// `MIN_HALF_OPEN_PROBES..=MAX_HALF_OPEN_PROBES`.
#[must_use]
pub const fn half_open_max_probes(mut self, max_probes: u32) -> Self {
self.policy.half_open_max_probes = normalize_half_open_max_probes(max_probes);
self
}
/// Sets the rule deciding which errors count as failures.
#[must_use]
pub fn failure_predicate(mut self, predicate: FailurePredicate) -> Self {
self.policy.failure_predicate = predicate;
self
}
/// Enables the sliding window with the given duration, minimum number of calls and
/// failure-rate threshold. The values are stored without validation.
#[must_use]
pub fn sliding_window(
mut self,
window_duration: Duration,
minimum_calls: u32,
failure_rate_threshold: f64,
) -> Self {
self.policy.sliding_window = Some(SlidingWindowConfig {
window_duration,
minimum_calls,
failure_rate_threshold,
});
self
}
/// Installs the state-change callback.
#[must_use]
pub fn on_state_change(mut self, callback: StateChangeCallback) -> Self {
self.policy.on_state_change = Some(callback);
self
}
/// Returns the configured policy.
#[must_use]
pub fn build(self) -> CircuitBreakerPolicy {
self.policy
}
}
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
// A closed state survives packing and unpacking unchanged.
#[test]
fn state_bits_roundtrip_closed() {
let state = State::Closed { failures: 42 };
let bits = state.to_bits();
let recovered = State::from_bits(bits);
assert_eq!(state, recovered);
}
// An open state survives packing and unpacking unchanged.
#[test]
fn state_bits_roundtrip_open() {
let state = State::Open {
since_millis: 123_456_789,
};
let bits = state.to_bits();
let recovered = State::from_bits(bits);
assert_eq!(state, recovered);
}
// A half-open state with in-range fields survives packing and unpacking unchanged.
#[test]
fn state_bits_roundtrip_half_open() {
let state = State::HalfOpen {
epoch: 123,
probes_active: 3,
successes: 7,
};
let bits = state.to_bits();
let recovered = State::from_bits(bits);
assert_eq!(state, recovered);
}
// A freshly constructed breaker is closed with zero failures.
#[test]
fn new_circuit_starts_closed() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy::default());
assert_eq!(cb.state(), State::Closed { failures: 0 });
}
// A closed breaker grants a permit.
#[test]
fn closed_allows_calls() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy::default());
let now = Time::from_millis(0);
assert!(cb.should_allow(now).is_ok());
}
// Each failure below the threshold bumps both the closed-state count and the streak metric.
#[test]
fn failures_increment_count() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 5,
..Default::default()
});
let now = Time::from_millis(0);
for i in 0..4 {
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "test error", now);
assert_eq!(cb.state(), State::Closed { failures: i + 1 });
assert_eq!(cb.metrics().current_failure_streak, i + 1);
}
}
// Reaching the failure threshold opens the breaker.
#[test]
fn threshold_failures_opens_circuit() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 3,
..Default::default()
});
let now = Time::from_millis(0);
for _ in 0..3 {
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "test error", now);
}
assert!(matches!(cb.state(), State::Open { .. }));
}
// An open breaker rejects calls, reports the full remaining duration and counts the rejection.
#[test]
fn open_circuit_rejects_calls() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_secs(30),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
let result = cb.should_allow(now);
assert!(matches!(
result,
Err(CircuitBreakerError::Open { remaining }) if remaining == Duration::from_secs(30)
));
assert_eq!(cb.metrics().total_rejected, 1);
}
// After the open duration has elapsed, the next request moves the breaker to half-open.
#[test]
fn open_transitions_to_half_open_after_duration() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_secs(10),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
let later = Time::from_millis(11_000);
let result = cb.should_allow(later);
assert!(result.is_ok());
assert!(matches!(cb.state(), State::HalfOpen { .. }));
}
// The open -> half-open transition hands out a probe permit and is visible through metrics().
#[test]
fn open_to_half_open_updates_metrics_state() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_secs(10),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
let later = Time::from_millis(11_000);
let result = cb.should_allow(later);
assert!(matches!(result, Ok(Permit::Probe { .. })));
assert!(matches!(
cb.metrics().current_state,
State::HalfOpen {
probes_active: 1,
successes: 0,
..
}
));
}
// With a probe limit of one, a second concurrent probe is rejected as HalfOpenFull.
#[test]
fn half_open_limits_concurrent_probes() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_millis(0),
half_open_max_probes: 1,
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
let probe1 = cb.should_allow(now);
assert!(probe1.is_ok());
let probe2 = cb.should_allow(now);
assert!(matches!(probe2, Err(CircuitBreakerError::HalfOpenFull)));
}
// Reaching the success threshold with probes closes the breaker again.
#[test]
fn successful_probes_close_circuit() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
success_threshold: 2,
open_duration: Duration::from_millis(0),
half_open_max_probes: 5,
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
for _ in 0..2 {
let permit = cb.should_allow(now).unwrap();
cb.record_success(permit, now);
}
assert_eq!(cb.state(), State::Closed { failures: 0 });
}
// A failing probe sends the breaker back to open.
#[test]
fn failed_probe_reopens_circuit() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_millis(0),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "probe fail", now);
assert!(matches!(cb.state(), State::Open { .. }));
}
// A success while closed clears the accumulated failure count and streak.
#[test]
fn success_resets_failure_count() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 5,
..Default::default()
});
let now = Time::from_millis(0);
for _ in 0..3 {
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
}
assert_eq!(cb.metrics().current_failure_streak, 3);
let permit = cb.should_allow(now).unwrap();
cb.record_success(permit, now);
assert_eq!(cb.metrics().current_failure_streak, 0);
assert_eq!(cb.state(), State::Closed { failures: 0 });
}
// Errors rejected by the predicate are only tallied as ignored; matching errors trip the breaker.
#[test]
fn failure_predicate_filters_errors() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
failure_predicate: FailurePredicate::ByType(|e| e.contains("timeout")),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "network error", now);
assert_eq!(cb.state(), State::Closed { failures: 0 });
assert_eq!(cb.metrics().total_ignored_errors, 1);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "timeout error", now);
assert!(matches!(cb.state(), State::Open { .. }));
}
// Six failures followed by four successes reach the minimum of ten calls at a 60% failure
// rate, so the window opens the breaker although the count threshold (1000) is far away.
#[test]
fn sliding_window_tracks_failure_rate() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1000, sliding_window: Some(SlidingWindowConfig {
window_duration: Duration::from_secs(60),
minimum_calls: 10,
failure_rate_threshold: 0.5,
}),
..Default::default()
});
let now = Time::from_millis(0);
for i in 0..10 {
let permit = cb.should_allow(now).unwrap();
if i < 6 {
cb.record_failure(permit, "fail", now);
} else {
cb.record_success(permit, now);
}
}
assert!(matches!(cb.state(), State::Open { .. }));
}
// Below the minimum call count the window never opens the breaker, even at a 100% failure rate.
#[test]
fn sliding_window_minimum_calls_required() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1000,
sliding_window: Some(SlidingWindowConfig {
window_duration: Duration::from_secs(60),
minimum_calls: 10,
failure_rate_threshold: 0.5,
}),
..Default::default()
});
let now = Time::from_millis(0);
for _ in 0..5 {
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
}
assert_eq!(cb.state(), State::Closed { failures: 5 });
}
// Success and failure totals reflect the recorded outcomes.
#[test]
fn metrics_track_calls() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 100,
..Default::default()
});
let now = Time::from_millis(0);
for _ in 0..3 {
let permit = cb.should_allow(now).unwrap();
cb.record_success(permit, now);
}
for _ in 0..2 {
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
}
let metrics = cb.metrics();
assert_eq!(metrics.total_success, 3);
assert_eq!(metrics.total_failure, 2);
}
// Every call rejected while open is counted in total_rejected.
#[test]
fn metrics_track_rejections() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_secs(60),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
for _ in 0..5 {
let _ = cb.should_allow(now);
}
assert_eq!(cb.metrics().total_rejected, 5);
}
// The state-change callback fires once for the closed -> open transition.
#[test]
fn state_change_callback_invoked() {
use std::sync::atomic::AtomicUsize;
let callback_count = Arc::new(AtomicUsize::new(0));
let callback_count_clone = callback_count.clone();
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
on_state_change: Some(Arc::new(move |_from, _to, _| {
callback_count_clone.fetch_add(1, Ordering::SeqCst);
})),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
assert_eq!(callback_count.load(Ordering::SeqCst), 1);
}
// The callback fires for both closed -> open and the later open -> half-open transition.
#[test]
fn state_change_callback_invoked_for_open_to_half_open() {
use std::sync::atomic::AtomicUsize;
let callback_count = Arc::new(AtomicUsize::new(0));
let callback_count_clone = callback_count.clone();
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_secs(10),
on_state_change: Some(Arc::new(move |_from, _to, _| {
callback_count_clone.fetch_add(1, Ordering::SeqCst);
})),
..Default::default()
});
let now = Time::from_millis(0);
let permit = cb.should_allow(now).unwrap();
cb.record_failure(permit, "fail", now);
let later = Time::from_millis(11_000);
let permit = cb.should_allow(later);
assert!(matches!(permit, Ok(Permit::Probe { .. })));
assert_eq!(callback_count.load(Ordering::SeqCst), 2);
}
// Ten threads recording 100 successes each must add up to exactly 1000 in the metrics.
#[test]
fn concurrent_calls_safe() {
use std::thread;
let cb = Arc::new(CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 100,
..Default::default()
}));
let handles: Vec<_> = (0..10)
.map(|_| {
let cb = cb.clone();
thread::spawn(move || {
let now = Time::from_millis(0);
for _ in 0..100 {
if let Ok(permit) = cb.should_allow(now) {
cb.record_success(permit, now);
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(cb.metrics().total_success, 1000);
}
// call() returns the operation's value and records one success.
#[test]
fn call_executes_and_records_success() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy::default());
let now = Time::from_millis(0);
let result = cb.call(now, || Ok::<_, &str>(42));
assert_eq!(result.unwrap(), 42);
assert_eq!(cb.metrics().total_success, 1);
}
// call() wraps the operation's error in Inner and records one failure.
#[test]
fn call_executes_and_records_failure() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 5,
..Default::default()
});
let now = Time::from_millis(0);
let result: Result<i32, CircuitBreakerError<&str>> = cb.call(now, || Err("error"));
assert!(matches!(result, Err(CircuitBreakerError::Inner("error"))));
assert_eq!(cb.metrics().total_failure, 1);
}
// While open, call() returns the Open error without invoking the operation.
#[test]
fn call_rejects_when_open() {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
open_duration: Duration::from_secs(60),
..Default::default()
});
let now = Time::from_millis(0);
let _ = cb.call(now, || Err::<i32, _>("fail"));
let mut called = false;
let result: Result<i32, CircuitBreakerError<&str>> = cb.call(now, || {
called = true;
Ok(42)
});
assert!(!called, "Operation should not have been called");
assert!(matches!(result, Err(CircuitBreakerError::Open { .. })));
}
// The builder stores each configured value in the resulting policy.
#[test]
fn builder_creates_policy() {
let policy = CircuitBreakerPolicyBuilder::new()
.name("test")
.failure_threshold(10)
.success_threshold(3)
.open_duration(Duration::from_secs(60))
.half_open_max_probes(2)
.build();
assert_eq!(policy.name, "test");
assert_eq!(policy.failure_threshold, 10);
assert_eq!(policy.success_threshold, 3);
assert_eq!(policy.open_duration, Duration::from_secs(60));
assert_eq!(policy.half_open_max_probes, 2);
}
// An oversized probe limit is capped at MAX_HALF_OPEN_PROBES by the builder.
#[test]
fn builder_clamps_max_probes() {
let policy = CircuitBreakerPolicyBuilder::new()
.half_open_max_probes(20_000_000) .build();
assert_eq!(policy.half_open_max_probes, MAX_HALF_OPEN_PROBES);
assert_eq!(policy.half_open_max_probes, 0xFFFF);
}
// A probe limit of zero is raised to MIN_HALF_OPEN_PROBES by the builder.
#[test]
fn builder_clamps_zero_probes_to_minimum() {
let policy = CircuitBreakerPolicyBuilder::new()
.half_open_max_probes(0)
.build();
assert_eq!(policy.half_open_max_probes, MIN_HALF_OPEN_PROBES);
}
/// A policy built by hand with `half_open_max_probes: 0` still admits one
/// half-open probe and rejects the next with `HalfOpenFull`.
#[test]
fn constructor_clamps_zero_probes_to_minimum_semantics() {
    let policy = CircuitBreakerPolicy {
        failure_threshold: 1,
        open_duration: Duration::ZERO,
        half_open_max_probes: 0,
        ..Default::default()
    };
    let breaker = CircuitBreaker::new(policy);
    let t0 = Time::from_millis(0);

    // Trip the breaker with a single failure.
    let trip_permit = breaker.should_allow(t0).unwrap();
    breaker.record_failure(trip_permit, "trip", t0);
    assert!(matches!(breaker.state(), State::Open { .. }));

    // The first request after opening is admitted as a probe ...
    let first = breaker.should_allow(t0);
    assert!(matches!(first, Ok(Permit::Probe { .. })));

    // ... and the second is refused while that probe is outstanding.
    let second = breaker.should_allow(t0);
    assert!(matches!(second, Err(CircuitBreakerError::HalfOpenFull)));
}
/// `reset` returns an open breaker to `Closed { failures: 0 }` and zeroes
/// the reported failure streak.
#[test]
fn reset_clears_state() {
    let policy = CircuitBreakerPolicy {
        failure_threshold: 1,
        ..Default::default()
    };
    let breaker = CircuitBreaker::new(policy);
    let t0 = Time::from_millis(0);

    let trip_permit = breaker.should_allow(t0).unwrap();
    breaker.record_failure(trip_permit, "fail", t0);
    assert!(matches!(breaker.state(), State::Open { .. }));

    breaker.reset();

    assert_eq!(breaker.state(), State::Closed { failures: 0 });
    let metrics = breaker.metrics();
    assert_eq!(metrics.current_failure_streak, 0);
}
/// `Display` output: `Open` mentions "circuit open", `HalfOpenFull` mentions
/// "half-open", and `Inner` prints exactly the wrapped error.
#[test]
fn error_display() {
    let open_text = CircuitBreakerError::<&str>::Open {
        remaining: Duration::from_secs(30),
    }
    .to_string();
    assert!(open_text.contains("circuit open"));

    let half_open_text = CircuitBreakerError::<&str>::HalfOpenFull.to_string();
    assert!(half_open_text.contains("half-open"));

    let inner_text = CircuitBreakerError::<&str>::Inner("test error").to_string();
    assert_eq!(inner_text, "test error");
}
/// Round-trips `HalfOpen` through `to_bits`/`from_bits`: `probes_active` of
/// `0xFFFF` survives intact, and `0x1FFFF` comes back as `0xFFFF` with
/// `epoch` and `successes` unchanged.
#[test]
fn halfopen_bit_packing_large_probes_active() {
    let roundtrip = |state: State| State::from_bits(state.to_bits());

    // Largest value the test expects to be representable.
    let at_limit = State::HalfOpen {
        epoch: 5,
        probes_active: 0xFFFF,
        successes: 42,
    };
    assert_eq!(
        roundtrip(at_limit),
        State::HalfOpen {
            epoch: 5,
            probes_active: 0xFFFF,
            successes: 42,
        }
    );

    // One bit too wide: expected to come back as 0xFFFF.
    let too_wide = State::HalfOpen {
        epoch: 5,
        probes_active: 0x1FFFF,
        successes: 7,
    };
    assert_eq!(
        roundtrip(too_wide),
        State::HalfOpen {
            epoch: 5,
            probes_active: 0xFFFF,
            successes: 7,
        }
    );
}
/// For a grid of `probes_active` x `successes` values, a bits round-trip
/// must return each field unchanged, so neither counter bleeds into the other.
#[test]
fn halfopen_bit_packing_successes_isolated() {
    const PROBE_VALUES: [u32; 4] = [0, 1, 255, 0xFFFF];
    const SUCCESS_VALUES: [u32; 4] = [0, 1, 100, 0xFFFF];

    for probes in PROBE_VALUES {
        for succ in SUCCESS_VALUES {
            let packed = State::HalfOpen {
                epoch: 77,
                probes_active: probes,
                successes: succ,
            }
            .to_bits();
            if let State::HalfOpen {
                epoch,
                probes_active,
                successes,
            } = State::from_bits(packed)
            {
                assert_eq!(epoch, 77, "epoch mismatch");
                assert_eq!(
                    probes_active, probes,
                    "probes_active mismatch for ({probes}, {succ})"
                );
                assert_eq!(
                    successes, succ,
                    "successes mismatch for ({probes}, {succ})"
                );
            } else {
                panic!("Expected HalfOpen");
            }
        }
    }
}
/// A panic inside `call` on an already-tripped breaker must leave it `Open`
/// afterwards, and a follow-up `should_allow` must yield either a probe
/// permit or an `Open` rejection.
#[test]
fn call_panics_leak_probes() {
    let policy = CircuitBreakerPolicy {
        failure_threshold: 1,
        open_duration: Duration::ZERO,
        half_open_max_probes: 1,
        ..Default::default()
    };
    let cb = std::sync::Arc::new(CircuitBreaker::new(policy));
    let now = Time::from_millis(0);

    // Trip the breaker so the next call is the one that panics.
    let trip_permit = cb.should_allow(now).unwrap();
    cb.record_failure(trip_permit, "fail", now);
    assert!(matches!(cb.state(), State::Open { .. }));

    // Run the panicking operation through `call`, catching the unwind.
    let worker = std::sync::Arc::clone(&cb);
    let panicking_call = std::panic::AssertUnwindSafe(move || {
        let _: Result<(), CircuitBreakerError<String>> =
            worker.call::<(), String, _>(now, || panic!("oops"));
    });
    let _ = std::panic::catch_unwind(panicking_call);

    assert!(
        matches!(cb.state(), State::Open { .. }),
        "Panic should record failure and reopen circuit"
    );

    let result = cb.should_allow(now);
    let acceptable = matches!(
        result,
        Ok(Permit::Probe { .. }) | Err(CircuitBreakerError::Open { .. })
    );
    assert!(
        acceptable,
        "Expected reopened circuit to permit probe or briefly report Open"
    );
}
/// With `window_duration` set to `Duration::MAX`, a `cleanup` at `u64::MAX`
/// must keep both recorded entries and their counters.
#[test]
fn sliding_window_huge_duration_does_not_over_evict() {
    let config = SlidingWindowConfig {
        window_duration: Duration::MAX,
        minimum_calls: 1,
        failure_rate_threshold: 1.0,
    };
    let mut tracker = SlidingWindow::new(config);

    tracker.record_failure(10);
    tracker.record_success(20);
    tracker.cleanup(u64::MAX);

    assert_eq!(tracker.entries.len(), 2);
    assert_eq!(tracker.failure_count, 1);
    assert_eq!(tracker.success_count, 1);
}
/// Checks `State`'s default value, its `Debug` output, that it can be
/// copied, and that distinct variants compare unequal.
#[test]
fn state_debug_clone_copy_eq_default() {
    let s = State::default();
    assert_eq!(s, State::Closed { failures: 0 });

    let dbg = format!("{s:?}");
    assert!(dbg.contains("Closed"), "{dbg}");

    // Two by-value uses of `s` only compile because `State` is `Copy`.
    let first_copy: State = s;
    let second_copy = s;
    assert_eq!(first_copy, second_copy);

    let open = State::Open { since_millis: 999 };
    assert_ne!(s, open);

    let dbg_open = format!("{open:?}");
    assert!(dbg_open.contains("Open"), "{dbg_open}");
}
/// Default metrics start with zero successes, and their `Debug` output
/// names the type and is unchanged after the value is rebound.
#[test]
fn circuit_breaker_metrics_debug_clone_default() {
    let m = CircuitBreakerMetrics::default();
    assert_eq!(m.total_success, 0);

    let dbg = format!("{m:?}");
    assert!(dbg.contains("CircuitBreakerMetrics"), "{dbg}");

    let cloned = m;
    let dbg_after = format!("{cloned:?}");
    assert_eq!(dbg_after, dbg);
}
/// Checks `Permit`'s `Debug` output, that it can be copied, and that
/// `Normal` differs from a `Probe`.
#[test]
fn permit_debug_clone_copy_eq() {
    let p = Permit::Normal;

    let dbg = format!("{p:?}");
    assert!(dbg.contains("Normal"), "{dbg}");

    // Two by-value uses of `p` only compile because `Permit` is `Copy`.
    let first_copy: Permit = p;
    let second_copy = p;
    assert_eq!(first_copy, second_copy);

    let probe = Permit::Probe { epoch: 999 };
    assert_ne!(p, probe);
}
/// Trips a breaker that has a sliding window, recovers it with two
/// successful half-open probes, then records a single failure and expects
/// the breaker to stay `Closed { failures: 1 }`.
#[test]
fn half_open_to_closed_preserves_probe_success_history_in_sliding_window() {
    let window = SlidingWindowConfig {
        window_duration: Duration::from_secs(60),
        minimum_calls: 2,
        failure_rate_threshold: 0.5,
    };
    let breaker = CircuitBreaker::new(CircuitBreakerPolicy {
        failure_threshold: 2,
        success_threshold: 2,
        open_duration: Duration::from_millis(100),
        sliding_window: Some(window),
        ..CircuitBreakerPolicy::default()
    });

    // Phase 1: two failures at t=1000 open the circuit.
    let trip_time = Time::from_millis(1_000);
    let first = breaker.should_allow(trip_time).expect("closed");
    breaker.record_failure(first, "fail-1", trip_time);
    let second = breaker
        .should_allow(trip_time)
        .expect("closed after 1 failure");
    breaker.record_failure(second, "fail-2", trip_time);
    assert!(
        breaker.should_allow(trip_time).is_err(),
        "circuit should be open after 2 failures"
    );

    // Phase 2: after the open duration has elapsed, two probes succeed.
    let first_probe_time = Time::from_millis(1_200);
    let first_probe = breaker
        .should_allow(first_probe_time)
        .expect("first half-open probe should be allowed");
    assert!(matches!(first_probe, Permit::Probe { .. }));
    breaker.record_success(first_probe, first_probe_time);

    let second_probe_time = Time::from_millis(1_201);
    let second_probe = breaker
        .should_allow(second_probe_time)
        .expect("second half-open probe should be allowed");
    assert!(matches!(second_probe, Permit::Probe { .. }));
    breaker.record_success(second_probe, second_probe_time);

    let post_close = breaker.should_allow(second_probe_time);
    assert!(
        post_close.is_ok(),
        "circuit should close after the required half-open successes, got {post_close:?}"
    );

    // Phase 3: one ordinary failure after recovery must not reopen it.
    let failure_time = Time::from_millis(1_202);
    let normal_permit = breaker
        .should_allow(failure_time)
        .expect("closed breaker should allow a normal call after recovery");
    breaker.record_failure(normal_permit, "single post-recovery failure", failure_time);

    assert_eq!(
        breaker.state(),
        State::Closed { failures: 1 },
        "one post-recovery failure should not reopen the breaker when the successful probe history is preserved"
    );
    let after_failure = breaker.should_allow(failure_time);
    assert!(
        after_failure.is_ok(),
        "preserving the successful probe history keeps the breaker closed after one ordinary failure, got {after_failure:?}"
    );
}
/// After two failures open the circuit, the reported sliding-window failure
/// rate must be `Some(0.0)`. A single successful probe then closes the
/// breaker, and it keeps admitting calls.
#[test]
fn opening_circuit_clears_sliding_window_before_half_open_recovery() {
    let window = SlidingWindowConfig {
        window_duration: Duration::from_secs(60),
        minimum_calls: 3,
        failure_rate_threshold: 0.5,
    };
    let breaker = CircuitBreaker::new(CircuitBreakerPolicy {
        failure_threshold: 2,
        success_threshold: 1,
        open_duration: Duration::from_millis(100),
        sliding_window: Some(window),
        ..CircuitBreakerPolicy::default()
    });

    // Two consecutive failures at t=1000 trip the breaker.
    let trip_time = Time::from_millis(1_000);
    let first = breaker.should_allow(trip_time).expect("closed");
    breaker.record_failure(first, "err", trip_time);
    let second = breaker.should_allow(trip_time).expect("closed after 1");
    breaker.record_failure(second, "err", trip_time);
    assert!(breaker.should_allow(trip_time).is_err(), "should be open");

    let rate_after_open = breaker.metrics().sliding_window_failure_rate;
    assert_eq!(
        rate_after_open,
        Some(0.0),
        "opening the breaker should clear stale failure history from the sliding window"
    );

    // After the open duration has elapsed, one probe success suffices.
    let recovery_time = Time::from_millis(1_200);
    let probe = breaker.should_allow(recovery_time).expect("half-open probe");
    assert!(matches!(probe, Permit::Probe { .. }));
    breaker.record_success(probe, recovery_time);
    assert_eq!(
        breaker.state(),
        State::Closed { failures: 0 },
        "single successful probe should close the circuit when success_threshold=1"
    );

    let post = breaker.should_allow(recovery_time);
    assert!(
        post.is_ok(),
        "circuit should stay closed after half-open recovery when open already cleared the window, got {post:?}"
    );
}
/// Observable outcome of one `run_open_recovery_trace` run, captured so that
/// two runs can be compared field by field.
#[derive(Debug, PartialEq, Eq)]
struct OpenRecoverySnapshot {
/// Breaker state read right after each recovery probe's success was recorded.
post_probe_states: Vec<State>,
/// Breaker state read once the trace has finished.
final_state: State,
/// `total_success` as reported by `metrics()` at the end of the trace.
total_success: u64,
/// `total_failure` as reported by `metrics()` at the end of the trace.
total_failure: u64,
/// `total_rejected` as reported by `metrics()` at the end of the trace.
total_rejected: u64,
/// `times_opened` as reported by `metrics()` at the end of the trace.
times_opened: u64,
/// `times_closed` as reported by `metrics()` at the end of the trace.
times_closed: u64,
/// Whether the final `should_allow` call returned `Ok(Permit::Normal)`.
terminal_allows_normal: bool,
}
fn run_open_recovery_trace(
success_threshold: u32,
extra_open_polls: u32,
) -> OpenRecoverySnapshot {
let cb = CircuitBreaker::new(CircuitBreakerPolicy {
failure_threshold: 1,
success_threshold,
open_duration: Duration::from_millis(10),
half_open_max_probes: 1,
..Default::default()
});
let opened_at = Time::from_millis(1_000);
let permit = cb.should_allow(opened_at).expect("closed call should pass");
cb.record_failure(permit, "trip", opened_at);
assert!(matches!(cb.state(), State::Open { .. }));
let pre_expiry = Time::from_millis(1_005);
for _ in 0..extra_open_polls {
let result = cb.should_allow(pre_expiry);
assert!(
matches!(
result,
Err(CircuitBreakerError::Open { remaining })
if remaining > Duration::ZERO && remaining <= Duration::from_millis(10)
),
"pre-expiry polls must stay rejected while open: {result:?}"
);
}
let recovery_at = Time::from_millis(1_020);
let mut post_probe_states = Vec::new();
for step in 0..success_threshold {
let now = Time::from_millis(recovery_at.as_millis() + u64::from(step));
let permit = cb
.should_allow(now)
.expect("recovery probe should be allowed");
assert!(matches!(permit, Permit::Probe { .. }));
cb.record_success(permit, now);
post_probe_states.push(cb.state());
}
let terminal_allows_normal = matches!(
cb.should_allow(Time::from_millis(
recovery_at.as_millis() + u64::from(success_threshold) + 1
)),
Ok(Permit::Normal)
);
let metrics = cb.metrics();
OpenRecoverySnapshot {
post_probe_states,
final_state: cb.state(),
total_success: metrics.total_success,
total_failure: metrics.total_failure,
total_rejected: metrics.total_rejected,
times_opened: metrics.times_opened,
times_closed: metrics.times_closed,
terminal_allows_normal,
}
}
proptest! {
// Metamorphic property: adding rejected polls while the breaker is open must
// not change how recovery proceeds. A baseline trace (no extra polls) is
// compared against a trace with `extra_open_polls` additional polls.
#[test]
fn metamorphic_open_rejections_do_not_perturb_recovery(
success_threshold in 1u32..=6,
extra_open_polls in 0u32..20,
) {
let baseline = run_open_recovery_trace(success_threshold, 0);
let transformed = run_open_recovery_trace(success_threshold, extra_open_polls);
// State trajectory and final state are identical, and the breaker ends closed.
prop_assert_eq!(transformed.post_probe_states, baseline.post_probe_states);
prop_assert_eq!(transformed.final_state, baseline.final_state);
prop_assert_eq!(transformed.final_state, State::Closed { failures: 0 });
// Success count equals the number of probes in both traces.
prop_assert_eq!(baseline.total_success, u64::from(success_threshold));
prop_assert_eq!(transformed.total_success, u64::from(success_threshold));
// Exactly one failure (the trip) in both traces.
prop_assert_eq!(transformed.total_failure, baseline.total_failure);
prop_assert_eq!(baseline.total_failure, 1);
// Exactly one open and one close transition in both traces.
prop_assert_eq!(transformed.times_opened, baseline.times_opened);
prop_assert_eq!(transformed.times_closed, baseline.times_closed);
prop_assert_eq!(baseline.times_opened, 1);
prop_assert_eq!(baseline.times_closed, 1);
// The only permitted difference: each extra poll adds one rejection.
prop_assert_eq!(
transformed.total_rejected,
baseline.total_rejected + u64::from(extra_open_polls)
);
// Both traces end with normal traffic admitted.
prop_assert!(baseline.terminal_allows_normal);
prop_assert!(transformed.terminal_allows_normal);
}
}
}