use seatbelt::hedging::{Hedging, HedgingLayer};
use seatbelt::typestates::Set;
use crate::http_recovery::detect_recovery;
use crate::{HttpClone, HttpRecovery, HttpRequest, HttpResponse};
/// [`HedgingLayer`] specialised for HTTP traffic: the input is an [`HttpRequest`] and the
/// output is an `http_extensions::Result<HttpResponse>`.
///
/// `S1` and `S2` are the layer's two typestate parameters and both default to [`Set`], so the
/// bare name `HttpHedgingLayer` denotes a layer with both states set. Going by the signatures
/// of [`HttpHedgingLayerExt`] in this file, `S1` becomes [`Set`] once a clone strategy has been
/// supplied and `S2` becomes [`Set`] once recovery has been supplied.
pub type HttpHedgingLayer<S1 = Set, S2 = Set> = HedgingLayer<HttpRequest, http_extensions::Result<HttpResponse>, S1, S2>;
/// [`Hedging`] specialised for HTTP traffic: the input is an [`HttpRequest`] and the output is
/// an `http_extensions::Result<HttpResponse>`.
///
/// `S` is forwarded unchanged as the last type parameter of [`Hedging`]
/// (presumably the wrapped inner service type; confirm against the `seatbelt` docs).
/// The tests in this file obtain a layer through `HttpHedging::layer(name, &context)`.
pub type HttpHedging<S> = Hedging<HttpRequest, http_extensions::Result<HttpResponse>, S>;
/// Extension methods that apply HTTP-specific configuration to an [`HttpHedgingLayer`].
///
/// `S1` and `S2` mirror the typestate parameters of the layer being configured. Each method
/// consumes the layer and returns it with the corresponding parameter fixed to [`Set`].
///
/// The trait has `sealed::Sealed` as a supertrait; the only implementation in this file is
/// the one for [`HttpHedgingLayer`].
pub trait HttpHedgingLayerExt<S1, S2>: sealed::Sealed {
/// Configures both the clone strategy and the recovery settings, returning a layer with
/// both typestate parameters [`Set`].
///
/// The implementation in this file uses `HttpClone::default()` and `HttpRecovery::default()`.
fn http_configure_defaults(self) -> HttpHedgingLayer;
/// Sets the strategy used to clone the request for hedged attempts.
///
/// Returns the layer with its first typestate parameter [`Set`]; `S2` is left untouched.
fn http_clone(self, clone_strategy: HttpClone) -> HttpHedgingLayer<Set, S2>;
/// Sets the recovery configuration used to classify outputs.
///
/// Accepts anything convertible into [`HttpRecovery`]. Returns the layer with its second
/// typestate parameter [`Set`]; `S1` is left untouched.
fn http_recovery(self, recovery: impl Into<HttpRecovery>) -> HttpHedgingLayer<S1, Set>;
}
/// HTTP-specific configuration for [`HttpHedgingLayer`], available in every typestate.
impl<S1, S2> HttpHedgingLayerExt<S1, S2> for HttpHedgingLayer<S1, S2> {
    /// Installs `HttpClone::default()` followed by `HttpRecovery::default()`.
    fn http_configure_defaults(self) -> HttpHedgingLayer {
        let with_clone = self.http_clone(HttpClone::default());
        with_clone.http_recovery(HttpRecovery::default())
    }

    /// Registers a clone callback that asks `clone_strategy` to clone the request,
    /// passing along the attempt reported by the callback arguments.
    fn http_clone(self, clone_strategy: HttpClone) -> HttpHedgingLayer<Set, S2> {
        self.clone_input_with(move |request, args| {
            let attempt = args.attempt();
            clone_strategy.try_clone(request, attempt, None)
        })
    }

    /// Registers a recovery callback that delegates to `detect_recovery`, using the
    /// converted recovery settings and the clock reported by the callback arguments.
    fn http_recovery(self, recovery: impl Into<HttpRecovery>) -> HttpHedgingLayer<S1, Set> {
        // Convert once up front; the closure below owns the converted value.
        let policy: HttpRecovery = recovery.into();
        self.recovery_with(move |out, args| {
            let clock = args.clock();
            detect_recovery(out, &policy, clock)
        })
    }
}
pub(crate) mod sealed {
use super::*;
#[expect(unnameable_types, reason = "intentional, sealed trait pattern")]
pub trait Sealed {}
impl<S1, S2> Sealed for HttpHedgingLayer<S1, S2> {}
}
#[cfg_attr(coverage_nightly, coverage(off))]
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use futures::executor::block_on;
    use http::{Method, StatusCode};
    use http_extensions::routing::{BaseUriConflict, Router};
    use http_extensions::{FakeHandler, HttpRequestBuilder, HttpResponseBuilder};
    use layered::{Service, Stack};
    use seatbelt::Attempt;
    use templated_uri::BaseUri;
    use tick::ClockControl;

    use super::*;

    // With the default configuration and no explicit method on the request, a 500 from the
    // first scripted response is followed by the scripted 200, and the caller sees the 200.
    #[test]
    fn hedging_recovers_with_safe_methods() {
        let test_clock = ClockControl::default().auto_advance_timers(true).to_clock();
        let resilience = crate::HttpResilienceContext::new(&test_clock);

        let layer = HttpHedging::layer("test", &resilience).http_configure_defaults();
        let handler = FakeHandler::from_status_codes([StatusCode::INTERNAL_SERVER_ERROR, StatusCode::OK]);
        let service = (layer, handler).into_service();

        let request = HttpRequestBuilder::new_fake().uri("https://example.com").build().unwrap();
        let response = block_on(service.execute(request)).unwrap();

        assert_eq!(response.status(), StatusCode::OK);
    }

    // Same scripted responses as above, but the request is a POST: the caller is expected to
    // see the initial 500 rather than the 200.
    #[test]
    fn hedging_fails_with_unsafe_methods() {
        let test_clock = ClockControl::default().auto_advance_timers(true).to_clock();
        let resilience = crate::HttpResilienceContext::new(&test_clock);

        let layer = HttpHedging::layer("test", &resilience).http_configure_defaults();
        let handler = FakeHandler::from_status_codes([StatusCode::INTERNAL_SERVER_ERROR, StatusCode::OK]);
        let service = (layer, handler).into_service();

        let request = HttpRequestBuilder::new_fake()
            .uri("https://example.com")
            .method(Method::POST)
            .build()
            .unwrap();
        let response = block_on(service.execute(request)).unwrap();

        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
    }

    // A custom router attached to the request supplies a different base URI per attempt; the
    // handler records every URI it sees so the routing of each attempt can be asserted.
    #[test]
    fn hedging_routes_hedged_attempts_with_custom_router() {
        let test_clock = ClockControl::default().auto_advance_timers(true).to_clock();
        let resilience = crate::HttpResilienceContext::new(&test_clock);

        let seen_uris = Arc::new(Mutex::new(Vec::<String>::new()));
        let recorder = Arc::clone(&seen_uris);

        // Record the URI first, then answer 200 only on the last attempt.
        let handler = FakeHandler::from_sync_handler(move |request: HttpRequest| {
            recorder
                .lock()
                .expect("mutex is only accessed in single-threaded test")
                .push(request.uri().to_string());

            let attempt = request.extensions().get::<Attempt>().unwrap();
            let status = if attempt.is_last() {
                StatusCode::OK
            } else {
                StatusCode::INTERNAL_SERVER_ERROR
            };

            HttpResponseBuilder::new_fake().status(status).build()
        });

        let router = Router::custom(
            |ctx| {
                let base_uri = match ctx.attempt() {
                    1 => BaseUri::from_static("https://hedge-1.example.com"),
                    _ => BaseUri::from_static("https://hedge-2.example.com"),
                };
                Some(base_uri)
            },
            true,
        )
        .conflict_policy(BaseUriConflict::UseRouted);

        let layer = HttpHedging::layer("test", &resilience)
            .http_configure_defaults()
            .max_hedged_attempts(2)
            .hedging_delay(Duration::from_millis(10));
        let service = (layer, handler).into_service();

        let request = HttpRequestBuilder::new_fake()
            .uri("https://primary.example.com/items")
            .extension(router)
            .build()
            .unwrap();
        let response = block_on(service.execute(request)).unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        let uris = seen_uris
            .lock()
            .expect("mutex is only accessed in single-threaded test")
            .clone();
        let expected = [
            "https://primary.example.com/items",
            "https://hedge-1.example.com/items",
            "https://hedge-2.example.com/items",
        ]
        .map(String::from);

        assert_eq!(uris, expected);
    }
}