use crate::cx::Cx;
use crate::time::Sleep;
use crate::types::cancel::CancelReason;
use crate::types::outcome::PanicPayload;
use crate::types::{Outcome, Time};
use core::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
/// Adaptive hedge-delay policy driven by a sliding window of observed latencies.
///
/// Latencies are fed in through `record`; `next_hedge_delay` then picks an
/// order statistic of the recorded window (see `conformal_rank`) and bounds it
/// by `min_delay` / `max_delay`.
#[derive(Debug, Clone)]
pub struct AdaptiveHedgePolicy {
/// Fixed-size ring buffer of latencies in microseconds; its length is the window size.
history: Vec<u64>,
/// Total number of samples recorded so far; `count % history.len()` is the next write slot.
count: u64,
/// Quantile parameter, validated by `new` to lie strictly inside `(0, 1)`.
alpha: f64,
/// Lower bound applied to the computed delay.
min_delay: Duration,
/// Upper bound applied to the computed delay; also returned while fewer than 10 samples exist.
max_delay: Duration,
}
/// Returns the zero-based index of the order statistic to use for `n` samples.
///
/// The one-based rank is `ceil((n + 1) * (1 - alpha))`; the result is that
/// rank minus one, limited to `[0, n - 1]`. A non-finite intermediate value
/// (for example when `alpha` is NaN or infinite) yields `0`.
#[allow(
    clippy::cast_precision_loss, // quantile math is float-based by definition (alpha is f64)
    clippy::cast_sign_loss // value is clamped into [0, n-1] before conversion
)]
fn conformal_rank(n: usize, alpha: f64) -> usize {
    let sample_count = n as f64;
    let one_based_rank = ((sample_count + 1.0) * (1.0 - alpha)).ceil();

    // Non-finite ranks and ranks at or below the first element map to index 0.
    if !one_based_rank.is_finite() || one_based_rank <= 1.0 {
        return 0;
    }
    // Ranks at or beyond the last element map to the last valid index.
    if one_based_rank >= sample_count {
        return n.saturating_sub(1);
    }
    (one_based_rank as usize).saturating_sub(1)
}
impl AdaptiveHedgePolicy {
    /// Minimum number of recorded samples before the quantile estimate is used.
    ///
    /// Until then `next_hedge_delay` returns `max_delay`. Because the number of
    /// usable samples is capped at the window size, a window smaller than this
    /// value never leaves warm-up.
    const WARMUP_SAMPLES: usize = 10;

    /// Creates a policy with a sliding window of `window_size` samples.
    ///
    /// `alpha` selects the quantile (see `conformal_rank`); `min_delay` and
    /// `max_delay` bound the delay returned by `next_hedge_delay`. The bounds
    /// are not validated here: if `min_delay > max_delay`, `max_delay` takes
    /// precedence when the delay is computed.
    ///
    /// # Panics
    ///
    /// Panics if `window_size` is zero or `alpha` is not strictly inside `(0, 1)`.
    #[must_use]
    pub fn new(window_size: usize, alpha: f64, min_delay: Duration, max_delay: Duration) -> Self {
        assert!(window_size > 0, "window size must be positive");
        assert!(alpha > 0.0 && alpha < 1.0, "alpha must be in (0, 1)");
        Self {
            history: vec![0; window_size],
            count: 0,
            alpha,
            min_delay,
            max_delay,
        }
    }

    /// Records one observed latency, overwriting the oldest sample once the window is full.
    ///
    /// The latency is stored in microseconds and saturates at `u64::MAX`.
    pub fn record(&mut self, latency: Duration) {
        let micros = u64::try_from(latency.as_micros()).unwrap_or(u64::MAX);
        // `capacity` is non-zero because `new` rejects an empty window.
        let capacity = self.history.len() as u64;
        // The remainder is strictly less than `history.len()`, so it fits in `usize`.
        let slot = (self.count % capacity) as usize;
        self.history[slot] = micros;
        self.count += 1;
    }

    /// Computes the hedge delay to use for the next request.
    ///
    /// Returns `max_delay` while fewer than 10 samples are available. Otherwise
    /// the sample at index `conformal_rank(n, alpha)` of the sorted window is
    /// selected and bounded by `min_delay` and `max_delay`.
    ///
    /// This never panics on inverted bounds: when `min_delay > max_delay` the
    /// result is `max_delay`, matching the value returned during warm-up.
    #[must_use]
    pub fn next_hedge_delay(&self) -> Duration {
        let n = self.sample_count();
        if n < Self::WARMUP_SAMPLES {
            return self.max_delay;
        }
        // While the window is still filling, only the first `n` slots hold real
        // samples; once it is full, `n` equals the window size.
        let mut window = self.history[..n].to_vec();
        let rank = conformal_rank(n, self.alpha);
        let (_, &mut bound_micros, _) = window.select_nth_unstable(rank);
        // `Ord::clamp` panics when `min > max`; `max` followed by `min` gives the
        // same result for valid bounds and lets `max_delay` win for inverted ones.
        Duration::from_micros(bound_micros)
            .max(self.min_delay)
            .min(self.max_delay)
    }

    /// Builds a `HedgeConfig` whose delay is the current `next_hedge_delay`.
    #[must_use]
    pub fn config(&self) -> HedgeConfig {
        HedgeConfig::new(self.next_hedge_delay())
    }

    /// Returns the capacity of the sliding window.
    #[must_use]
    pub fn window_size(&self) -> usize {
        self.history.len()
    }

    /// Returns how many samples currently back the estimate (at most the window size).
    #[must_use]
    pub fn sample_count(&self) -> usize {
        usize::try_from(self.count)
            .unwrap_or(usize::MAX)
            .min(self.history.len())
    }
}
/// Configuration and state flag for a single hedged operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HedgeConfig {
/// How long to wait for the primary before the backup is started.
pub hedge_delay: Duration,
/// Whether the backup has been started; the constructors initialise it to `false`,
/// and `HedgeFuture::poll` sets it to `true` on its own copy when the timer fires.
pub backup_spawned: bool,
}
impl HedgeConfig {
#[must_use]
pub const fn new(hedge_delay: Duration) -> Self {
Self {
hedge_delay,
backup_spawned: false,
}
}
#[must_use]
pub const fn from_millis(millis: u64) -> Self {
Self::new(Duration::from_millis(millis))
}
#[must_use]
pub const fn from_secs(secs: u64) -> Self {
Self::new(Duration::from_secs(secs))
}
#[must_use]
pub fn delay_elapsed(&self, start: Time, now: Time) -> bool {
now >= self.deadline_from(start)
}
#[must_use]
pub fn deadline_from(&self, start: Time) -> Time {
start.saturating_add_nanos(self.hedge_delay_nanos_u64())
}
fn hedge_delay_nanos_u64(&self) -> u64 {
let nanos = self.hedge_delay.as_nanos();
if nanos > u128::from(u64::MAX) {
u64::MAX
} else {
nanos as u64
}
}
}
impl Default for HedgeConfig {
    /// Returns a configuration with a 100 millisecond hedge delay.
    fn default() -> Self {
        const DEFAULT_HEDGE_DELAY_MILLIS: u64 = 100;
        Self::from_millis(DEFAULT_HEDGE_DELAY_MILLIS)
    }
}
/// Typed handle around a [`HedgeConfig`].
///
/// `T` is carried only as a `PhantomData` marker; no value of type `T` is stored.
#[derive(Debug)]
pub struct Hedge<T> {
/// The hedge configuration held by this value.
pub config: HedgeConfig,
/// Zero-sized marker tying the value to `T`.
_t: PhantomData<T>,
}
impl<T> Hedge<T> {
#[must_use]
pub const fn new(hedge_delay: Duration) -> Self {
Self {
config: HedgeConfig::new(hedge_delay),
_t: PhantomData,
}
}
#[must_use]
pub const fn from_millis(millis: u64) -> Self {
Self::new(Duration::from_millis(millis))
}
#[must_use]
pub const fn from_secs(secs: u64) -> Self {
Self::new(Duration::from_secs(secs))
}
#[must_use]
pub const fn delay(&self) -> Duration {
self.config.hedge_delay
}
}
// Hand-written so that no bound is placed on `T`.
// `Hedge<T>` is also `Copy`, so cloning is a plain copy of `self`.
impl<T> Clone for Hedge<T> {
fn clone(&self) -> Self {
*self
}
}
// Hand-written so that `Hedge<T>` is `Copy` for every `T`, with no bound on `T`.
impl<T> Copy for Hedge<T> {}
impl<T> Default for Hedge<T> {
    /// Returns a hedge wrapping `HedgeConfig::default()`.
    fn default() -> Self {
        let config = HedgeConfig::default();
        Self {
            _t: PhantomData,
            config,
        }
    }
}
/// Identifies which branch of a hedged operation produced the winning outcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HedgeWinner {
/// The primary operation won.
Primary,
/// The backup operation won.
Backup,
}
impl HedgeWinner {
    /// Returns `true` when the primary branch won.
    #[must_use]
    pub const fn is_primary(self) -> bool {
        match self {
            Self::Primary => true,
            Self::Backup => false,
        }
    }

    /// Returns `true` when the backup branch won.
    #[must_use]
    pub const fn is_backup(self) -> bool {
        match self {
            Self::Backup => true,
            Self::Primary => false,
        }
    }
}
/// Result of a hedged operation, recording whether a race took place.
#[derive(Debug, Clone)]
pub enum HedgeResult<T, E> {
/// Only the primary's outcome is present; there is no loser.
PrimaryFast(Outcome<T, E>),
/// Both branches took part; the outcomes of the winner and the loser are kept.
Raced {
/// Outcome of the branch that won.
winner_outcome: Outcome<T, E>,
/// Which branch won.
winner: HedgeWinner,
/// Outcome recorded for the branch that lost.
loser_outcome: Outcome<T, E>,
},
}
impl<T, E> HedgeResult<T, E> {
#[must_use]
pub fn primary_fast(outcome: Outcome<T, E>) -> Self {
Self::PrimaryFast(outcome)
}
#[must_use]
pub fn primary_won(primary_outcome: Outcome<T, E>, backup_outcome: Outcome<T, E>) -> Self {
Self::Raced {
winner_outcome: primary_outcome,
winner: HedgeWinner::Primary,
loser_outcome: backup_outcome,
}
}
#[must_use]
pub fn backup_won(backup_outcome: Outcome<T, E>, primary_outcome: Outcome<T, E>) -> Self {
Self::Raced {
winner_outcome: backup_outcome,
winner: HedgeWinner::Backup,
loser_outcome: primary_outcome,
}
}
#[must_use]
pub const fn is_primary_fast(&self) -> bool {
matches!(self, Self::PrimaryFast(_))
}
#[must_use]
pub const fn was_raced(&self) -> bool {
matches!(self, Self::Raced { .. })
}
#[must_use]
pub fn winner_outcome(&self) -> &Outcome<T, E> {
match self {
Self::PrimaryFast(o) => o,
Self::Raced { winner_outcome, .. } => winner_outcome,
}
}
#[must_use]
pub fn into_winner_outcome(self) -> Outcome<T, E> {
match self {
Self::PrimaryFast(o) => o,
Self::Raced { winner_outcome, .. } => winner_outcome,
}
}
#[must_use]
pub fn winner_succeeded(&self) -> bool {
self.winner_outcome().is_ok()
}
#[must_use]
pub const fn winner(&self) -> HedgeWinner {
match self {
Self::PrimaryFast(_) => HedgeWinner::Primary,
Self::Raced { winner, .. } => *winner,
}
}
#[must_use]
pub fn loser_outcome(&self) -> Option<&Outcome<T, E>> {
match self {
Self::PrimaryFast(_) => None,
Self::Raced { loser_outcome, .. } => Some(loser_outcome),
}
}
}
/// Error produced by `hedge_to_result` when the winning outcome is not a success.
#[derive(Debug, Clone)]
pub enum HedgeError<E> {
/// The primary returned an error and no race took place (`HedgeResult::PrimaryFast`).
PrimaryFastError(E),
/// The primary won the race with an error.
PrimaryError(E),
/// The backup won the race with an error.
BackupError(E),
/// The winning outcome was a cancellation.
Cancelled(CancelReason),
/// The winning outcome was a panic.
Panicked(PanicPayload),
}
impl<E: fmt::Display> fmt::Display for HedgeError<E> {
    /// Writes a one-line description: a fixed prefix naming the failure kind,
    /// followed by the wrapped error, cancel reason, or panic payload.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::PrimaryFastError(e) => {
                f.write_fmt(format_args!("primary completed fast with error: {e}"))
            }
            Self::PrimaryError(e) => f.write_fmt(format_args!("primary won race with error: {e}")),
            Self::BackupError(e) => f.write_fmt(format_args!("backup won race with error: {e}")),
            Self::Cancelled(r) => f.write_fmt(format_args!("winner was cancelled: {r}")),
            Self::Panicked(p) => f.write_fmt(format_args!("branch panicked: {p}")),
        }
    }
}
// Marks `HedgeError<E>` as a standard error type whenever `E` is both `Debug` and `Display`;
// no trait methods are overridden.
impl<E: fmt::Debug + fmt::Display> std::error::Error for HedgeError<E> {}
/// Assembles a [`HedgeResult`] from the individual pieces of a finished hedge.
///
/// When `backup_spawned` is `false` the result is `PrimaryFast`; otherwise it is
/// a raced result whose winner/loser assignment follows `winner`.
///
/// # Panics
///
/// Panics if `backup_spawned` is `true` and either `backup_outcome` or `winner`
/// is `None`, or if `backup_spawned` is `false` and either of them is `Some`.
#[must_use]
pub fn hedge_outcomes<T, E>(
    primary_outcome: Outcome<T, E>,
    backup_spawned: bool,
    backup_outcome: Option<Outcome<T, E>>,
    winner: Option<HedgeWinner>,
) -> HedgeResult<T, E> {
    // No backup: both optional inputs must be absent.
    if !backup_spawned {
        assert!(
            backup_outcome.is_none(),
            "backup_outcome must be None when backup_spawned is false"
        );
        assert!(
            winner.is_none(),
            "winner must be None when backup_spawned is false"
        );
        return HedgeResult::primary_fast(primary_outcome);
    }

    // Backup spawned: both optional inputs are mandatory.
    let backup_outcome = backup_outcome.expect("backup_outcome required when backup_spawned");
    match winner.expect("winner required when backup_spawned") {
        HedgeWinner::Primary => HedgeResult::primary_won(primary_outcome, backup_outcome),
        HedgeWinner::Backup => HedgeResult::backup_won(backup_outcome, primary_outcome),
    }
}
pub fn hedge_to_result<T, E>(result: HedgeResult<T, E>) -> Result<T, HedgeError<E>> {
match result {
HedgeResult::PrimaryFast(outcome) => match outcome {
Outcome::Ok(v) => Ok(v),
Outcome::Err(e) => Err(HedgeError::PrimaryFastError(e)),
Outcome::Cancelled(r) => Err(HedgeError::Cancelled(r)),
Outcome::Panicked(p) => Err(HedgeError::Panicked(p)),
},
HedgeResult::Raced {
winner_outcome,
winner,
..
} => match winner_outcome {
Outcome::Ok(v) => Ok(v),
Outcome::Err(e) => match winner {
HedgeWinner::Primary => Err(HedgeError::PrimaryError(e)),
HedgeWinner::Backup => Err(HedgeError::BackupError(e)),
},
Outcome::Cancelled(r) => Err(HedgeError::Cancelled(r)),
Outcome::Panicked(p) => Err(HedgeError::Panicked(p)),
},
}
}
/// Future returned by `hedge`: runs a primary future and, once the hedge-delay
/// timer fires, a backup future created on demand by a factory.
pub struct HedgeFuture<Prim, Back, F> {
/// The primary future; polled first on every call to `poll`.
primary: Option<Prim>,
/// Factory for the backup future; taken and invoked when the timer fires.
backup_factory: Option<F>,
/// The backup future; `None` until the timer has fired.
backup: Option<Back>,
/// Hedge-delay timer; reset to `None` once it has fired.
timer: Option<Sleep>,
/// Hedge configuration; `backup_spawned` is set to `true` when the timer fires.
config: HedgeConfig,
}
impl<Prim, Back, F> HedgeFuture<Prim, Back, F> {
    /// Creates the future with the primary in place, the backup not yet built,
    /// and a timer armed for `config.hedge_delay` from "now".
    fn new(config: HedgeConfig, primary: Prim, backup_factory: F) -> Self {
        // "Now" comes from the current context's timer driver when both are
        // available; otherwise `crate::time::wall_now` is used.
        let start = Cx::current().map_or_else(crate::time::wall_now, |cx| {
            cx.timer_driver()
                .map_or_else(crate::time::wall_now, |timer_driver| timer_driver.now())
        });
        let timer = Sleep::after(start, config.hedge_delay);

        Self {
            config,
            timer: Some(timer),
            backup: None,
            backup_factory: Some(backup_factory),
            primary: Some(primary),
        }
    }
}
impl<Prim, Back, F, T, E> Future for HedgeFuture<Prim, Back, F>
where
    Prim: Future<Output = Outcome<T, E>> + Unpin,
    Back: Future<Output = Outcome<T, E>> + Unpin,
    F: FnOnce() -> Back + Unpin,
    T: Unpin,
    E: Unpin,
{
    type Output = HedgeResult<T, E>;

    /// Polls the primary, then the hedge timer, then the backup (if any).
    ///
    /// - If the primary is ready, it wins: `PrimaryFast` when no backup exists,
    ///   otherwise a raced result with the backup recorded as a cancelled loser.
    /// - If the timer fires, the backup is created from the factory.
    /// - If the backup is ready, it wins and the primary is recorded as a
    ///   cancelled loser.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let state = &mut *self;

        // Step 1: the primary always gets the first chance to finish.
        if let Some(primary) = state.primary.as_mut() {
            if let Poll::Ready(outcome) = Pin::new(primary).poll(cx) {
                let result = match state.backup {
                    Some(_) => HedgeResult::primary_won(
                        outcome,
                        Outcome::Cancelled(CancelReason::race_loser()),
                    ),
                    None => HedgeResult::primary_fast(outcome),
                };
                return Poll::Ready(result);
            }
        }

        // Step 2: when the timer fires, drop it and build the backup once.
        let timer_fired = match state.timer.as_mut() {
            Some(timer) => Pin::new(timer).poll(cx).is_ready(),
            None => false,
        };
        if timer_fired {
            state.timer = None;
            state.config.backup_spawned = true;
            if let Some(make_backup) = state.backup_factory.take() {
                state.backup = Some(make_backup());
            }
        }

        // Step 3: poll the backup, including right after it was created above.
        if let Some(backup) = state.backup.as_mut() {
            if let Poll::Ready(outcome) = Pin::new(backup).poll(cx) {
                let result = HedgeResult::backup_won(
                    outcome,
                    Outcome::Cancelled(CancelReason::race_loser()),
                );
                return Poll::Ready(result);
            }
        }

        Poll::Pending
    }
}
/// Creates a [`HedgeFuture`] that runs `primary` and, once `config.hedge_delay`
/// has elapsed without a result, also runs the backup built by `backup_factory`.
///
/// `backup_factory` is only invoked if the hedge timer fires, so no backup is
/// constructed when the primary finishes first. The returned future must be
/// polled (for example by awaiting it) for anything to happen.
#[must_use = "futures do nothing unless polled or awaited"]
pub fn hedge<Prim, Back, F>(
    config: HedgeConfig,
    primary: Prim,
    backup_factory: F,
) -> HedgeFuture<Prim, Back, F>
where
    F: FnOnce() -> Back,
{
    HedgeFuture::new(config, primary, backup_factory)
}
/// Convenience macro: `hedge!(config, primary, backup)` expands to a call to
/// `$crate::combinator::hedge::hedge(config, primary, backup)`.
#[macro_export]
macro_rules! hedge {
($config:expr, $primary:expr, $backup:expr) => {
$crate::combinator::hedge::hedge($config, $primary, $backup)
};
}
// Unit tests for the hedge combinator: configuration helpers, result/error
// types, the `HedgeFuture` execution path, and the adaptive delay policy.
#[cfg(test)]
mod tests {
use super::*;
// ---- HedgeConfig ----
// `from_millis` stores the delay and starts with no backup spawned.
#[test]
fn hedge_config_from_millis() {
let config = HedgeConfig::from_millis(100);
assert_eq!(config.hedge_delay, Duration::from_millis(100));
assert!(!config.backup_spawned);
}
// `from_secs` stores the delay in seconds.
#[test]
fn hedge_config_from_secs() {
let config = HedgeConfig::from_secs(5);
assert_eq!(config.hedge_delay, Duration::from_secs(5));
}
// The default delay is 100 ms.
#[test]
fn hedge_config_default() {
let config = HedgeConfig::default();
assert_eq!(config.hedge_delay, Duration::from_millis(100));
}
// `delay_elapsed` is false before the deadline and true at or after it.
#[test]
fn hedge_config_delay_elapsed() {
let config = HedgeConfig::from_millis(100);
let start = Time::from_nanos(1_000_000);
let now = Time::from_nanos(51_000_000);
assert!(!config.delay_elapsed(start, now));
let now = Time::from_nanos(101_000_000);
assert!(config.delay_elapsed(start, now));
let now = Time::from_nanos(201_000_000);
assert!(config.delay_elapsed(start, now));
}
// The deadline is the start time plus the hedge delay (1 ms + 100 ms here).
#[test]
fn hedge_config_deadline_from() {
let config = HedgeConfig::from_millis(100);
let start = Time::from_nanos(1_000_000);
let deadline = config.deadline_from(start);
assert_eq!(deadline.as_nanos(), 101_000_000); }
// A delay too large for `u64` nanoseconds saturates the deadline at `Time::MAX`.
#[test]
fn hedge_config_deadline_from_saturates_on_large_duration() {
let config = HedgeConfig::new(Duration::from_secs(u64::MAX));
let start = Time::from_nanos(1);
let deadline = config.deadline_from(start);
assert_eq!(deadline, Time::MAX);
}
// With a saturated deadline, only `Time::MAX` itself counts as elapsed.
#[test]
fn hedge_config_delay_elapsed_respects_saturated_deadline() {
let config = HedgeConfig::new(Duration::from_nanos(10));
let start = Time::from_nanos(u64::MAX - 5);
assert_eq!(config.deadline_from(start), Time::MAX);
assert!(!config.delay_elapsed(start, start));
assert!(config.delay_elapsed(start, Time::MAX));
}
// ---- Hedge<T> ----
// `Hedge::from_millis` exposes the delay through `delay()`.
#[test]
fn hedge_creation() {
let hedge = Hedge::<()>::from_millis(200);
assert_eq!(hedge.delay(), Duration::from_millis(200));
}
// `Hedge` is `Copy`: `h1` stays usable after being assigned twice.
#[test]
fn hedge_clone_and_copy() {
let h1 = Hedge::<()>::from_millis(100);
let h2 = h1; let h3 = h1;
assert_eq!(h1.delay(), h2.delay());
assert_eq!(h1.delay(), h3.delay());
}
// The default `Hedge` uses the default 100 ms delay.
#[test]
fn hedge_default() {
let hedge = Hedge::<()>::default();
assert_eq!(hedge.delay(), Duration::from_millis(100));
}
// ---- HedgeWinner ----
#[test]
fn hedge_winner_is_primary() {
assert!(HedgeWinner::Primary.is_primary());
assert!(!HedgeWinner::Primary.is_backup());
}
#[test]
fn hedge_winner_is_backup() {
assert!(!HedgeWinner::Backup.is_primary());
assert!(HedgeWinner::Backup.is_backup());
}
// ---- HedgeResult ----
// A primary-fast result has a primary winner and no loser.
#[test]
fn hedge_result_primary_fast() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_fast(Outcome::Ok(42));
assert!(result.is_primary_fast());
assert!(!result.was_raced());
assert!(result.winner().is_primary());
assert!(result.winner_succeeded());
assert!(result.loser_outcome().is_none());
}
// A raced result won by the primary records the cancelled backup as the loser.
#[test]
fn hedge_result_primary_won_race() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Ok(42),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert!(!result.is_primary_fast());
assert!(result.was_raced());
assert!(result.winner().is_primary());
assert!(result.winner_succeeded());
assert!(result.loser_outcome().is_some());
assert!(result.loser_outcome().unwrap().is_cancelled());
}
// A raced result won by the backup reports the backup as the winner.
#[test]
fn hedge_result_backup_won_race() {
let result: HedgeResult<i32, &str> = HedgeResult::backup_won(
Outcome::Ok(99),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert!(!result.is_primary_fast());
assert!(result.was_raced());
assert!(result.winner().is_backup());
assert!(result.winner_succeeded());
assert!(result.loser_outcome().is_some());
}
// `winner_outcome` borrows the winner's value.
#[test]
fn hedge_result_winner_outcome() {
let result: HedgeResult<i32, &str> = HedgeResult::backup_won(
Outcome::Ok(99),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert!(result.winner_outcome().is_ok());
if let Outcome::Ok(v) = result.winner_outcome() {
assert_eq!(*v, 99);
}
}
// `into_winner_outcome` moves the winner's value out.
#[test]
fn hedge_result_into_winner_outcome() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_fast(Outcome::Ok(42));
let outcome = result.into_winner_outcome();
assert!(matches!(outcome, Outcome::Ok(42)));
}
// ---- hedge_outcomes ----
// No backup spawned: the result is primary-fast.
#[test]
fn hedge_outcomes_primary_fast() {
let result = hedge_outcomes::<i32, &str>(Outcome::Ok(42), false, None, None);
assert!(result.is_primary_fast());
assert!(result.winner_succeeded());
}
// Backup spawned, primary declared winner.
#[test]
fn hedge_outcomes_primary_won_race() {
let result = hedge_outcomes::<i32, &str>(
Outcome::Ok(42),
true,
Some(Outcome::Cancelled(CancelReason::race_loser())),
Some(HedgeWinner::Primary),
);
assert!(result.was_raced());
assert!(result.winner().is_primary());
assert!(result.winner_succeeded());
}
// Backup spawned, backup declared winner.
#[test]
fn hedge_outcomes_backup_won_race() {
let result = hedge_outcomes::<i32, &str>(
Outcome::Cancelled(CancelReason::race_loser()),
true,
Some(Outcome::Ok(99)),
Some(HedgeWinner::Backup),
);
assert!(result.was_raced());
assert!(result.winner().is_backup());
assert!(result.winner_succeeded());
}
// Inconsistent argument combinations must panic with the documented messages.
#[test]
#[should_panic(expected = "backup_outcome required")]
fn hedge_outcomes_panics_without_backup_outcome() {
let _ =
hedge_outcomes::<i32, &str>(Outcome::Ok(42), true, None, Some(HedgeWinner::Primary));
}
#[test]
#[should_panic(expected = "winner required")]
fn hedge_outcomes_panics_without_winner() {
let _ = hedge_outcomes::<i32, &str>(
Outcome::Ok(42),
true,
Some(Outcome::Cancelled(CancelReason::race_loser())),
None,
);
}
#[test]
#[should_panic(expected = "backup_outcome must be None")]
fn hedge_outcomes_panics_on_backup_outcome_when_not_spawned() {
let _ = hedge_outcomes::<i32, &str>(
Outcome::Ok(42),
false,
Some(Outcome::Cancelled(CancelReason::race_loser())),
None,
);
}
#[test]
#[should_panic(expected = "winner must be None")]
fn hedge_outcomes_panics_on_winner_when_not_spawned() {
let _ =
hedge_outcomes::<i32, &str>(Outcome::Ok(42), false, None, Some(HedgeWinner::Primary));
}
// ---- hedge_to_result ----
#[test]
fn hedge_to_result_primary_fast_ok() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_fast(Outcome::Ok(42));
assert_eq!(hedge_to_result(result).unwrap(), 42);
}
#[test]
fn hedge_to_result_primary_fast_err() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_fast(Outcome::Err("failed"));
assert!(matches!(
hedge_to_result(result),
Err(HedgeError::PrimaryFastError("failed"))
));
}
#[test]
fn hedge_to_result_primary_won_ok() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Ok(42),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert_eq!(hedge_to_result(result).unwrap(), 42);
}
#[test]
fn hedge_to_result_primary_won_err() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Err("primary failed"),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert!(matches!(
hedge_to_result(result),
Err(HedgeError::PrimaryError("primary failed"))
));
}
#[test]
fn hedge_to_result_backup_won_ok() {
let result: HedgeResult<i32, &str> = HedgeResult::backup_won(
Outcome::Ok(99),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert_eq!(hedge_to_result(result).unwrap(), 99);
}
#[test]
fn hedge_to_result_backup_won_err() {
let result: HedgeResult<i32, &str> = HedgeResult::backup_won(
Outcome::Err("backup failed"),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert!(matches!(
hedge_to_result(result),
Err(HedgeError::BackupError("backup failed"))
));
}
#[test]
fn hedge_to_result_cancelled() {
let result: HedgeResult<i32, &str> =
HedgeResult::primary_fast(Outcome::Cancelled(CancelReason::shutdown()));
assert!(matches!(
hedge_to_result(result),
Err(HedgeError::Cancelled(_))
));
}
#[test]
fn hedge_to_result_panicked() {
let result: HedgeResult<i32, &str> =
HedgeResult::primary_fast(Outcome::Panicked(PanicPayload::new("boom")));
assert!(matches!(
hedge_to_result(result),
Err(HedgeError::Panicked(_))
));
}
// ---- HedgeError Display ----
#[test]
fn hedge_error_display_primary_fast() {
let err: HedgeError<&str> = HedgeError::PrimaryFastError("test");
assert!(err.to_string().contains("primary completed fast"));
assert!(err.to_string().contains("test"));
}
#[test]
fn hedge_error_display_primary() {
let err: HedgeError<&str> = HedgeError::PrimaryError("test");
assert!(err.to_string().contains("primary won race"));
}
#[test]
fn hedge_error_display_backup() {
let err: HedgeError<&str> = HedgeError::BackupError("test");
assert!(err.to_string().contains("backup won race"));
}
#[test]
fn hedge_error_display_cancelled() {
let err: HedgeError<&str> = HedgeError::Cancelled(CancelReason::shutdown());
assert!(err.to_string().contains("cancelled"));
}
#[test]
fn hedge_error_display_panicked() {
let err: HedgeError<&str> = HedgeError::Panicked(PanicPayload::new("boom"));
assert!(err.to_string().contains("panicked"));
}
// ---- Invariants on raced results ----
// A raced result always carries the loser, here with a `RaceLost` cancel kind.
#[test]
fn loser_is_always_tracked_in_race() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Ok(42),
Outcome::Cancelled(CancelReason::race_loser()),
);
let loser = result.loser_outcome().expect("loser must be tracked");
assert!(loser.is_cancelled());
if let Outcome::Cancelled(reason) = loser {
assert!(matches!(
reason.kind(),
crate::types::cancel::CancelKind::RaceLost
));
}
}
#[test]
fn primary_fast_has_no_loser() {
let result: HedgeResult<i32, &str> = HedgeResult::primary_fast(Outcome::Ok(42));
assert!(result.loser_outcome().is_none());
}
// The same successful value converts to the same `Ok` regardless of which branch won.
#[test]
fn hedge_commutativity_of_race_result() {
let val = 42;
let r1: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Ok(val),
Outcome::Cancelled(CancelReason::race_loser()),
);
let r2: HedgeResult<i32, &str> = HedgeResult::backup_won(
Outcome::Ok(val),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert_eq!(hedge_to_result(r1).unwrap(), hedge_to_result(r2).unwrap());
}
// `winner()` mirrors the constructor used; primary-fast counts as a primary win.
#[test]
fn hedge_result_winner_reflects_input() {
let primary_won: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Ok(1),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert_eq!(primary_won.winner(), HedgeWinner::Primary);
let backup_won: HedgeResult<i32, &str> = HedgeResult::backup_won(
Outcome::Ok(2),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert_eq!(backup_won.winner(), HedgeWinner::Backup);
let primary_fast: HedgeResult<i32, &str> = HedgeResult::primary_fast(Outcome::Ok(3));
assert_eq!(primary_fast.winner(), HedgeWinner::Primary);
}
// Two results built from identical inputs must agree on value, winner and loser kind.
// NOTE(review): `baseline` and `transformed` are constructed identically, so this
// currently checks determinism of construction rather than a distinct transformation.
#[test]
fn metamorphic_redundant_loser_cancel_preserves_hedged_winner_result() {
let baseline: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Ok(42),
Outcome::Cancelled(CancelReason::race_loser()),
);
let transformed: HedgeResult<i32, &str> = HedgeResult::primary_won(
Outcome::Ok(42),
Outcome::Cancelled(CancelReason::race_loser()),
);
assert_eq!(
hedge_to_result(baseline.clone()).unwrap(),
hedge_to_result(transformed.clone()).unwrap(),
"reissuing the loser cancellation must not perturb the successful hedged result"
);
assert_eq!(baseline.winner(), transformed.winner());
let baseline_loser = baseline
.loser_outcome()
.expect("raced hedge must track loser outcome");
let transformed_loser = transformed
.loser_outcome()
.expect("raced hedge must track loser outcome");
match (baseline_loser, transformed_loser) {
(Outcome::Cancelled(left), Outcome::Cancelled(right)) => {
assert_eq!(left.kind(), right.kind());
}
_ => panic!("loser should remain represented as cancellation"),
}
}
// ---- Derived trait coverage ----
#[test]
fn hedge_config_debug_clone_copy_eq() {
let c = HedgeConfig::from_millis(100);
let c2 = c; let c3 = c;
assert_eq!(c, c2);
assert_eq!(c, c3);
assert_ne!(c, HedgeConfig::from_secs(5));
let dbg = format!("{c:?}");
assert!(dbg.contains("HedgeConfig"));
}
#[test]
fn hedge_winner_debug_clone_copy_eq() {
let w = HedgeWinner::Primary;
let w2 = w; let w3 = w;
assert_eq!(w, w2);
assert_eq!(w, w3);
assert_ne!(w, HedgeWinner::Backup);
let dbg = format!("{w:?}");
assert!(dbg.contains("Primary"));
}
#[test]
fn hedge_result_debug_clone() {
let r: HedgeResult<i32, &str> = HedgeResult::primary_fast(Outcome::Ok(42));
let r2 = r.clone();
assert_eq!(r.winner(), r2.winner());
let dbg = format!("{r:?}");
assert!(dbg.contains("HedgeResult") || dbg.contains("PrimaryFast"));
}
// ---- HedgeFuture execution ----
// Drives a future to completion using `futures_lite`'s blocking executor.
fn block_on<F: Future>(f: F) -> F::Output {
futures_lite::future::block_on(f)
}
// An immediately-ready primary wins before the 10 s hedge delay: no race.
#[test]
fn test_hedge_execution_primary_fast() {
let config = HedgeConfig::from_secs(10); let future = hedge(
config,
std::future::ready(Outcome::<i32, ()>::Ok(1)),
|| std::future::ready(Outcome::<i32, ()>::Ok(2)),
);
let result = block_on(future);
assert!(result.is_primary_fast());
if let Outcome::Ok(v) = result.winner_outcome() {
assert_eq!(*v, 1);
}
}
// A primary that never completes loses to the backup once the 1 ms delay fires.
#[test]
fn test_hedge_execution_backup_wins_pending_primary() {
let config = HedgeConfig::from_millis(1);
let future = hedge(config, std::future::pending::<Outcome<i32, ()>>(), || {
std::future::ready(Outcome::<i32, ()>::Ok(2))
});
let result = block_on(future);
assert!(result.was_raced());
assert!(result.winner().is_backup());
if let Outcome::Ok(v) = result.winner_outcome() {
assert_eq!(*v, 2);
}
}
// ---- AdaptiveHedgePolicy ----
// Fewer than 10 samples returns `max_delay`; afterwards the selected order statistic
// is used (rank 95 of 100 sorted samples 1..=100 ms is 96 ms for alpha = 0.05).
#[test]
fn test_adaptive_hedge_policy_conformal_quantile() {
let min_delay = Duration::from_millis(10);
let max_delay = Duration::from_secs(1);
let mut policy = AdaptiveHedgePolicy::new(100, 0.05, min_delay, max_delay);
for _ in 0..9 {
policy.record(Duration::from_millis(20));
}
assert_eq!(policy.next_hedge_delay(), max_delay);
policy.record(Duration::from_millis(20));
assert_eq!(policy.next_hedge_delay(), Duration::from_millis(20));
let mut policy = AdaptiveHedgePolicy::new(100, 0.05, min_delay, max_delay);
for i in 1..=100 {
policy.record(Duration::from_millis(i));
}
let delay = policy.next_hedge_delay();
assert_eq!(delay, Duration::from_millis(96));
}
// The computed delay is clamped to `[min_delay, max_delay]` on both sides.
#[test]
fn test_adaptive_hedge_policy_clamps() {
let min_delay = Duration::from_millis(50);
let max_delay = Duration::from_millis(100);
let mut policy = AdaptiveHedgePolicy::new(100, 0.05, min_delay, max_delay);
for _ in 0..20 {
policy.record(Duration::from_millis(1));
}
assert_eq!(policy.next_hedge_delay(), min_delay);
for _ in 0..20 {
policy.record(Duration::from_secs(5));
}
assert_eq!(policy.next_hedge_delay(), max_delay);
}
// After 20 records into a window of 10, only samples 11..=20 ms remain;
// rank 5 of those (alpha = 0.5) is 16 ms.
#[test]
fn test_adaptive_hedge_policy_uses_sliding_window_latest_samples() {
let min_delay = Duration::from_millis(1);
let max_delay = Duration::from_secs(1);
let mut policy = AdaptiveHedgePolicy::new(10, 0.5, min_delay, max_delay);
for millis in 1..=20 {
policy.record(Duration::from_millis(millis));
}
assert_eq!(policy.window_size(), 10);
assert_eq!(policy.sample_count(), 10);
assert_eq!(policy.next_hedge_delay(), Duration::from_millis(16));
}
// `config()` carries exactly the delay reported by `next_hedge_delay()`.
#[test]
fn test_adaptive_hedge_policy_config_matches_next_delay() {
let min_delay = Duration::from_millis(5);
let max_delay = Duration::from_millis(500);
let mut policy = AdaptiveHedgePolicy::new(16, 0.1, min_delay, max_delay);
for millis in 10..=30 {
policy.record(Duration::from_millis(millis));
}
assert_eq!(policy.config().hedge_delay, policy.next_hedge_delay());
}
}