mockforge_bench/
chunked_bench.rs1use async_stream::stream;
35use futures::StreamExt;
36use std::{
37 collections::HashMap,
38 sync::{
39 atomic::{AtomicU64, Ordering},
40 Arc,
41 },
42 time::{Duration, Instant},
43};
44use tokio::sync::Mutex;
45
46#[derive(Debug, Clone)]
48pub struct ChunkedBenchConfig {
49 pub target_url: String,
51 pub method: reqwest::Method,
53 pub concurrency: u32,
55 pub duration: Duration,
57 pub chunk_size_bytes: usize,
59 pub total_size_bytes: usize,
61 pub chunk_interval_ms: u64,
63 pub headers: HashMap<String, String>,
66 pub skip_tls_verify: bool,
68}
69
70#[derive(Debug, Clone)]
72pub struct ChunkedBenchResult {
73 pub total_requests: u64,
74 pub successful: u64,
75 pub failed: u64,
76 pub bytes_sent: u64,
77 pub elapsed: Duration,
78 pub req_per_sec: f64,
79 pub latencies_ms: Vec<u64>,
80 pub avg_latency_ms: f64,
81 pub p50_ms: u64,
82 pub p95_ms: u64,
83 pub p99_ms: u64,
84 pub status_counts: HashMap<u16, u64>,
85}
86
87pub async fn run(cfg: ChunkedBenchConfig) -> anyhow::Result<ChunkedBenchResult> {
90 if cfg.chunk_size_bytes == 0 {
91 anyhow::bail!("chunk_size_bytes must be > 0");
92 }
93 if cfg.total_size_bytes == 0 {
94 anyhow::bail!("total_size_bytes must be > 0");
95 }
96 if cfg.concurrency == 0 {
97 anyhow::bail!("concurrency must be >= 1");
98 }
99
100 let client = reqwest::Client::builder()
101 .danger_accept_invalid_certs(cfg.skip_tls_verify)
102 .build()?;
103
104 let total_requests = Arc::new(AtomicU64::new(0));
105 let successful = Arc::new(AtomicU64::new(0));
106 let failed = Arc::new(AtomicU64::new(0));
107 let bytes_sent = Arc::new(AtomicU64::new(0));
108 let latencies: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::with_capacity(8192)));
109 let status_counts: Arc<Mutex<HashMap<u16, u64>>> = Arc::new(Mutex::new(HashMap::new()));
110
111 let deadline = Instant::now() + cfg.duration;
112 let started_at = Instant::now();
113
114 let mut workers = Vec::with_capacity(cfg.concurrency as usize);
115 for _ in 0..cfg.concurrency {
116 let cfg = cfg.clone();
117 let client = client.clone();
118 let total_requests = total_requests.clone();
119 let successful = successful.clone();
120 let failed = failed.clone();
121 let bytes_sent = bytes_sent.clone();
122 let latencies = latencies.clone();
123 let status_counts = status_counts.clone();
124
125 workers.push(tokio::spawn(async move {
126 while Instant::now() < deadline {
127 let req_started = Instant::now();
128 match send_one_chunked_request(&client, &cfg).await {
129 Ok(status) => {
130 successful.fetch_add(1, Ordering::Relaxed);
131 bytes_sent.fetch_add(cfg.total_size_bytes as u64, Ordering::Relaxed);
132 let elapsed_ms = req_started.elapsed().as_millis() as u64;
133 latencies.lock().await.push(elapsed_ms);
134 *status_counts.lock().await.entry(status).or_insert(0) += 1;
135 }
136 Err(_e) => {
137 failed.fetch_add(1, Ordering::Relaxed);
138 }
139 }
140 total_requests.fetch_add(1, Ordering::Relaxed);
141 }
142 }));
143 }
144
145 for w in workers {
146 let _ = w.await;
147 }
148
149 let elapsed = started_at.elapsed();
150 let total = total_requests.load(Ordering::Relaxed);
151 let mut samples: Vec<u64> = {
152 let mut g = latencies.lock().await;
153 std::mem::take(&mut *g)
154 };
155 let final_status_counts: HashMap<u16, u64> = {
156 let mut g = status_counts.lock().await;
157 std::mem::take(&mut *g)
158 };
159 samples.sort_unstable();
160 let avg = if samples.is_empty() {
161 0.0
162 } else {
163 samples.iter().copied().sum::<u64>() as f64 / samples.len() as f64
164 };
165 let p = |q: f64| -> u64 {
166 if samples.is_empty() {
167 return 0;
168 }
169 let idx = ((samples.len() as f64 - 1.0) * q).round() as usize;
170 samples[idx]
171 };
172
173 Ok(ChunkedBenchResult {
174 total_requests: total,
175 successful: successful.load(Ordering::Relaxed),
176 failed: failed.load(Ordering::Relaxed),
177 bytes_sent: bytes_sent.load(Ordering::Relaxed),
178 elapsed,
179 req_per_sec: if elapsed.as_secs_f64() > 0.0 {
180 total as f64 / elapsed.as_secs_f64()
181 } else {
182 0.0
183 },
184 avg_latency_ms: avg,
185 p50_ms: p(0.50),
186 p95_ms: p(0.95),
187 p99_ms: p(0.99),
188 latencies_ms: samples,
189 status_counts: final_status_counts,
190 })
191}
192
193async fn send_one_chunked_request(
194 client: &reqwest::Client,
195 cfg: &ChunkedBenchConfig,
196) -> anyhow::Result<u16> {
197 let chunk_size = cfg.chunk_size_bytes;
198 let total = cfg.total_size_bytes;
199 let interval_ms = cfg.chunk_interval_ms;
200
201 let body_stream = stream! {
205 let mut sent: usize = 0;
206 let payload = vec![b'X'; chunk_size];
207 while sent < total {
208 let next = std::cmp::min(chunk_size, total - sent);
209 let chunk = payload[..next].to_vec();
210 sent += next;
211 if interval_ms > 0 && sent < total {
212 tokio::time::sleep(Duration::from_millis(interval_ms)).await;
213 }
214 yield Ok::<_, std::io::Error>(chunk);
215 }
216 };
217
218 let body = reqwest::Body::wrap_stream(body_stream.boxed());
219
220 let mut req = client.request(cfg.method.clone(), &cfg.target_url).body(body);
221 for (k, v) in &cfg.headers {
222 req = req.header(k, v);
223 }
224 let resp = req.send().await?;
225 Ok(resp.status().as_u16())
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231
232 #[tokio::test]
233 async fn rejects_zero_concurrency() {
234 let cfg = ChunkedBenchConfig {
235 target_url: "http://127.0.0.1:1".into(),
236 method: reqwest::Method::POST,
237 concurrency: 0,
238 duration: Duration::from_millis(10),
239 chunk_size_bytes: 1024,
240 total_size_bytes: 4096,
241 chunk_interval_ms: 0,
242 headers: HashMap::new(),
243 skip_tls_verify: false,
244 };
245 assert!(run(cfg).await.is_err());
246 }
247
248 #[tokio::test]
249 async fn rejects_zero_chunk_size() {
250 let cfg = ChunkedBenchConfig {
251 target_url: "http://127.0.0.1:1".into(),
252 method: reqwest::Method::POST,
253 concurrency: 1,
254 duration: Duration::from_millis(10),
255 chunk_size_bytes: 0,
256 total_size_bytes: 4096,
257 chunk_interval_ms: 0,
258 headers: HashMap::new(),
259 skip_tls_verify: false,
260 };
261 assert!(run(cfg).await.is_err());
262 }
263}