Skip to main content

mockforge_bench/
chunked_bench.rs

1//! Native Rust chunked-encoding traffic generator.
2//!
//! `mockforge bench --native-chunked` bypasses k6 entirely. Each worker runs
//! its own request loop on a shared `reqwest::Client` and sends POST/PUT/PATCH
//! requests with bodies streamed via `reqwest::Body::wrap_stream`. Because the
//! body has no known `Content-Length`, hyper transports it as
//! `Transfer-Encoding: chunked` on HTTP/1.1 connections, unlike the k6/Go path
//! where the runtime decides based on body type.
9//!
10//! This is a small benchmark intended to exercise the *server's* chunked
11//! handling (slow consumers, max body size, partial-response chaos against
12//! chunked uploads). Not a k6 replacement for general load testing.
13//!
14//! ```no_run
15//! # use mockforge_bench::chunked_bench::{ChunkedBenchConfig, run};
16//! # use std::time::Duration;
17//! # use std::collections::HashMap;
18//! # async fn x() -> anyhow::Result<()> {
19//! let result = run(ChunkedBenchConfig {
20//!     target_url: "http://localhost:3000/upload".into(),
21//!     method: reqwest::Method::POST,
22//!     concurrency: 10,
23//!     duration: Duration::from_secs(60),
24//!     chunk_size_bytes: 1024,
25//!     total_size_bytes: 1024 * 1024,
26//!     chunk_interval_ms: 0,
27//!     headers: HashMap::new(),
28//!     skip_tls_verify: false,
29//! }).await?;
30//! println!("{} req/s", result.req_per_sec);
31//! # Ok(()) }
32//! ```
33
34use async_stream::stream;
35use futures::StreamExt;
36use std::{
37    collections::HashMap,
38    sync::{
39        atomic::{AtomicU64, Ordering},
40        Arc,
41    },
42    time::{Duration, Instant},
43};
44use tokio::sync::Mutex;
45
/// Configuration for the native chunked-encoding bench.
///
/// Passed by value to `run`, which rejects a zero `chunk_size_bytes`,
/// `total_size_bytes` or `concurrency` before sending any traffic.
#[derive(Debug, Clone)]
pub struct ChunkedBenchConfig {
    /// Target URL (e.g. `http://localhost:3000/upload`).
    pub target_url: String,
    /// HTTP method. POST/PUT/PATCH make sense; GET/HEAD don't take a body.
    pub method: reqwest::Method,
    /// Number of concurrent worker tasks, each running its own request loop.
    /// Must be at least 1.
    pub concurrency: u32,
    /// Total run duration. Workers stop starting new requests once it has
    /// elapsed; a request already in flight is allowed to finish.
    pub duration: Duration,
    /// Bytes per chunk emitted into the request body stream. Must be > 0.
    /// The final chunk is shorter when `total_size_bytes` is not a multiple
    /// of this value.
    pub chunk_size_bytes: usize,
    /// Total body size per request, in bytes. Must be > 0.
    pub total_size_bytes: usize,
    /// Sleep between chunks, in milliseconds. 0 = back-to-back.
    pub chunk_interval_ms: u64,
    /// Extra headers to attach to every request. `Transfer-Encoding: chunked`
    /// is set automatically by hyper because the body has no Content-Length.
    pub headers: HashMap<String, String>,
    /// Skip TLS certificate verification (useful for test self-signed certs).
    pub skip_tls_verify: bool,
}
69
/// Aggregate result from a chunked bench run.
///
/// Latency statistics cover only requests that received a response; requests
/// that failed before a status line arrived contribute to `failed` and
/// `total_requests` but not to the latency figures.
#[derive(Debug, Clone)]
pub struct ChunkedBenchResult {
    /// Number of requests attempted (`successful + failed`).
    pub total_requests: u64,
    /// Requests for which a response was received, whatever its status code.
    pub successful: u64,
    /// Requests that ended in an error before a response status was available.
    pub failed: u64,
    /// Body bytes accounted for successful requests
    /// (`total_size_bytes` per successful request).
    pub bytes_sent: u64,
    /// Wall-clock time from worker start-up until every worker finished.
    pub elapsed: Duration,
    /// `total_requests` divided by `elapsed` in seconds; 0.0 if no time elapsed.
    pub req_per_sec: f64,
    /// Per-request latency of each successful request, in whole milliseconds,
    /// sorted ascending.
    pub latencies_ms: Vec<u64>,
    /// Arithmetic mean of `latencies_ms`; 0.0 when there are no samples.
    pub avg_latency_ms: f64,
    /// Median latency in milliseconds; 0 when there are no samples.
    pub p50_ms: u64,
    /// 95th-percentile latency in milliseconds; 0 when there are no samples.
    pub p95_ms: u64,
    /// 99th-percentile latency in milliseconds; 0 when there are no samples.
    pub p99_ms: u64,
    /// Number of responses received per HTTP status code.
    pub status_counts: HashMap<u16, u64>,
}
86
87/// Run the chunked-traffic bench. Spawns `concurrency` worker tasks that send
88/// chunked POSTs back-to-back until `duration` elapses, then aggregates stats.
89pub async fn run(cfg: ChunkedBenchConfig) -> anyhow::Result<ChunkedBenchResult> {
90    if cfg.chunk_size_bytes == 0 {
91        anyhow::bail!("chunk_size_bytes must be > 0");
92    }
93    if cfg.total_size_bytes == 0 {
94        anyhow::bail!("total_size_bytes must be > 0");
95    }
96    if cfg.concurrency == 0 {
97        anyhow::bail!("concurrency must be >= 1");
98    }
99
100    let client = reqwest::Client::builder()
101        .danger_accept_invalid_certs(cfg.skip_tls_verify)
102        .build()?;
103
104    let total_requests = Arc::new(AtomicU64::new(0));
105    let successful = Arc::new(AtomicU64::new(0));
106    let failed = Arc::new(AtomicU64::new(0));
107    let bytes_sent = Arc::new(AtomicU64::new(0));
108    let latencies: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::with_capacity(8192)));
109    let status_counts: Arc<Mutex<HashMap<u16, u64>>> = Arc::new(Mutex::new(HashMap::new()));
110
111    let deadline = Instant::now() + cfg.duration;
112    let started_at = Instant::now();
113
114    let mut workers = Vec::with_capacity(cfg.concurrency as usize);
115    for _ in 0..cfg.concurrency {
116        let cfg = cfg.clone();
117        let client = client.clone();
118        let total_requests = total_requests.clone();
119        let successful = successful.clone();
120        let failed = failed.clone();
121        let bytes_sent = bytes_sent.clone();
122        let latencies = latencies.clone();
123        let status_counts = status_counts.clone();
124
125        workers.push(tokio::spawn(async move {
126            while Instant::now() < deadline {
127                let req_started = Instant::now();
128                match send_one_chunked_request(&client, &cfg).await {
129                    Ok(status) => {
130                        successful.fetch_add(1, Ordering::Relaxed);
131                        bytes_sent.fetch_add(cfg.total_size_bytes as u64, Ordering::Relaxed);
132                        let elapsed_ms = req_started.elapsed().as_millis() as u64;
133                        latencies.lock().await.push(elapsed_ms);
134                        *status_counts.lock().await.entry(status).or_insert(0) += 1;
135                    }
136                    Err(_e) => {
137                        failed.fetch_add(1, Ordering::Relaxed);
138                    }
139                }
140                total_requests.fetch_add(1, Ordering::Relaxed);
141            }
142        }));
143    }
144
145    for w in workers {
146        let _ = w.await;
147    }
148
149    let elapsed = started_at.elapsed();
150    let total = total_requests.load(Ordering::Relaxed);
151    let mut samples: Vec<u64> = {
152        let mut g = latencies.lock().await;
153        std::mem::take(&mut *g)
154    };
155    let final_status_counts: HashMap<u16, u64> = {
156        let mut g = status_counts.lock().await;
157        std::mem::take(&mut *g)
158    };
159    samples.sort_unstable();
160    let avg = if samples.is_empty() {
161        0.0
162    } else {
163        samples.iter().copied().sum::<u64>() as f64 / samples.len() as f64
164    };
165    let p = |q: f64| -> u64 {
166        if samples.is_empty() {
167            return 0;
168        }
169        let idx = ((samples.len() as f64 - 1.0) * q).round() as usize;
170        samples[idx]
171    };
172
173    Ok(ChunkedBenchResult {
174        total_requests: total,
175        successful: successful.load(Ordering::Relaxed),
176        failed: failed.load(Ordering::Relaxed),
177        bytes_sent: bytes_sent.load(Ordering::Relaxed),
178        elapsed,
179        req_per_sec: if elapsed.as_secs_f64() > 0.0 {
180            total as f64 / elapsed.as_secs_f64()
181        } else {
182            0.0
183        },
184        avg_latency_ms: avg,
185        p50_ms: p(0.50),
186        p95_ms: p(0.95),
187        p99_ms: p(0.99),
188        latencies_ms: samples,
189        status_counts: final_status_counts,
190    })
191}
192
193async fn send_one_chunked_request(
194    client: &reqwest::Client,
195    cfg: &ChunkedBenchConfig,
196) -> anyhow::Result<u16> {
197    let chunk_size = cfg.chunk_size_bytes;
198    let total = cfg.total_size_bytes;
199    let interval_ms = cfg.chunk_interval_ms;
200
201    // Build a stream that yields fixed-size chunks until `total` bytes are
202    // emitted. No Content-Length is set on the request, so hyper transports
203    // the body as Transfer-Encoding: chunked.
204    let body_stream = stream! {
205        let mut sent: usize = 0;
206        let payload = vec![b'X'; chunk_size];
207        while sent < total {
208            let next = std::cmp::min(chunk_size, total - sent);
209            let chunk = payload[..next].to_vec();
210            sent += next;
211            if interval_ms > 0 && sent < total {
212                tokio::time::sleep(Duration::from_millis(interval_ms)).await;
213            }
214            yield Ok::<_, std::io::Error>(chunk);
215        }
216    };
217
218    let body = reqwest::Body::wrap_stream(body_stream.boxed());
219
220    let mut req = client.request(cfg.method.clone(), &cfg.target_url).body(body);
221    for (k, v) in &cfg.headers {
222        req = req.header(k, v);
223    }
224    let resp = req.send().await?;
225    Ok(resp.status().as_u16())
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// A config that passes validation. The tests below override one field at
    /// a time with an invalid value; `run` rejects those before any request
    /// is sent, so the target URL is never contacted.
    fn valid_cfg() -> ChunkedBenchConfig {
        ChunkedBenchConfig {
            target_url: "http://127.0.0.1:1".into(),
            method: reqwest::Method::POST,
            concurrency: 1,
            duration: Duration::from_millis(10),
            chunk_size_bytes: 1024,
            total_size_bytes: 4096,
            chunk_interval_ms: 0,
            headers: HashMap::new(),
            skip_tls_verify: false,
        }
    }

    #[tokio::test]
    async fn rejects_zero_concurrency() {
        let cfg = ChunkedBenchConfig {
            concurrency: 0,
            ..valid_cfg()
        };
        assert!(run(cfg).await.is_err());
    }

    #[tokio::test]
    async fn rejects_zero_chunk_size() {
        let cfg = ChunkedBenchConfig {
            chunk_size_bytes: 0,
            ..valid_cfg()
        };
        assert!(run(cfg).await.is_err());
    }

    #[tokio::test]
    async fn rejects_zero_total_size() {
        let cfg = ChunkedBenchConfig {
            total_size_bytes: 0,
            ..valid_cfg()
        };
        assert!(run(cfg).await.is_err());
    }
}