poolsim_core/
distribution.rs1use rand::Rng;
21use rand_distr::{Distribution as RandDistribution, Exp, Gamma as RandGamma, LogNormal as RandLogNormal};
22use statrs::distribution::{ContinuousCDF, Gamma as StatGamma, LogNormal as StatLogNormal, Normal};
23
24use crate::{
25 error::PoolsimError,
26 types::{DistributionModel, WorkloadConfig},
27};
28
29#[derive(Debug, Clone)]
31pub struct EmpiricalCdf {
32 samples: Vec<f64>,
33}
34
35impl EmpiricalCdf {
36 fn new(mut samples: Vec<f64>) -> Result<Self, PoolsimError> {
37 if samples.is_empty() {
38 return Err(PoolsimError::Distribution(
39 "empirical distribution requires at least one sample".to_string(),
40 ));
41 }
42 samples.sort_by(|a, b| a.total_cmp(b));
43 Ok(Self { samples })
44 }
45
46 fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> f64 {
47 let idx = rng.gen_range(0..self.samples.len());
48 self.samples[idx]
49 }
50
51 fn percentile(&self, p: f64) -> f64 {
52 if self.samples.is_empty() {
53 return 0.0;
54 }
55 let p = p.clamp(0.0, 1.0);
56 let idx = ((self.samples.len() - 1) as f64 * p).round() as usize;
57 self.samples[idx]
58 }
59
60 fn mean(&self) -> f64 {
61 self.samples.iter().sum::<f64>() / self.samples.len() as f64
62 }
63}
64
65#[derive(Debug, Clone)]
67pub enum LatencyDistribution {
68 LogNormal {
70 mu: f64,
72 sigma: f64,
74 },
75 Exponential {
77 mean_ms: f64,
79 },
80 Empirical(EmpiricalCdf),
82 Gamma {
84 shape: f64,
86 scale: f64,
88 },
89}
90
91impl LatencyDistribution {
92 pub fn fit(workload: &WorkloadConfig, model: DistributionModel) -> Result<Self, PoolsimError> {
102 if let Some(raw_samples) = &workload.raw_samples_ms {
103 return EmpiricalCdf::new(raw_samples.clone()).map(Self::Empirical);
104 }
105
106 match model {
107 DistributionModel::LogNormal | DistributionModel::Empirical => {
108 let (mu, sigma) = fit_lognormal(workload)?;
109 Ok(Self::LogNormal { mu, sigma })
110 }
111 DistributionModel::Exponential => {
112 let mean_ms = workload.latency_p50_ms / std::f64::consts::LN_2;
113 Ok(Self::Exponential { mean_ms })
114 }
115 DistributionModel::Gamma => {
116 let (shape, scale) = fit_gamma(workload)?;
117 Ok(Self::Gamma { shape, scale })
118 }
119 }
120 }
121
122 pub fn sample_ms<R: Rng + ?Sized>(&self, rng: &mut R) -> f64 {
124 match self {
125 Self::LogNormal { mu, sigma } => {
126 let dist = RandLogNormal::new(*mu, *sigma).expect("valid lognormal params");
127 dist.sample(rng)
128 }
129 Self::Exponential { mean_ms } => {
130 let dist = Exp::new(1.0 / mean_ms).expect("valid exponential rate");
131 dist.sample(rng)
132 }
133 Self::Empirical(empirical) => empirical.sample(rng),
134 Self::Gamma { shape, scale } => {
135 let dist = RandGamma::new(*shape, *scale).expect("valid gamma params");
136 dist.sample(rng)
137 }
138 }
139 }
140
141 pub fn percentile_ms(&self, p: f64) -> Result<f64, PoolsimError> {
150 let p = p.clamp(0.0, 1.0);
151 match self {
152 Self::LogNormal { mu, sigma } => {
153 let dist =
154 StatLogNormal::new(*mu, *sigma).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
155 Ok(dist.inverse_cdf(p))
156 }
157 Self::Exponential { mean_ms } => Ok(-mean_ms * (1.0 - p).ln()),
158 Self::Empirical(empirical) => Ok(empirical.percentile(p)),
159 Self::Gamma { shape, scale } => {
160 let dist =
161 StatGamma::new(*shape, *scale).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
162 Ok(dist.inverse_cdf(p))
163 }
164 }
165 }
166
167 pub fn mean_ms(&self) -> f64 {
169 match self {
170 Self::LogNormal { mu, sigma } => (mu + 0.5 * sigma * sigma).exp(),
171 Self::Exponential { mean_ms } => *mean_ms,
172 Self::Empirical(empirical) => empirical.mean(),
173 Self::Gamma { shape, scale } => shape * scale,
174 }
175 }
176}
177
178fn fit_lognormal(workload: &WorkloadConfig) -> Result<(f64, f64), PoolsimError> {
179 let mu = workload.latency_p50_ms.ln();
180 let normal = Normal::new(0.0, 1.0).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
181
182 let mut sigmas = Vec::new();
183 if workload.latency_p95_ms > workload.latency_p50_ms {
184 let z95 = normal.inverse_cdf(0.95);
185 sigmas.push((workload.latency_p95_ms / workload.latency_p50_ms).ln() / z95);
186 }
187 if workload.latency_p99_ms > workload.latency_p50_ms {
188 let z99 = normal.inverse_cdf(0.99);
189 sigmas.push((workload.latency_p99_ms / workload.latency_p50_ms).ln() / z99);
190 }
191
192 let sigma = sigmas
193 .into_iter()
194 .filter(|s| s.is_finite() && *s > 0.0)
195 .sum::<f64>();
196
197 if sigma <= 0.0 {
198 return Err(PoolsimError::Distribution(
199 "unable to derive positive lognormal sigma from percentiles".to_string(),
200 ));
201 }
202
203 let count = if workload.latency_p99_ms > workload.latency_p50_ms { 2.0 } else { 1.0 };
204 Ok((mu, sigma / count))
205}
206
207fn fit_gamma(workload: &WorkloadConfig) -> Result<(f64, f64), PoolsimError> {
208 let mean = (workload.latency_p50_ms + workload.latency_p95_ms + workload.latency_p99_ms) / 3.0;
209 let std_est = ((workload.latency_p99_ms - workload.latency_p50_ms) / 2.326_347_874).max(1e-6);
210 let var = std_est * std_est;
211 let shape = (mean * mean / var).max(1e-6);
212 let scale = (var / mean).max(1e-6);
213
214 Ok((shape, scale))
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220
221 #[test]
222 fn empirical_percentile_returns_zero_for_empty_internal_samples() {
223 let empirical = EmpiricalCdf { samples: Vec::new() };
224 assert_eq!(empirical.percentile(0.5), 0.0);
225 }
226
227 #[test]
228 fn fit_gamma_clamps_non_finite_derived_parameters_to_safe_minimum() {
229 let workload = WorkloadConfig {
230 requests_per_second: 100.0,
231 latency_p50_ms: 10.0,
232 latency_p95_ms: 20.0,
233 latency_p99_ms: f64::INFINITY,
234 raw_samples_ms: None,
235 step_load_profile: None,
236 };
237
238 let (shape, scale) =
239 fit_gamma(&workload).expect("non-finite intermediates should clamp to safe values");
240 assert_eq!(shape, 1e-6);
241 assert_eq!(scale, 1e-6);
242 }
243}