use std::cell::RefCell;
use std::collections::VecDeque;
use std::time::Duration;
/// Fewest recorded latencies required before
/// [`ConformalRetryBudget::quantile_bound`] produces an estimate.
pub const MIN_CALIBRATION_SAMPLES: usize = 8;
/// Smallest calibration window accepted by
/// [`ConformalRetryBudget::set_calibration_window`]; tied to the minimum
/// sample count so a full window can always produce an estimate.
const MIN_CALIBRATION_WINDOW: usize = MIN_CALIBRATION_SAMPLES;
/// Largest calibration window accepted by
/// [`ConformalRetryBudget::set_calibration_window`].
const MAX_CALIBRATION_WINDOW: usize = 4096;
/// Calibration window used by [`ConformalRetryBudget::default`].
pub const DEFAULT_CALIBRATION_WINDOW: usize = 256;
/// Miscoverage level used by [`ConformalRetryBudget::default`] and substituted
/// when [`ConformalRetryBudget::set_alpha`] is given NaN.
pub const DEFAULT_ALPHA: f64 = 0.05;

/// Retry gate that keeps a sliding window of successful-request latencies and
/// checks whether the elapsed time plus an upper latency quantile of that
/// window still fits inside a latency SLO.
///
/// The quantile is the `ceil((1 - alpha) * (K + 1))`-th smallest of the `K`
/// retained samples (see [`Self::quantile_bound`]).
#[derive(Debug)]
pub struct ConformalRetryBudget {
    /// SLO in milliseconds; `0` means the budget is disabled.
    slo_ms: u64,
    /// Miscoverage level, kept within `[f64::EPSILON, 1.0 - f64::EPSILON]`.
    alpha: f64,
    /// Maximum number of latency samples retained.
    calibration_window: usize,
    /// Retained latencies in nanoseconds, oldest at the front.
    latencies_ns: VecDeque<u64>,
}

impl Default for ConformalRetryBudget {
    /// Creates a disabled budget (no SLO) with the default miscoverage level
    /// and an empty, pre-allocated default-sized window.
    fn default() -> Self {
        Self {
            slo_ms: 0,
            alpha: DEFAULT_ALPHA,
            calibration_window: DEFAULT_CALIBRATION_WINDOW,
            latencies_ns: VecDeque::with_capacity(DEFAULT_CALIBRATION_WINDOW),
        }
    }
}

impl ConformalRetryBudget {
    /// Returns the configured SLO in milliseconds, or `None` when the budget
    /// is disabled (SLO of zero).
    #[must_use]
    pub const fn slo_ms(&self) -> Option<u64> {
        match self.slo_ms {
            0 => None,
            ms => Some(ms),
        }
    }

    /// Returns the miscoverage level currently applied by
    /// [`Self::quantile_bound`].
    #[must_use]
    pub const fn alpha(&self) -> f64 {
        self.alpha
    }

    /// Returns the maximum number of latency samples retained.
    #[must_use]
    pub const fn calibration_window(&self) -> usize {
        self.calibration_window
    }

    /// Number of latency samples currently retained (test-only helper).
    #[cfg(test)]
    pub fn sample_count(&self) -> usize {
        self.latencies_ns.len()
    }

    /// Sets the SLO in milliseconds; `0` disables the budget.
    pub const fn set_slo_ms(&mut self, slo_ms: u64) {
        self.slo_ms = slo_ms;
    }

    /// Sets the miscoverage level.
    ///
    /// NaN falls back to [`DEFAULT_ALPHA`]. Every other value, including the
    /// infinities, is clamped to `[f64::EPSILON, 1.0 - f64::EPSILON]`. This is
    /// the same range [`Self::quantile_bound`] enforces, so [`Self::alpha`]
    /// always reports the level that is actually used.
    pub fn set_alpha(&mut self, alpha: f64) {
        self.alpha = if alpha.is_nan() {
            DEFAULT_ALPHA
        } else {
            alpha.clamp(f64::EPSILON, 1.0 - f64::EPSILON)
        };
    }

    /// Sets the maximum number of retained samples, clamped to
    /// `[MIN_CALIBRATION_WINDOW, MAX_CALIBRATION_WINDOW]`.
    ///
    /// When the window shrinks, the oldest samples are discarded first, and
    /// the backing buffer is released once it is more than twice the new
    /// window.
    pub fn set_calibration_window(&mut self, window: usize) {
        let window = window.clamp(MIN_CALIBRATION_WINDOW, MAX_CALIBRATION_WINDOW);
        self.calibration_window = window;

        // Drop every sample beyond the new window in one pass, oldest first.
        let excess = self.latencies_ns.len().saturating_sub(window);
        drop(self.latencies_ns.drain(..excess));

        if self.latencies_ns.capacity() > window.saturating_mul(2) {
            self.latencies_ns.shrink_to(window);
        }
    }

    /// Records the latency of a successful request, evicting the oldest
    /// sample when the window is already full.
    ///
    /// Latencies that do not fit in `u64` nanoseconds saturate at `u64::MAX`.
    pub fn record_success(&mut self, latency: Duration) {
        let ns = u64::try_from(latency.as_nanos()).unwrap_or(u64::MAX);
        if self.latencies_ns.len() == self.calibration_window {
            self.latencies_ns.pop_front();
        }
        self.latencies_ns.push_back(ns);
    }

    /// Returns the `ceil((1 - alpha) * (K + 1))`-th smallest of the `K`
    /// retained latencies, or the largest sample when that rank exceeds `K`.
    ///
    /// Returns `None` while fewer than [`MIN_CALIBRATION_SAMPLES`] samples
    /// have been recorded.
    #[must_use]
    pub fn quantile_bound(&self) -> Option<Duration> {
        let k = self.latencies_ns.len();
        if k < MIN_CALIBRATION_SAMPLES {
            return None;
        }

        let alpha = self.alpha.clamp(f64::EPSILON, 1.0 - f64::EPSILON);
        let target = (1.0 - alpha) * ((k as f64) + 1.0);
        // `target` is non-negative and at most `k + 1` (with `k` bounded by
        // the window size), so the float-to-integer cast cannot truncate or
        // lose a sign.
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let rank = target.ceil() as usize;
        // Defensive guard: a NaN `alpha` would cast to rank 0. The setter and
        // `Default` never store NaN, so this is not expected to trigger.
        if rank == 0 {
            return None;
        }

        // Convert the 1-based rank to an index, falling back to the maximum
        // when the rank lies past the end of the window.
        let idx = rank.saturating_sub(1).min(k - 1);
        // Selection needs a mutable contiguous slice; work on a copy so the
        // window keeps its arrival order and `&self` suffices.
        let mut scratch: Vec<u64> = self.latencies_ns.iter().copied().collect();
        let (_, pivot, _) = scratch.select_nth_unstable(idx);
        Some(Duration::from_nanos(*pivot))
    }

    /// Returns the SLO as a [`Duration`], or `None` when the budget is
    /// disabled.
    #[must_use]
    pub fn slo_budget(&self) -> Option<Duration> {
        self.slo_ms().map(Duration::from_millis)
    }

    /// Decides whether another attempt may start after `elapsed` time.
    ///
    /// * With no SLO configured, retries are always allowed.
    /// * Without a quantile estimate yet, a retry is allowed while `elapsed`
    ///   is strictly below the SLO.
    /// * Otherwise a retry is allowed only if `elapsed` plus the quantile
    ///   bound is strictly below the SLO.
    #[must_use]
    pub fn retry_allowed(&self, elapsed: Duration) -> bool {
        let Some(budget) = self.slo_budget() else {
            return true;
        };
        let projected = match self.quantile_bound() {
            Some(predicted_tail) => elapsed.saturating_add(predicted_tail),
            None => elapsed,
        };
        projected < budget
    }
}
/// A [`ConformalRetryBudget`] wrapped in a [`RefCell`], allowing it to be
/// mutated through a shared reference with borrow rules checked at runtime.
pub type ConformalRetryBudgetCell = RefCell<ConformalRetryBudget>;
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default budget and records the given latencies (in
    /// milliseconds) in iteration order.
    fn budget_with_ms(samples: impl IntoIterator<Item = u64>) -> ConformalRetryBudget {
        let mut budget = ConformalRetryBudget::default();
        samples
            .into_iter()
            .map(Duration::from_millis)
            .for_each(|latency| budget.record_success(latency));
        budget
    }

    /// A freshly constructed budget has no SLO and never blocks a retry.
    #[test]
    fn default_budget_is_disabled() {
        let budget = ConformalRetryBudget::default();
        assert_eq!(budget.slo_ms(), None);
        assert!(budget.retry_allowed(Duration::from_secs(3600)));
    }

    /// The quantile only becomes available once the minimum sample count is
    /// reached.
    #[test]
    fn quantile_requires_minimum_samples() {
        let mut budget = budget_with_ms([1u64; MIN_CALIBRATION_SAMPLES - 1]);
        assert_eq!(budget.quantile_bound(), None);

        budget.record_success(Duration::from_millis(1));
        assert!(budget.quantile_bound().is_some());
    }

    /// With K = 10 and alpha = 0.2 the selected rank is ceil(0.8 * 11) = 9.
    #[test]
    fn quantile_picks_correct_order_statistic() {
        let mut budget = budget_with_ms(1..=10);
        budget.set_alpha(0.2);

        let bound = budget.quantile_bound().expect("quantile with K=10");
        assert_eq!(bound, Duration::from_millis(9));
    }

    /// Shrinking the window keeps the newest samples, so the maximum
    /// survives.
    #[test]
    fn set_calibration_window_truncates_oldest() {
        let mut budget = budget_with_ms(1..=100);

        budget.set_calibration_window(16);
        assert_eq!(budget.sample_count(), 16);

        budget.set_alpha(0.05);
        let bound = budget.quantile_bound().unwrap();
        assert_eq!(bound, Duration::from_millis(100));
    }

    /// Out-of-range window requests are clamped to the supported bounds.
    #[test]
    fn calibration_window_is_bounded() {
        let mut budget = ConformalRetryBudget::default();
        let cases = [
            (0, MIN_CALIBRATION_WINDOW),
            (usize::MAX, MAX_CALIBRATION_WINDOW),
        ];
        for (requested, expected) in cases {
            budget.set_calibration_window(requested);
            assert_eq!(budget.calibration_window(), expected);
        }
    }

    /// A retry is refused once elapsed time plus the predicted tail reaches
    /// the SLO.
    #[test]
    fn retry_disallowed_when_projected_exceeds_slo() {
        let mut budget = budget_with_ms([50u64; 20]);
        budget.set_slo_ms(100);
        budget.set_alpha(0.1);

        let over_budget = Duration::from_millis(60);
        let within_budget = Duration::from_millis(10);
        assert!(!budget.retry_allowed(over_budget));
        assert!(budget.retry_allowed(within_budget));
    }

    /// Negative, too-large and NaN inputs all end up strictly inside (0, 1).
    #[test]
    fn alpha_bounds_are_enforced() {
        let mut budget = ConformalRetryBudget::default();

        budget.set_alpha(-1.0);
        assert!(budget.alpha() > 0.0);

        budget.set_alpha(2.0);
        assert!(budget.alpha() < 1.0);

        budget.set_alpha(f64::NAN);
        let alpha = budget.alpha();
        assert!(alpha > 0.0 && alpha < 1.0);
    }

    /// Recorded latencies are irrelevant while no SLO is configured.
    #[test]
    fn retry_allowed_when_slo_disabled() {
        // 10 s expressed in milliseconds, recorded 32 times.
        let budget = budget_with_ms([10_000u64; 32]);
        assert!(budget.retry_allowed(Duration::from_secs(86_400)));
    }
}