use rand::Rng;
use rand_distr::{Distribution as RandDistribution, Exp, Gamma as RandGamma, LogNormal as RandLogNormal};
use statrs::distribution::{ContinuousCDF, Gamma as StatGamma, LogNormal as StatLogNormal, Normal};
use crate::{
error::PoolsimError,
types::{DistributionModel, WorkloadConfig},
};
#[derive(Debug, Clone)]
pub struct EmpiricalCdf {
samples: Vec<f64>,
}
impl EmpiricalCdf {
fn new(mut samples: Vec<f64>) -> Result<Self, PoolsimError> {
if samples.is_empty() {
return Err(PoolsimError::Distribution(
"empirical distribution requires at least one sample".to_string(),
));
}
samples.sort_by(|a, b| a.total_cmp(b));
Ok(Self { samples })
}
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> f64 {
let idx = rng.gen_range(0..self.samples.len());
self.samples[idx]
}
fn percentile(&self, p: f64) -> f64 {
if self.samples.is_empty() {
return 0.0;
}
let p = p.clamp(0.0, 1.0);
let idx = ((self.samples.len() - 1) as f64 * p).round() as usize;
self.samples[idx]
}
fn mean(&self) -> f64 {
self.samples.iter().sum::<f64>() / self.samples.len() as f64
}
}
#[derive(Debug, Clone)]
pub enum LatencyDistribution {
LogNormal {
mu: f64,
sigma: f64,
},
Exponential {
mean_ms: f64,
},
Empirical(EmpiricalCdf),
Gamma {
shape: f64,
scale: f64,
},
}
impl LatencyDistribution {
pub fn fit(workload: &WorkloadConfig, model: DistributionModel) -> Result<Self, PoolsimError> {
if let Some(raw_samples) = &workload.raw_samples_ms {
return EmpiricalCdf::new(raw_samples.clone()).map(Self::Empirical);
}
match model {
DistributionModel::LogNormal | DistributionModel::Empirical => {
let (mu, sigma) = fit_lognormal(workload)?;
Ok(Self::LogNormal { mu, sigma })
}
DistributionModel::Exponential => {
let mean_ms = workload.latency_p50_ms / std::f64::consts::LN_2;
Ok(Self::Exponential { mean_ms })
}
DistributionModel::Gamma => {
let (shape, scale) = fit_gamma(workload)?;
Ok(Self::Gamma { shape, scale })
}
}
}
pub fn sample_ms<R: Rng + ?Sized>(&self, rng: &mut R) -> f64 {
match self {
Self::LogNormal { mu, sigma } => {
let dist = RandLogNormal::new(*mu, *sigma).expect("valid lognormal params");
dist.sample(rng)
}
Self::Exponential { mean_ms } => {
let dist = Exp::new(1.0 / mean_ms).expect("valid exponential rate");
dist.sample(rng)
}
Self::Empirical(empirical) => empirical.sample(rng),
Self::Gamma { shape, scale } => {
let dist = RandGamma::new(*shape, *scale).expect("valid gamma params");
dist.sample(rng)
}
}
}
pub fn percentile_ms(&self, p: f64) -> Result<f64, PoolsimError> {
let p = p.clamp(0.0, 1.0);
match self {
Self::LogNormal { mu, sigma } => {
let dist =
StatLogNormal::new(*mu, *sigma).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
Ok(dist.inverse_cdf(p))
}
Self::Exponential { mean_ms } => Ok(-mean_ms * (1.0 - p).ln()),
Self::Empirical(empirical) => Ok(empirical.percentile(p)),
Self::Gamma { shape, scale } => {
let dist =
StatGamma::new(*shape, *scale).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
Ok(dist.inverse_cdf(p))
}
}
}
pub fn mean_ms(&self) -> f64 {
match self {
Self::LogNormal { mu, sigma } => (mu + 0.5 * sigma * sigma).exp(),
Self::Exponential { mean_ms } => *mean_ms,
Self::Empirical(empirical) => empirical.mean(),
Self::Gamma { shape, scale } => shape * scale,
}
}
}
fn fit_lognormal(workload: &WorkloadConfig) -> Result<(f64, f64), PoolsimError> {
let mu = workload.latency_p50_ms.ln();
let normal = Normal::new(0.0, 1.0).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
let mut sigmas = Vec::new();
if workload.latency_p95_ms > workload.latency_p50_ms {
let z95 = normal.inverse_cdf(0.95);
sigmas.push((workload.latency_p95_ms / workload.latency_p50_ms).ln() / z95);
}
if workload.latency_p99_ms > workload.latency_p50_ms {
let z99 = normal.inverse_cdf(0.99);
sigmas.push((workload.latency_p99_ms / workload.latency_p50_ms).ln() / z99);
}
let sigma = sigmas
.into_iter()
.filter(|s| s.is_finite() && *s > 0.0)
.sum::<f64>();
if sigma <= 0.0 {
return Err(PoolsimError::Distribution(
"unable to derive positive lognormal sigma from percentiles".to_string(),
));
}
let count = if workload.latency_p99_ms > workload.latency_p50_ms { 2.0 } else { 1.0 };
Ok((mu, sigma / count))
}
fn fit_gamma(workload: &WorkloadConfig) -> Result<(f64, f64), PoolsimError> {
let mean = (workload.latency_p50_ms + workload.latency_p95_ms + workload.latency_p99_ms) / 3.0;
let std_est = ((workload.latency_p99_ms - workload.latency_p50_ms) / 2.326_347_874).max(1e-6);
let var = std_est * std_est;
let shape = (mean * mean / var).max(1e-6);
let scale = (var / mean).max(1e-6);
Ok((shape, scale))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn empirical_percentile_returns_zero_for_empty_internal_samples() {
let empirical = EmpiricalCdf { samples: Vec::new() };
assert_eq!(empirical.percentile(0.5), 0.0);
}
#[test]
fn fit_gamma_clamps_non_finite_derived_parameters_to_safe_minimum() {
let workload = WorkloadConfig {
requests_per_second: 100.0,
latency_p50_ms: 10.0,
latency_p95_ms: 20.0,
latency_p99_ms: f64::INFINITY,
raw_samples_ms: None,
step_load_profile: None,
};
let (shape, scale) =
fit_gamma(&workload).expect("non-finite intermediates should clamp to safe values");
assert_eq!(shape, 1e-6);
assert_eq!(scale, 1e-6);
}
}