poolsim_core/
monte_carlo.rs1use rand::{rngs::StdRng, Rng, SeedableRng};
16use rand_distr::{Distribution, Exp};
17
18use crate::{
19 distribution::LatencyDistribution, error::PoolsimError, types::QueueModel,
20 types::SimulationOptions, WorkloadConfig,
21};
22
23#[derive(Debug, Clone)]
25pub struct MonteCarloResult {
26 pub wait_times_ms: Vec<f64>,
28 pub p50: f64,
30 pub p95: f64,
32 pub p99: f64,
34 pub mean: f64,
36}
37
38pub fn run(
45 workload: &WorkloadConfig,
46 pool_size: u32,
47 dist: &LatencyDistribution,
48 opts: &SimulationOptions,
49) -> Result<MonteCarloResult, PoolsimError> {
50 run_with_overhead(workload, pool_size, 0.0, dist, opts)
51}
52
53pub(crate) fn run_with_overhead(
54 workload: &WorkloadConfig,
55 pool_size: u32,
56 connection_overhead_ms: f64,
57 dist: &LatencyDistribution,
58 opts: &SimulationOptions,
59) -> Result<MonteCarloResult, PoolsimError> {
60 if pool_size == 0 {
61 return Err(PoolsimError::invalid_input(
62 "INVALID_POOL_SIZE",
63 "pool_size must be > 0",
64 None,
65 ));
66 }
67
68 let iterations = opts.iterations as usize;
69 let lambda = workload.requests_per_second;
70 let base_seed = opts.seed.unwrap_or_else(rand::random::<u64>);
71 let deterministic_service_ms = dist.mean_ms() + connection_overhead_ms;
72
73 #[cfg(feature = "parallel")]
74 let waits = {
75 use rayon::prelude::*;
76
77 let workers = rayon::current_num_threads().max(1);
78 let chunk_count = workers.min(iterations.max(1));
79 let chunk_size = iterations.div_ceil(chunk_count);
80
81 (0..chunk_count)
82 .into_par_iter()
83 .map(|chunk_id| {
84 let start = chunk_id * chunk_size;
85 let end = ((chunk_id + 1) * chunk_size).min(iterations);
86 if start >= end {
87 return Vec::new();
88 }
89
90 let seed = base_seed ^ ((chunk_id as u64 + 1).wrapping_mul(0x9E37_79B9_7F4A_7C15));
91 let mut rng = StdRng::seed_from_u64(seed);
92 simulate_chunk(
93 &mut rng,
94 end - start,
95 lambda,
96 pool_size,
97 connection_overhead_ms,
98 deterministic_service_ms,
99 opts.queue_model,
100 dist,
101 )
102 })
103 .reduce(Vec::new, |mut left, mut right| {
104 left.append(&mut right);
105 left
106 })
107 };
108
109 #[cfg(not(feature = "parallel"))]
110 let waits = {
111 let mut rng = StdRng::seed_from_u64(base_seed);
112 simulate_chunk(
113 &mut rng,
114 iterations,
115 lambda,
116 pool_size,
117 connection_overhead_ms,
118 deterministic_service_ms,
119 opts.queue_model,
120 dist,
121 )
122 };
123
124 build_result(waits)
125}
126
127fn simulate_chunk<R: Rng + ?Sized>(
128 rng: &mut R,
129 iterations: usize,
130 lambda: f64,
131 pool_size: u32,
132 connection_overhead_ms: f64,
133 deterministic_service_ms: f64,
134 queue_model: QueueModel,
135 dist: &LatencyDistribution,
136) -> Vec<f64> {
137 let mut waits = Vec::with_capacity(iterations);
138 let mut arrival_time_s = 0.0;
139 let mut server_free_at = vec![0.0f64; pool_size as usize];
140
141 let inter_arrival = Exp::new(lambda).expect("lambda > 0 for exponential inter-arrival");
142
143 for _ in 0..iterations {
144 arrival_time_s += inter_arrival.sample(rng);
145
146 let mut min_idx = 0usize;
147 let mut min_free = server_free_at[0];
148 for (idx, free_at) in server_free_at.iter().copied().enumerate().skip(1) {
149 if free_at < min_free {
150 min_idx = idx;
151 min_free = free_at;
152 }
153 }
154
155 let wait_s = (min_free - arrival_time_s).max(0.0);
156 let service_ms = match queue_model {
157 QueueModel::MMC => dist.sample_ms(rng) + connection_overhead_ms,
158 QueueModel::MDC => deterministic_service_ms,
159 };
160 let service_s = service_ms / 1_000.0;
161
162 server_free_at[min_idx] = arrival_time_s + wait_s + service_s;
163 waits.push(wait_s * 1_000.0);
164 }
165
166 waits
167}
168
169fn build_result(mut waits: Vec<f64>) -> Result<MonteCarloResult, PoolsimError> {
170 if waits.is_empty() {
171 return Err(PoolsimError::Simulation(
172 "no wait times were generated during simulation".to_string(),
173 ));
174 }
175
176 let mean = waits.iter().sum::<f64>() / waits.len() as f64;
177 waits.sort_by(|a, b| a.total_cmp(b));
178
179 Ok(MonteCarloResult {
180 p50: percentile(&waits, 0.50),
181 p95: percentile(&waits, 0.95),
182 p99: percentile(&waits, 0.99),
183 mean,
184 wait_times_ms: waits,
185 })
186}
187
188fn percentile(sorted: &[f64], p: f64) -> f64 {
189 if sorted.is_empty() {
190 return 0.0;
191 }
192 let p = p.clamp(0.0, 1.0);
193 let idx = ((sorted.len() - 1) as f64 * p).round() as usize;
194 sorted[idx]
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200
201 #[test]
202 fn percentile_returns_zero_for_empty_input() {
203 assert_eq!(percentile(&[], 0.5), 0.0);
204 }
205}