1pub mod curve;
2pub mod error;
3pub mod fixed;
4pub mod rate_limit;
5
6pub use error::RunError;
7pub use rate_limit::RpsLimiter;
8
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::capture::CaptureDefinition;
15use crate::command::{Body, HttpMethod};
16use crate::config::secret::SensitiveString;
17use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
18use crate::http::{RequestConfig, RequestRecord};
19use crate::load_curve::LoadCurve;
20use crate::request_template::Template;
21use crate::response_template::ResponseTemplate;
22use crate::response_template::extractor::ExtractionResult;
23use crate::response_template::field::TrackedField;
24use crate::response_template::stats::ResponseStats;
25
/// The kind of run a set of statistics was produced by (stored in `RunStats::mode`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RunMode {
    /// A fixed run. NOTE(review): presumably the counterpart of
    /// `ExecutionMode::Fixed` — confirm where `RunStats` is built.
    Fixed,
    /// A load-curve run. NOTE(review): presumably the counterpart of
    /// `ExecutionMode::Curve` — confirm where `RunStats` is built.
    Curve,
}
36
/// Timing information about template generation, carried in
/// `RunStats::template_stats`.
#[derive(Clone, Debug)]
pub struct TemplateStats {
    /// Time attributed to generation.
    /// NOTE(review): presumably the time spent generating request data from
    /// the request template — confirm at the producer.
    pub generation_duration: Duration,
}
44
45pub struct StageStats {
49 pub latency: LatencyHistogram,
50 pub status_codes: StatusCodeHistogram,
51 pub total_requests: u64,
52 pub total_failures: u64,
53}
54
55#[derive(Default)]
60pub struct RequestStats {
61 pub latency: LatencyHistogram,
62 pub status_codes: StatusCodeHistogram,
63 pub total_requests: u64,
64 pub total_failures: u64,
65 pub total_skipped: u64,
66}
67
68impl RequestStats {
69 pub fn record(&mut self, duration: Duration, success: bool, status_code: Option<u16>) {
70 self.total_requests += 1;
71 if !success {
72 self.total_failures += 1;
73 }
74 self.latency.record(duration);
75 self.status_codes.record(status_code);
76 }
77
78 pub fn record_skipped(&mut self) {
79 self.total_requests += 1;
80 self.total_skipped += 1;
81 }
82}
83
84pub struct ScenarioStepStats {
88 pub name: String,
89 pub requests: RequestStats,
90}
91
92pub struct ScenarioStats {
96 pub name: String,
97 pub requests: RequestStats,
98 pub steps: Vec<ScenarioStepStats>,
99}
100
101#[derive(Default)]
105struct ScenarioAccumulator {
106 requests: RequestStats,
107 steps: HashMap<Arc<str>, RequestStats>,
108}
109
110#[derive(Default)]
112struct ScenarioMetricsAccumulator {
113 scenarios: HashMap<Arc<str>, ScenarioAccumulator>,
114}
115
116impl ScenarioMetricsAccumulator {
117 fn record(
118 &mut self,
119 scenario: Option<&Arc<str>>,
120 step: Option<&Arc<str>>,
121 duration: Duration,
122 success: bool,
123 status_code: Option<u16>,
124 skipped: bool,
125 ) {
126 let Some(scenario_name) = scenario else {
127 return;
128 };
129
130 let scenario = self.scenarios.entry(Arc::clone(scenario_name)).or_default();
131
132 if skipped {
133 scenario.requests.record_skipped();
134 if let Some(step_name) = step {
135 scenario
136 .steps
137 .entry(Arc::clone(step_name))
138 .or_default()
139 .record_skipped();
140 }
141 } else {
142 scenario.requests.record(duration, success, status_code);
143 if let Some(step_name) = step {
144 scenario
145 .steps
146 .entry(Arc::clone(step_name))
147 .or_default()
148 .record(duration, success, status_code);
149 }
150 }
151 }
152
153 fn into_stats(self) -> Option<Vec<ScenarioStats>> {
155 let mut scenario_stats: Vec<ScenarioStats> = self
156 .scenarios
157 .into_iter()
158 .map(|(name, acc)| {
159 let mut steps: Vec<ScenarioStepStats> = acc
160 .steps
161 .into_iter()
162 .map(|(name, requests)| ScenarioStepStats {
163 name: name.to_string(),
164 requests,
165 })
166 .collect();
167 steps.sort_by(|a, b| a.name.cmp(&b.name));
168 ScenarioStats {
169 name: name.to_string(),
170 requests: acc.requests,
171 steps,
172 }
173 })
174 .collect();
175 scenario_stats.sort_by(|a, b| a.name.cmp(&b.name));
176
177 if scenario_stats.is_empty() {
178 None
179 } else {
180 Some(scenario_stats)
181 }
182 }
183}
184
185pub(crate) struct DrainMetricsAccumulator {
187 pub latency: LatencyHistogram,
188 pub status_codes: StatusCodeHistogram,
189 pub total_requests: u64,
190 pub total_failures: u64,
191 pub total_skipped: u64,
192 pub response_stats: Option<ResponseStats>,
193 scenario_metrics: ScenarioMetricsAccumulator,
194}
195
196impl DrainMetricsAccumulator {
197 pub fn new(has_tracked_fields: bool) -> Self {
198 Self {
199 latency: LatencyHistogram::new(),
200 status_codes: StatusCodeHistogram::new(),
201 total_requests: 0,
202 total_failures: 0,
203 total_skipped: 0,
204 response_stats: if has_tracked_fields {
205 Some(ResponseStats::new())
206 } else {
207 None
208 },
209 scenario_metrics: ScenarioMetricsAccumulator::default(),
210 }
211 }
212
213 pub fn record_request(&mut self, record: &RequestRecord) {
214 self.total_requests += 1;
215
216 if record.skipped {
217 self.total_skipped += 1;
218 } else {
219 if !record.success {
220 self.total_failures += 1;
221 }
222 self.latency.record(record.duration);
223 self.status_codes.record(record.status_code);
224 }
225
226 self.scenario_metrics.record(
227 record.scenario.as_ref(),
228 record.step.as_ref(),
229 record.duration,
230 record.success,
231 record.status_code,
232 record.skipped,
233 );
234 }
235
236 pub fn record_extraction(&mut self, extraction: Option<ExtractionResult>) {
237 if let Some(extraction) = extraction
238 && let Some(ref mut rs) = self.response_stats
239 {
240 rs.record(extraction);
241 }
242 }
243
244 pub fn finalize_scenario_stats(&mut self) -> Option<Vec<ScenarioStats>> {
245 std::mem::take(&mut self.scenario_metrics).into_stats()
246 }
247}
248
249pub struct CurveStats {
253 pub duration: std::time::Duration,
254 pub stages: Vec<crate::load_curve::Stage>,
255 pub stage_stats: Vec<StageStats>,
257}
258
259pub struct RunStats {
262 pub elapsed: std::time::Duration,
263 pub mode: RunMode,
264 pub latency: LatencyHistogram,
265 pub status_codes: StatusCodeHistogram,
266 pub total_requests: u64,
267 pub total_failures: u64,
268 pub total_skipped: u64,
269 pub template_stats: Option<TemplateStats>,
270 pub response_stats: Option<ResponseStats>,
271 pub curve_stats: Option<CurveStats>,
272 pub scenario_stats: Option<Vec<ScenarioStats>>,
274}
275
/// Policy selected for a scenario when one of its steps fails
/// (stored in `ResolvedScenario::on_step_failure`).
///
/// Derives `PartialEq`/`Eq` so values can be compared directly and used in
/// `assert_eq!`, in line with the other fieldless enum in this module,
/// `RunMode`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnStepFailure {
    /// Default policy.
    /// NOTE(review): by its name, the remaining steps keep running after a
    /// failure — the enforcing code is outside this file; confirm there.
    #[default]
    Continue,
    /// NOTE(review): by its name, the rest of the current iteration is
    /// abandoned after a failure — confirm against the executor.
    AbortIteration,
}
287
288pub struct ResolvedStep {
292 pub name: Arc<str>,
293 pub request_config: Arc<RequestConfig>,
294 pub plain_headers: Arc<Vec<(String, String)>>,
295 pub request_template: Option<Arc<Template>>,
296 pub response_template: Option<Arc<Vec<TrackedField>>>,
297 pub captures: Vec<CaptureDefinition>,
299 pub inline_body: Option<Arc<str>>,
301 pub has_capture_headers: bool,
303}
304
305pub struct ResolvedScenario {
309 pub name: Arc<str>,
310 pub weight: u32,
311 pub on_step_failure: OnStepFailure,
312 pub steps: Vec<ResolvedStep>,
313}
314
315pub enum RequestSpec {
319 Single {
321 host: String,
322 method: HttpMethod,
323 body: Option<Body>,
324 template_path: Option<PathBuf>,
325 response_template_path: Option<PathBuf>,
326 headers: Vec<(String, SensitiveString)>,
328 },
329 Scenarios(Vec<ResolvedScenario>),
331}
332
333pub enum ExecutionMode {
337 Fixed {
339 request_count: usize,
340 concurrency: usize,
341 rps: Option<usize>,
344 },
345 Curve {
347 curve: LoadCurve,
348 rps: Option<usize>,
351 },
352}
353
354pub fn assign_scenario(vu_index: usize, scenarios: &[ResolvedScenario]) -> usize {
371 let total_weight: u64 = scenarios
374 .iter()
375 .fold(0u64, |acc, s| acc.saturating_add(s.weight as u64));
376 if total_weight == 0 {
377 return vu_index % scenarios.len();
380 }
381 let slot = (vu_index as u64) % total_weight;
382 let mut cumulative: u64 = 0;
383 for (i, s) in scenarios.iter().enumerate() {
384 cumulative = cumulative.saturating_add(s.weight as u64);
385 if slot < cumulative {
386 return i;
387 }
388 }
389 scenarios.len() - 1
390}
391
392pub(crate) fn resolve_tracked_fields(
393 path: Option<PathBuf>,
394) -> Result<Option<Arc<Vec<TrackedField>>>, Box<dyn std::error::Error>> {
395 path.map(|p| {
396 ResponseTemplate::parse(&p)
397 .map(|rt| Arc::new(rt.fields))
398 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
399 })
400 .transpose()
401}
402
403pub(crate) fn build_request_config(
404 host: String,
405 method: HttpMethod,
406 body: Option<Body>,
407 tracked_fields: Option<Arc<Vec<TrackedField>>>,
408 headers: Vec<(String, SensitiveString)>,
409 concurrency: usize,
410) -> Result<Arc<RequestConfig>, RunError> {
411 let client = reqwest::Client::builder()
412 .pool_max_idle_per_host(concurrency)
413 .build()?;
414 Ok(Arc::new(RequestConfig {
415 client,
416 host: Arc::new(host),
417 method,
418 body: Arc::new(body),
419 tracked_fields,
420 headers: Arc::new(headers),
421 }))
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    /// Feeds one successful, non-skipped `200` response lasting `millis`
    /// milliseconds into `acc`.
    fn record_ok(
        acc: &mut ScenarioMetricsAccumulator,
        scenario: Option<&Arc<str>>,
        step: Option<&Arc<str>>,
        millis: u64,
    ) {
        acc.record(
            scenario,
            step,
            Duration::from_millis(millis),
            true,
            Some(200),
            false,
        );
    }

    /// Builds a `RequestRecord` completed "now" with no extraction payload.
    /// `labels` is the optional `(scenario, step)` pair.
    fn make_record(
        duration: Duration,
        success: bool,
        status_code: Option<u16>,
        labels: Option<(&Arc<str>, &Arc<str>)>,
        skipped: bool,
    ) -> RequestRecord {
        RequestRecord {
            duration,
            completed_at: Instant::now(),
            success,
            status_code,
            extraction: None,
            scenario: labels.map(|(scenario, _)| Arc::clone(scenario)),
            step: labels.map(|(_, step)| Arc::clone(step)),
            skipped,
        }
    }

    /// Builds a step-less scenario with the given name and weight.
    fn make_scenario(name: &str, weight: u32) -> ResolvedScenario {
        ResolvedScenario {
            name: Arc::from(name),
            weight,
            on_step_failure: OnStepFailure::Continue,
            steps: Vec::new(),
        }
    }

    // A record without a scenario label must not create any scenario entry.
    #[test]
    fn scenario_record_none_stays_empty() {
        let mut acc = ScenarioMetricsAccumulator::default();
        record_ok(&mut acc, None, None, 10);
        assert!(acc.into_stats().is_none());
    }

    #[test]
    fn scenario_empty_into_stats_returns_none() {
        assert!(ScenarioMetricsAccumulator::default().into_stats().is_none());
    }

    // The same step name under two scenarios must be tracked separately.
    #[test]
    fn two_scenarios_same_step_name_independent() {
        let mut acc = ScenarioMetricsAccumulator::default();

        let scenario_a: Arc<str> = Arc::from("A");
        let scenario_b: Arc<str> = Arc::from("B");
        let step_login: Arc<str> = Arc::from("login");

        for millis in [10, 15] {
            record_ok(&mut acc, Some(&scenario_a), Some(&step_login), millis);
        }
        for millis in [20, 25, 30] {
            record_ok(&mut acc, Some(&scenario_b), Some(&step_login), millis);
        }

        let stats = acc.into_stats().expect("should have scenario stats");
        assert_eq!(stats.len(), 2);

        // Output is sorted by scenario name.
        assert_eq!(stats[0].name, "A");
        assert_eq!(stats[1].name, "B");

        let login_requests = |scenario: &ScenarioStats, missing: &str| {
            scenario
                .steps
                .iter()
                .find(|s| s.name == "login")
                .expect(missing)
                .requests
                .total_requests
        };
        assert_eq!(login_requests(&stats[0], "A should have login step"), 2);
        assert_eq!(login_requests(&stats[1], "B should have login step"), 3);
    }

    // Unlabelled records count globally but produce no scenario stats.
    #[test]
    fn drain_accumulator_no_scenario_labels() {
        let mut acc = DrainMetricsAccumulator::new(false);

        let record = make_record(Duration::from_millis(50), true, Some(200), None, false);
        acc.record_request(&record);

        assert_eq!(acc.total_requests, 1);
        assert!(acc.finalize_scenario_stats().is_none());
    }

    #[test]
    fn drain_accumulator_skipped_records() {
        let mut acc = DrainMetricsAccumulator::new(false);

        let scenario: Arc<str> = Arc::from("checkout");
        let step: Arc<str> = Arc::from("pay");
        let labels = Some((&scenario, &step));

        let normal = make_record(Duration::from_millis(50), true, Some(200), labels, false);
        acc.record_request(&normal);

        // A skipped record carries `success: false` but must not be counted
        // as a failure.
        let skipped = make_record(Duration::ZERO, false, None, labels, true);
        acc.record_request(&skipped);

        assert_eq!(acc.total_requests, 2);
        assert_eq!(acc.total_failures, 0, "skipped records are not failures");
        assert_eq!(acc.total_skipped, 1);

        let scenarios = acc.finalize_scenario_stats().unwrap();
        let checkout = &scenarios[0];
        assert_eq!(checkout.requests.total_requests, 2);
        assert_eq!(checkout.requests.total_skipped, 1);
        assert_eq!(checkout.requests.total_failures, 0);

        let pay_step = checkout.steps.iter().find(|s| s.name == "pay").unwrap();
        assert_eq!(pay_step.requests.total_requests, 2);
        assert_eq!(pay_step.requests.total_skipped, 1);
    }

    #[test]
    fn assign_scenario_weighted() {
        let scenarios = vec![make_scenario("A", 3), make_scenario("B", 1)];

        let assignments: Vec<usize> = (0..8).map(|i| assign_scenario(i, &scenarios)).collect();

        // Total weight is 4: slots 0..3 belong to A, slot 3 to B, then the
        // pattern repeats.
        assert_eq!(assignments, vec![0, 0, 0, 1, 0, 0, 0, 1]);
    }

    #[test]
    fn assign_scenario_single() {
        let scenarios = vec![make_scenario("only", 5)];
        assert!((0..10).all(|i| assign_scenario(i, &scenarios) == 0));
    }

    #[test]
    fn assign_scenario_equal_weights() {
        let scenarios = vec![make_scenario("A", 1), make_scenario("B", 1)];
        for i in 0..8 {
            assert_eq!(assign_scenario(i, &scenarios), i % 2);
        }
    }

    #[test]
    fn assign_scenario_u32_max_weights_does_not_panic() {
        let scenarios = vec![make_scenario("A", u32::MAX), make_scenario("B", u32::MAX)];
        for vu_index in [0, usize::MAX] {
            let assignment = assign_scenario(vu_index, &scenarios);
            assert!(assignment < 2);
        }
    }

    #[test]
    fn assign_scenario_zero_weights_falls_back_to_round_robin() {
        let scenarios = vec![make_scenario("A", 0), make_scenario("B", 0)];
        let assignments: Vec<usize> = (0..4).map(|i| assign_scenario(i, &scenarios)).collect();
        assert_eq!(assignments, vec![0, 1, 0, 1]);
    }
}