use http::Uri;
use http_extensions::HttpError;
use seatbelt::breaker::{Breaker, BreakerId, BreakerLayer};
use seatbelt::typestates::Set;
use templated_uri::Origin;
use crate::http_recovery::detect_recovery;
use crate::{HttpRecovery, HttpRequest, HttpResponse};
/// A [`BreakerLayer`] specialized for HTTP: inputs are [`HttpRequest`] and outputs are
/// `http_extensions::Result<HttpResponse>`.
///
/// `S1` and `S2` are typestate markers forwarded to `BreakerLayer`; both default to [`Set`].
/// In this file, `http_recovery` returns the layer with `S1 = Set` and
/// `http_rejected_request_error` returns it with `S2 = Set`.
pub type HttpBreakerLayer<S1 = Set, S2 = Set> = BreakerLayer<HttpRequest, http_extensions::Result<HttpResponse>, S1, S2>;
/// A [`Breaker`] specialized for HTTP: inputs are [`HttpRequest`] and outputs are
/// `http_extensions::Result<HttpResponse>`.
///
/// `S` is forwarded unchanged to `Breaker` (presumably the wrapped inner service type;
/// confirm against the `seatbelt` documentation).
pub type HttpBreaker<S> = Breaker<HttpRequest, http_extensions::Result<HttpResponse>, S>;
/// HTTP-specific configuration helpers for [`HttpBreakerLayer`].
///
/// The trait is sealed: its supertrait lives in the crate-private `sealed` module, so it
/// is only implemented by this crate (for `HttpBreakerLayer<S1, S2>`).
pub trait HttpBreakerLayerExt<S1, S2>: sealed::Sealed {
/// Applies all HTTP defaults and returns a layer with both typestate markers `Set`.
///
/// The implementation in this file installs the default [`HttpRecovery`], the
/// rejected-request error, and a breaker id derived from the request URI.
fn http_configure_defaults(self) -> HttpBreakerLayer;
/// Configures recovery detection from `recovery` (anything convertible into
/// [`HttpRecovery`]) and returns the layer with its first typestate marker `Set`.
fn http_recovery(self, recovery: impl Into<HttpRecovery>) -> HttpBreakerLayer<Set, S2>;
/// Configures the error produced for rejected requests and returns the layer with its
/// second typestate marker `Set`.
fn http_rejected_request_error(self) -> HttpBreakerLayer<S1, Set>;
}
impl<S1, S2> HttpBreakerLayerExt<S1, S2> for HttpBreakerLayer<S1, S2> {
fn http_configure_defaults(self) -> HttpBreakerLayer {
self.http_recovery(HttpRecovery::default())
.http_rejected_request_error()
.breaker_id(|req: &HttpRequest| create_breaker_id(req.uri()))
}
fn http_recovery(self, recovery: impl Into<HttpRecovery>) -> HttpBreakerLayer<Set, S2> {
let recovery = recovery.into();
self.recovery_with(move |out, args| detect_recovery(out, &recovery, args.clock()))
}
fn http_rejected_request_error(self) -> HttpBreakerLayer<S1, Set> {
self.rejected_input_error(|request, _args| HttpError::unavailable("circuit breaker open").with_request(request))
}
}
// Crate-private module holding the marker supertrait used to seal `HttpBreakerLayerExt`.
pub(crate) mod sealed {
use super::*;
/// Marker supertrait of `HttpBreakerLayerExt`.
///
/// It is `pub` inside a `pub(crate)` module, which is what the `unnameable_types` lint
/// flags; the lint expectation below records that this is deliberate.
#[expect(unnameable_types, reason = "intentional, sealed trait pattern")]
pub trait Sealed {}
// Implemented for every typestate combination of the HTTP breaker layer.
impl<S1, S2> Sealed for HttpBreakerLayer<S1, S2> {}
}
/// Derives the breaker id for a request URI.
///
/// When the URI carries both a scheme and an authority, the id is the string form of the
/// [`Origin`] built from them. Otherwise the fixed id `"default"` is returned.
fn create_breaker_id(uri: &Uri) -> BreakerId {
    // Guard clause: without both parts there is no origin to key on.
    let (Some(scheme), Some(authority)) = (uri.scheme(), uri.authority()) else {
        return BreakerId::from("default");
    };

    let origin = Origin::from_parts(scheme.clone(), authority.clone());
    origin.to_string().into()
}
// Unit tests for breaker-id derivation and for the default HTTP breaker configuration.
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests {
use futures::executor::block_on;
use http::StatusCode;
use http_extensions::{FakeHandler, HttpRequestBuilder, HttpResponseBuilder};
use layered::{Service, Stack};
use ohno::ErrorExt;
use seatbelt::{Recovery, RecoveryKind};
use tick::ClockControl;
use super::*;
// Test helper: parses `uri` and returns its breaker id. Panics if `uri` does not parse.
fn breaker_id(uri: &str) -> BreakerId {
create_breaker_id(&uri.parse::<Uri>().unwrap())
}
// Path and query are not part of the id; only scheme and host remain.
#[test]
fn create_breaker_id_extracts_origin() {
assert_eq!(breaker_id("https://example.com/path?q=1"), BreakerId::from("https://example.com"));
assert_eq!(breaker_id("http://example.com/path"), BreakerId::from("http://example.com"));
}
// A non-default port is expected in the id; the scheme's default port (443 for https,
// 80 for http) is expected to be omitted.
#[test]
fn create_breaker_id_handles_ports() {
assert_eq!(
breaker_id("https://example.com:8443/api"),
BreakerId::from("https://example.com:8443")
);
assert_eq!(breaker_id("https://example.com:443/api"), BreakerId::from("https://example.com"));
assert_eq!(breaker_id("http://example.com:80/api"), BreakerId::from("http://example.com"));
}
// Different hosts get different ids; different paths on the same host share one id.
#[test]
fn create_breaker_id_distinguishes_origins() {
assert_ne!(breaker_id("https://a.example.com/path"), breaker_id("https://b.example.com/path"));
assert_eq!(breaker_id("https://example.com/a"), breaker_id("https://example.com/b"));
}
// A path-only URI does not provide both a scheme and an authority, so the fallback id
// "default" is expected.
#[test]
fn create_breaker_id_falls_back_to_default() {
assert_eq!(breaker_id("/relative/path"), BreakerId::from("default"));
}
// With a handler that always answers 500, the breaker is expected to open after the
// warm-up traffic and reject the next request.
#[test]
fn server_errors_trip_breaker() {
let handler = FakeHandler::from_fn(|_req| HttpResponseBuilder::new_fake().status(StatusCode::INTERNAL_SERVER_ERROR).build());
let clock = ClockControl::default().auto_advance_timers(true).to_clock();
let context = crate::HttpResilienceContext::new(&clock);
// NOTE(review): `min_throughput(10)` / `failure_threshold(0.5)` presumably mean "evaluate
// after at least 10 calls" and "open at a 50% failure ratio" - confirm in seatbelt docs.
let service = (
HttpBreaker::layer("test", &context)
.http_configure_defaults()
.min_throughput(10)
.failure_threshold(0.5),
handler,
)
.into_service();
// Results are deliberately ignored here; only the follow-up request below is asserted.
for _ in 0..20 {
let request = HttpRequestBuilder::new_fake().uri("https://example.com").build().unwrap();
let _ = block_on(service.execute(request));
}
let request = HttpRequestBuilder::new_fake().uri("https://example.com").build().unwrap();
let mut error = block_on(service.execute(request)).unwrap_err();
// The rejection must carry the breaker message, be classified as unavailable, and
// hand the original request back to the caller.
assert!(error.message().contains("circuit breaker"));
assert_eq!(error.recovery().kind(), RecoveryKind::Unavailable);
assert!(error.take_request().is_some());
}
// With a handler that always answers 200, every request (including one after the
// 20-request run) is expected to succeed.
#[test]
fn success_does_not_trip_breaker() {
let handler = FakeHandler::from_fn(|_req| HttpResponseBuilder::new_fake().status(StatusCode::OK).build());
let clock = ClockControl::default().auto_advance_timers(true).to_clock();
let context = crate::HttpResilienceContext::new(&clock);
let service = (
HttpBreaker::layer("test", &context)
.http_configure_defaults()
.min_throughput(10)
.failure_threshold(0.5),
handler,
)
.into_service();
for _ in 0..20 {
let request = HttpRequestBuilder::new_fake().uri("https://example.com").build().unwrap();
let response = block_on(service.execute(request)).unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
let request = HttpRequestBuilder::new_fake().uri("https://example.com").build().unwrap();
let response = block_on(service.execute(request)).unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
// With a handler that always answers 400, responses are expected to keep passing through
// as `Ok`, i.e. the breaker does not open on client errors.
#[test]
fn client_errors_do_not_trip_breaker() {
let handler = FakeHandler::from_fn(|_req| HttpResponseBuilder::new_fake().status(StatusCode::BAD_REQUEST).build());
let clock = ClockControl::default().auto_advance_timers(true).to_clock();
let context = crate::HttpResilienceContext::new(&clock);
let service = (
HttpBreaker::layer("test", &context)
.http_configure_defaults()
.min_throughput(10)
.failure_threshold(0.5),
handler,
)
.into_service();
for _ in 0..20 {
let request = HttpRequestBuilder::new_fake().uri("https://example.com").build().unwrap();
let response = block_on(service.execute(request)).unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
let request = HttpRequestBuilder::new_fake().uri("https://example.com").build().unwrap();
let response = block_on(service.execute(request)).unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
}