Skip to main content

poolsim_core/
monte_carlo.rs

1//! Monte Carlo queue simulation primitives.
2//!
3//! This module is used when `poolsim` needs a sampled queue-wait distribution
4//! rather than only analytical queueing metrics.
5//!
6//! Typical uses:
7//!
8//! - full simulation through crate-root [`crate::simulate`]
9//! - fixed-size evaluation through [`crate::evaluate`]
10//! - M/D/c probing during optimization and sensitivity analysis
11//!
12//! The public entrypoint is [`crate::monte_carlo::run`], which returns
13//! [`crate::monte_carlo::MonteCarloResult`].
14
15use rand::{rngs::StdRng, Rng, SeedableRng};
16use rand_distr::{Distribution, Exp};
17
18use crate::{
19    distribution::LatencyDistribution, error::PoolsimError, types::QueueModel,
20    types::SimulationOptions, WorkloadConfig,
21};
22
23/// Monte Carlo queue-wait summary and raw sampled waits.
24#[derive(Debug, Clone)]
25pub struct MonteCarloResult {
26    /// Sorted per-request queue waits in milliseconds.
27    pub wait_times_ms: Vec<f64>,
28    /// p50 queue wait (milliseconds).
29    pub p50: f64,
30    /// p95 queue wait (milliseconds).
31    pub p95: f64,
32    /// p99 queue wait (milliseconds).
33    pub p99: f64,
34    /// Mean queue wait (milliseconds).
35    pub mean: f64,
36}
37
38/// Runs Monte Carlo queue-wait simulation.
39///
40/// # Errors
41///
42/// Returns [`PoolsimError::InvalidInput`] for invalid pool size, and
43/// [`PoolsimError::Simulation`] when no wait samples are produced.
44pub fn run(
45    workload: &WorkloadConfig,
46    pool_size: u32,
47    dist: &LatencyDistribution,
48    opts: &SimulationOptions,
49) -> Result<MonteCarloResult, PoolsimError> {
50    run_with_overhead(workload, pool_size, 0.0, dist, opts)
51}
52
53pub(crate) fn run_with_overhead(
54    workload: &WorkloadConfig,
55    pool_size: u32,
56    connection_overhead_ms: f64,
57    dist: &LatencyDistribution,
58    opts: &SimulationOptions,
59) -> Result<MonteCarloResult, PoolsimError> {
60    if pool_size == 0 {
61        return Err(PoolsimError::invalid_input(
62            "INVALID_POOL_SIZE",
63            "pool_size must be > 0",
64            None,
65        ));
66    }
67
68    let iterations = opts.iterations as usize;
69    let lambda = workload.requests_per_second;
70    let base_seed = opts.seed.unwrap_or_else(rand::random::<u64>);
71    let deterministic_service_ms = dist.mean_ms() + connection_overhead_ms;
72
73    #[cfg(feature = "parallel")]
74    let waits = {
75        use rayon::prelude::*;
76
77        let workers = rayon::current_num_threads().max(1);
78        let chunk_count = workers.min(iterations.max(1));
79        let chunk_size = iterations.div_ceil(chunk_count);
80
81        (0..chunk_count)
82            .into_par_iter()
83            .map(|chunk_id| {
84                let start = chunk_id * chunk_size;
85                let end = ((chunk_id + 1) * chunk_size).min(iterations);
86                if start >= end {
87                    return Vec::new();
88                }
89
90                let seed = base_seed ^ ((chunk_id as u64 + 1).wrapping_mul(0x9E37_79B9_7F4A_7C15));
91                let mut rng = StdRng::seed_from_u64(seed);
92                simulate_chunk(
93                    &mut rng,
94                    end - start,
95                    lambda,
96                    pool_size,
97                    connection_overhead_ms,
98                    deterministic_service_ms,
99                    opts.queue_model,
100                    dist,
101                )
102            })
103            .reduce(Vec::new, |mut left, mut right| {
104                left.append(&mut right);
105                left
106            })
107    };
108
109    #[cfg(not(feature = "parallel"))]
110    let waits = {
111        let mut rng = StdRng::seed_from_u64(base_seed);
112        simulate_chunk(
113            &mut rng,
114            iterations,
115            lambda,
116            pool_size,
117            connection_overhead_ms,
118            deterministic_service_ms,
119            opts.queue_model,
120            dist,
121        )
122    };
123
124    build_result(waits)
125}
126
127fn simulate_chunk<R: Rng + ?Sized>(
128    rng: &mut R,
129    iterations: usize,
130    lambda: f64,
131    pool_size: u32,
132    connection_overhead_ms: f64,
133    deterministic_service_ms: f64,
134    queue_model: QueueModel,
135    dist: &LatencyDistribution,
136) -> Vec<f64> {
137    let mut waits = Vec::with_capacity(iterations);
138    let mut arrival_time_s = 0.0;
139    let mut server_free_at = vec![0.0f64; pool_size as usize];
140
141    let inter_arrival = Exp::new(lambda).expect("lambda > 0 for exponential inter-arrival");
142
143    for _ in 0..iterations {
144        arrival_time_s += inter_arrival.sample(rng);
145
146        let mut min_idx = 0usize;
147        let mut min_free = server_free_at[0];
148        for (idx, free_at) in server_free_at.iter().copied().enumerate().skip(1) {
149            if free_at < min_free {
150                min_idx = idx;
151                min_free = free_at;
152            }
153        }
154
155        let wait_s = (min_free - arrival_time_s).max(0.0);
156        let service_ms = match queue_model {
157            QueueModel::MMC => dist.sample_ms(rng) + connection_overhead_ms,
158            QueueModel::MDC => deterministic_service_ms,
159        };
160        let service_s = service_ms / 1_000.0;
161
162        server_free_at[min_idx] = arrival_time_s + wait_s + service_s;
163        waits.push(wait_s * 1_000.0);
164    }
165
166    waits
167}
168
169fn build_result(mut waits: Vec<f64>) -> Result<MonteCarloResult, PoolsimError> {
170    if waits.is_empty() {
171        return Err(PoolsimError::Simulation(
172            "no wait times were generated during simulation".to_string(),
173        ));
174    }
175
176    let mean = waits.iter().sum::<f64>() / waits.len() as f64;
177    waits.sort_by(|a, b| a.total_cmp(b));
178
179    Ok(MonteCarloResult {
180        p50: percentile(&waits, 0.50),
181        p95: percentile(&waits, 0.95),
182        p99: percentile(&waits, 0.99),
183        mean,
184        wait_times_ms: waits,
185    })
186}
187
188fn percentile(sorted: &[f64], p: f64) -> f64 {
189    if sorted.is_empty() {
190        return 0.0;
191    }
192    let p = p.clamp(0.0, 1.0);
193    let idx = ((sorted.len() - 1) as f64 * p).round() as usize;
194    sorted[idx]
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[test]
202    fn percentile_returns_zero_for_empty_input() {
203        assert_eq!(percentile(&[], 0.5), 0.0);
204    }
205}