Skip to main content

poolsim_core/
distribution.rs

1//! Distribution fitting and sampling for workload latency inputs.
2//!
3//! This module converts percentile-based or sample-based latency observations
4//! into a service-time distribution usable by the optimizer and simulation
5//! layers.
6//!
7//! Main entrypoints:
8//!
9//! - [`LatencyDistribution::fit`]
10//! - [`LatencyDistribution::sample_ms`]
11//! - [`LatencyDistribution::percentile_ms`]
12//! - [`LatencyDistribution::mean_ms`]
13//!
14//! Selection rules:
15//!
16//! - percentile-only workloads fit one of the supported parametric models
17//! - `raw_samples_ms` forces empirical fitting regardless of the requested model
18//! - fitted distributions are reused by Monte Carlo and queueing helpers
19
20use rand::Rng;
21use rand_distr::{Distribution as RandDistribution, Exp, Gamma as RandGamma, LogNormal as RandLogNormal};
22use statrs::distribution::{ContinuousCDF, Gamma as StatGamma, LogNormal as StatLogNormal, Normal};
23
24use crate::{
25    error::PoolsimError,
26    types::{DistributionModel, WorkloadConfig},
27};
28
29/// Empirical cumulative distribution built from raw latency samples.
30#[derive(Debug, Clone)]
31pub struct EmpiricalCdf {
32    samples: Vec<f64>,
33}
34
35impl EmpiricalCdf {
36    fn new(mut samples: Vec<f64>) -> Result<Self, PoolsimError> {
37        if samples.is_empty() {
38            return Err(PoolsimError::Distribution(
39                "empirical distribution requires at least one sample".to_string(),
40            ));
41        }
42        samples.sort_by(|a, b| a.total_cmp(b));
43        Ok(Self { samples })
44    }
45
46    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> f64 {
47        let idx = rng.gen_range(0..self.samples.len());
48        self.samples[idx]
49    }
50
51    fn percentile(&self, p: f64) -> f64 {
52        if self.samples.is_empty() {
53            return 0.0;
54        }
55        let p = p.clamp(0.0, 1.0);
56        let idx = ((self.samples.len() - 1) as f64 * p).round() as usize;
57        self.samples[idx]
58    }
59
60    fn mean(&self) -> f64 {
61        self.samples.iter().sum::<f64>() / self.samples.len() as f64
62    }
63}
64
65/// Service-time distribution used during simulation and queue estimation.
66#[derive(Debug, Clone)]
67pub enum LatencyDistribution {
68    /// Log-normal distribution parameters.
69    LogNormal {
70        /// Log-space mean.
71        mu: f64,
72        /// Log-space standard deviation.
73        sigma: f64,
74    },
75    /// Exponential distribution with the provided mean.
76    Exponential {
77        /// Mean service time in milliseconds.
78        mean_ms: f64,
79    },
80    /// Empirical distribution sampled directly from input samples.
81    Empirical(EmpiricalCdf),
82    /// Gamma distribution parameters.
83    Gamma {
84        /// Shape parameter (k).
85        shape: f64,
86        /// Scale parameter (theta).
87        scale: f64,
88    },
89}
90
91impl LatencyDistribution {
92    /// Fits a latency distribution for a workload and selected model.
93    ///
94    /// If `workload.raw_samples_ms` is present, empirical fitting is used regardless
95    /// of `model`.
96    ///
97    /// # Errors
98    ///
99    /// Returns [`PoolsimError::Distribution`] when distribution parameters cannot
100    /// be derived.
101    pub fn fit(workload: &WorkloadConfig, model: DistributionModel) -> Result<Self, PoolsimError> {
102        if let Some(raw_samples) = &workload.raw_samples_ms {
103            return EmpiricalCdf::new(raw_samples.clone()).map(Self::Empirical);
104        }
105
106        match model {
107            DistributionModel::LogNormal | DistributionModel::Empirical => {
108                let (mu, sigma) = fit_lognormal(workload)?;
109                Ok(Self::LogNormal { mu, sigma })
110            }
111            DistributionModel::Exponential => {
112                let mean_ms = workload.latency_p50_ms / std::f64::consts::LN_2;
113                Ok(Self::Exponential { mean_ms })
114            }
115            DistributionModel::Gamma => {
116                let (shape, scale) = fit_gamma(workload)?;
117                Ok(Self::Gamma { shape, scale })
118            }
119        }
120    }
121
122    /// Draws one latency sample in milliseconds.
123    pub fn sample_ms<R: Rng + ?Sized>(&self, rng: &mut R) -> f64 {
124        match self {
125            Self::LogNormal { mu, sigma } => {
126                let dist = RandLogNormal::new(*mu, *sigma).expect("valid lognormal params");
127                dist.sample(rng)
128            }
129            Self::Exponential { mean_ms } => {
130                let dist = Exp::new(1.0 / mean_ms).expect("valid exponential rate");
131                dist.sample(rng)
132            }
133            Self::Empirical(empirical) => empirical.sample(rng),
134            Self::Gamma { shape, scale } => {
135                let dist = RandGamma::new(*shape, *scale).expect("valid gamma params");
136                dist.sample(rng)
137            }
138        }
139    }
140
141    /// Returns the requested percentile in milliseconds.
142    ///
143    /// `p` is clamped into `[0, 1]`.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`PoolsimError::Distribution`] when percentile evaluation fails for
148    /// the current parameterization.
149    pub fn percentile_ms(&self, p: f64) -> Result<f64, PoolsimError> {
150        let p = p.clamp(0.0, 1.0);
151        match self {
152            Self::LogNormal { mu, sigma } => {
153                let dist =
154                    StatLogNormal::new(*mu, *sigma).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
155                Ok(dist.inverse_cdf(p))
156            }
157            Self::Exponential { mean_ms } => Ok(-mean_ms * (1.0 - p).ln()),
158            Self::Empirical(empirical) => Ok(empirical.percentile(p)),
159            Self::Gamma { shape, scale } => {
160                let dist =
161                    StatGamma::new(*shape, *scale).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
162                Ok(dist.inverse_cdf(p))
163            }
164        }
165    }
166
167    /// Returns the mean service time in milliseconds.
168    pub fn mean_ms(&self) -> f64 {
169        match self {
170            Self::LogNormal { mu, sigma } => (mu + 0.5 * sigma * sigma).exp(),
171            Self::Exponential { mean_ms } => *mean_ms,
172            Self::Empirical(empirical) => empirical.mean(),
173            Self::Gamma { shape, scale } => shape * scale,
174        }
175    }
176}
177
178fn fit_lognormal(workload: &WorkloadConfig) -> Result<(f64, f64), PoolsimError> {
179    let mu = workload.latency_p50_ms.ln();
180    let normal = Normal::new(0.0, 1.0).map_err(|e| PoolsimError::Distribution(e.to_string()))?;
181
182    let mut sigmas = Vec::new();
183    if workload.latency_p95_ms > workload.latency_p50_ms {
184        let z95 = normal.inverse_cdf(0.95);
185        sigmas.push((workload.latency_p95_ms / workload.latency_p50_ms).ln() / z95);
186    }
187    if workload.latency_p99_ms > workload.latency_p50_ms {
188        let z99 = normal.inverse_cdf(0.99);
189        sigmas.push((workload.latency_p99_ms / workload.latency_p50_ms).ln() / z99);
190    }
191
192    let sigma = sigmas
193        .into_iter()
194        .filter(|s| s.is_finite() && *s > 0.0)
195        .sum::<f64>();
196
197    if sigma <= 0.0 {
198        return Err(PoolsimError::Distribution(
199            "unable to derive positive lognormal sigma from percentiles".to_string(),
200        ));
201    }
202
203    let count = if workload.latency_p99_ms > workload.latency_p50_ms { 2.0 } else { 1.0 };
204    Ok((mu, sigma / count))
205}
206
207fn fit_gamma(workload: &WorkloadConfig) -> Result<(f64, f64), PoolsimError> {
208    let mean = (workload.latency_p50_ms + workload.latency_p95_ms + workload.latency_p99_ms) / 3.0;
209    let std_est = ((workload.latency_p99_ms - workload.latency_p50_ms) / 2.326_347_874).max(1e-6);
210    let var = std_est * std_est;
211    let shape = (mean * mean / var).max(1e-6);
212    let scale = (var / mean).max(1e-6);
213
214    Ok((shape, scale))
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220
221    #[test]
222    fn empirical_percentile_returns_zero_for_empty_internal_samples() {
223        let empirical = EmpiricalCdf { samples: Vec::new() };
224        assert_eq!(empirical.percentile(0.5), 0.0);
225    }
226
227    #[test]
228    fn fit_gamma_clamps_non_finite_derived_parameters_to_safe_minimum() {
229        let workload = WorkloadConfig {
230            requests_per_second: 100.0,
231            latency_p50_ms: 10.0,
232            latency_p95_ms: 20.0,
233            latency_p99_ms: f64::INFINITY,
234            raw_samples_ms: None,
235            step_load_profile: None,
236        };
237
238        let (shape, scale) =
239            fit_gamma(&workload).expect("non-finite intermediates should clamp to safe values");
240        assert_eq!(shape, 1e-6);
241        assert_eq!(scale, 1e-6);
242    }
243}