Skip to main content

lmn_core/execution/
mod.rs

1pub mod curve;
2pub mod error;
3pub mod fixed;
4pub mod rate_limit;
5
6pub use error::RunError;
7pub use rate_limit::RpsLimiter;
8
9use std::collections::HashMap;
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use crate::capture::CaptureDefinition;
15use crate::command::{Body, HttpMethod};
16use crate::config::secret::SensitiveString;
17use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
18use crate::http::{RequestConfig, RequestRecord};
19use crate::load_curve::LoadCurve;
20use crate::request_template::Template;
21use crate::response_template::ResponseTemplate;
22use crate::response_template::extractor::ExtractionResult;
23use crate::response_template::field::TrackedField;
24use crate::response_template::stats::ResponseStats;
25
26// ── RunMode ───────────────────────────────────────────────────────────────────
27
28/// Indicates which execution strategy produced the run results.
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub enum RunMode {
31    /// Worker-pool fixed-count mode: N long-lived VUs share an atomic request budget.
32    Fixed,
33    /// Time-based dynamic VU mode driven by a `LoadCurve`.
34    Curve,
35}
36
37// ── TemplateStats ─────────────────────────────────────────────────────────────
38
39/// Timing information for request template generation.
40#[derive(Debug, Clone)]
41pub struct TemplateStats {
42    pub generation_duration: std::time::Duration,
43}
44
45// ── StageStats ────────────────────────────────────────────────────────────────
46
47/// Per-stage latency and request counts for curve-mode runs.
48pub struct StageStats {
49    pub latency: LatencyHistogram,
50    pub status_codes: StatusCodeHistogram,
51    pub total_requests: u64,
52    pub total_failures: u64,
53}
54
55// ── RequestStats ──────────────────────────────────────────────────────────────
56
57/// Generic request-metric container reused for scenario-level and step-level
58/// request summaries.
59#[derive(Default)]
60pub struct RequestStats {
61    pub latency: LatencyHistogram,
62    pub status_codes: StatusCodeHistogram,
63    pub total_requests: u64,
64    pub total_failures: u64,
65    pub total_skipped: u64,
66}
67
68impl RequestStats {
69    pub fn record(&mut self, duration: Duration, success: bool, status_code: Option<u16>) {
70        self.total_requests += 1;
71        if !success {
72            self.total_failures += 1;
73        }
74        self.latency.record(duration);
75        self.status_codes.record(status_code);
76    }
77
78    pub fn record_skipped(&mut self) {
79        self.total_requests += 1;
80        self.total_skipped += 1;
81    }
82}
83
84// ── ScenarioStepStats ─────────────────────────────────────────────────────────
85
86/// Per-step request metrics inside a scenario.
87pub struct ScenarioStepStats {
88    pub name: String,
89    pub requests: RequestStats,
90}
91
92// ── ScenarioStats ─────────────────────────────────────────────────────────────
93
94/// Per-scenario request metrics and nested step breakdowns.
95pub struct ScenarioStats {
96    pub name: String,
97    pub requests: RequestStats,
98    pub steps: Vec<ScenarioStepStats>,
99}
100
101// ── Shared drain accumulators ────────────────────────────────────────────────
102
103/// Internal accumulator for one scenario while draining request records.
104#[derive(Default)]
105struct ScenarioAccumulator {
106    requests: RequestStats,
107    steps: HashMap<Arc<str>, RequestStats>,
108}
109
110/// Internal scenario/step metrics accumulator shared across executors.
111#[derive(Default)]
112struct ScenarioMetricsAccumulator {
113    scenarios: HashMap<Arc<str>, ScenarioAccumulator>,
114}
115
116impl ScenarioMetricsAccumulator {
117    fn record(
118        &mut self,
119        scenario: Option<&Arc<str>>,
120        step: Option<&Arc<str>>,
121        duration: Duration,
122        success: bool,
123        status_code: Option<u16>,
124        skipped: bool,
125    ) {
126        let Some(scenario_name) = scenario else {
127            return;
128        };
129
130        let scenario = self.scenarios.entry(Arc::clone(scenario_name)).or_default();
131
132        if skipped {
133            scenario.requests.record_skipped();
134            if let Some(step_name) = step {
135                scenario
136                    .steps
137                    .entry(Arc::clone(step_name))
138                    .or_default()
139                    .record_skipped();
140            }
141        } else {
142            scenario.requests.record(duration, success, status_code);
143            if let Some(step_name) = step {
144                scenario
145                    .steps
146                    .entry(Arc::clone(step_name))
147                    .or_default()
148                    .record(duration, success, status_code);
149            }
150        }
151    }
152
153    /// Returns scenarios sorted by name, with steps sorted by name within each scenario.
154    fn into_stats(self) -> Option<Vec<ScenarioStats>> {
155        let mut scenario_stats: Vec<ScenarioStats> = self
156            .scenarios
157            .into_iter()
158            .map(|(name, acc)| {
159                let mut steps: Vec<ScenarioStepStats> = acc
160                    .steps
161                    .into_iter()
162                    .map(|(name, requests)| ScenarioStepStats {
163                        name: name.to_string(),
164                        requests,
165                    })
166                    .collect();
167                steps.sort_by(|a, b| a.name.cmp(&b.name));
168                ScenarioStats {
169                    name: name.to_string(),
170                    requests: acc.requests,
171                    steps,
172                }
173            })
174            .collect();
175        scenario_stats.sort_by(|a, b| a.name.cmp(&b.name));
176
177        if scenario_stats.is_empty() {
178            None
179        } else {
180            Some(scenario_stats)
181        }
182    }
183}
184
185/// Shared request-drain accumulator used by both fixed and curve executors.
186pub(crate) struct DrainMetricsAccumulator {
187    pub latency: LatencyHistogram,
188    pub status_codes: StatusCodeHistogram,
189    pub total_requests: u64,
190    pub total_failures: u64,
191    pub total_skipped: u64,
192    pub response_stats: Option<ResponseStats>,
193    scenario_metrics: ScenarioMetricsAccumulator,
194}
195
196impl DrainMetricsAccumulator {
197    pub fn new(has_tracked_fields: bool) -> Self {
198        Self {
199            latency: LatencyHistogram::new(),
200            status_codes: StatusCodeHistogram::new(),
201            total_requests: 0,
202            total_failures: 0,
203            total_skipped: 0,
204            response_stats: if has_tracked_fields {
205                Some(ResponseStats::new())
206            } else {
207                None
208            },
209            scenario_metrics: ScenarioMetricsAccumulator::default(),
210        }
211    }
212
213    pub fn record_request(&mut self, record: &RequestRecord) {
214        self.total_requests += 1;
215
216        if record.skipped {
217            self.total_skipped += 1;
218        } else {
219            if !record.success {
220                self.total_failures += 1;
221            }
222            self.latency.record(record.duration);
223            self.status_codes.record(record.status_code);
224        }
225
226        self.scenario_metrics.record(
227            record.scenario.as_ref(),
228            record.step.as_ref(),
229            record.duration,
230            record.success,
231            record.status_code,
232            record.skipped,
233        );
234    }
235
236    pub fn record_extraction(&mut self, extraction: Option<ExtractionResult>) {
237        if let Some(extraction) = extraction
238            && let Some(ref mut rs) = self.response_stats
239        {
240            rs.record(extraction);
241        }
242    }
243
244    pub fn finalize_scenario_stats(&mut self) -> Option<Vec<ScenarioStats>> {
245        std::mem::take(&mut self.scenario_metrics).into_stats()
246    }
247}
248
249// ── CurveStats ────────────────────────────────────────────────────────────────
250
251/// Curve-specific metadata captured at the end of a curve run.
252pub struct CurveStats {
253    pub duration: std::time::Duration,
254    pub stages: Vec<crate::load_curve::Stage>,
255    /// Per-stage histogram data — one entry per stage in the load curve.
256    pub stage_stats: Vec<StageStats>,
257}
258
259// ── RunStats ──────────────────────────────────────────────────────────────────
260
261pub struct RunStats {
262    pub elapsed: std::time::Duration,
263    pub mode: RunMode,
264    pub latency: LatencyHistogram,
265    pub status_codes: StatusCodeHistogram,
266    pub total_requests: u64,
267    pub total_failures: u64,
268    pub total_skipped: u64,
269    pub template_stats: Option<TemplateStats>,
270    pub response_stats: Option<ResponseStats>,
271    pub curve_stats: Option<CurveStats>,
272    /// Present when scenario/step attribution data was emitted by VUs.
273    pub scenario_stats: Option<Vec<ScenarioStats>>,
274}
275
276// ── OnStepFailure ─────────────────────────────────────────────────────────────
277
278/// Controls what happens when a step fails within a scenario iteration.
279#[derive(Debug, Clone, Copy, Default)]
280pub enum OnStepFailure {
281    /// Continue executing remaining steps in the iteration even after a failure.
282    #[default]
283    Continue,
284    /// Abort the current iteration and skip remaining steps on any step failure.
285    AbortIteration,
286}
287
288// ── ResolvedStep ──────────────────────────────────────────────────────────────
289
290/// A fully resolved scenario step, ready to be executed by a `ScenarioVu`.
291pub struct ResolvedStep {
292    pub name: Arc<str>,
293    pub request_config: Arc<RequestConfig>,
294    pub plain_headers: Arc<Vec<(String, String)>>,
295    pub request_template: Option<Arc<Template>>,
296    pub response_template: Option<Arc<Vec<TrackedField>>>,
297    /// Capture definitions to extract from the response body.
298    pub captures: Vec<CaptureDefinition>,
299    /// Inline request body (mutually exclusive with `request_template`).
300    pub inline_body: Option<Arc<str>>,
301    /// True if any header value contains `{{capture.` references.
302    pub has_capture_headers: bool,
303}
304
305// ── ResolvedScenario ──────────────────────────────────────────────────────────
306
307/// A fully resolved scenario with its steps, ready for VU assignment.
308pub struct ResolvedScenario {
309    pub name: Arc<str>,
310    pub weight: u32,
311    pub on_step_failure: OnStepFailure,
312    pub steps: Vec<ResolvedStep>,
313}
314
315// ── RequestSpec ───────────────────────────────────────────────────────────────
316
317/// All request-level parameters for a run.
318pub enum RequestSpec {
319    /// Single-endpoint mode — all VUs hit the same host/method.
320    Single {
321        host: String,
322        method: HttpMethod,
323        body: Option<Body>,
324        template_path: Option<PathBuf>,
325        response_template_path: Option<PathBuf>,
326        /// Custom HTTP headers to send with every request in this run.
327        headers: Vec<(String, SensitiveString)>,
328    },
329    /// Scenario mode — VUs execute multi-step sequences.
330    Scenarios(Vec<ResolvedScenario>),
331}
332
333// ── ExecutionMode ─────────────────────────────────────────────────────────────
334
335/// Determines the execution strategy for a run.
336pub enum ExecutionMode {
337    /// Classic semaphore-based fixed-count execution.
338    Fixed {
339        request_count: usize,
340        concurrency: usize,
341        /// Optional upper bound on aggregate requests-per-second.
342        /// `None` means no rate limit.
343        rps: Option<usize>,
344    },
345    /// Time-based dynamic VU execution driven by a `LoadCurve`.
346    Curve {
347        curve: LoadCurve,
348        /// Optional upper bound on aggregate requests-per-second.
349        /// `None` means no rate limit.
350        rps: Option<usize>,
351    },
352}
353
354// ── Shared helpers ────────────────────────────────────────────────────────────
355
356/// Maps a VU index to a scenario index using weighted round-robin assignment.
357///
358/// VUs are assigned to scenarios proportionally to each scenario's `weight`.
359/// For example, with weights `[3, 1]` and 8 VUs: VUs 0–2 → scenario 0,
360/// VU 3 → scenario 1, VU 4–6 → scenario 0, VU 7 → scenario 1.
361///
362/// Parse-time caps (`MAX_SCENARIOS` × `MAX_SCENARIO_WEIGHT`) keep the total
363/// weight well inside `u32`. As defense in depth for programmatic callers that
364/// bypass `parse_config`, the running sum is accumulated as `u64` and we fall
365/// back to `vu_index % scenarios.len()` if the total weight does not fit or is
366/// zero. The `vu_index` is also widened to `u64` to avoid truncation on
367/// 64-bit platforms.
368///
369/// Panics if `scenarios` is empty — callers must ensure at least one scenario exists.
370pub fn assign_scenario(vu_index: usize, scenarios: &[ResolvedScenario]) -> usize {
371    // Accumulate in u64 to make overflow impossible for any realistic number
372    // of scenarios. Saturate on the (unreachable under parse-time caps) overflow.
373    let total_weight: u64 = scenarios
374        .iter()
375        .fold(0u64, |acc, s| acc.saturating_add(s.weight as u64));
376    if total_weight == 0 {
377        // Pathological input (zero weights, or empty after caps). Fall back to
378        // a deterministic round-robin across scenario indices.
379        return vu_index % scenarios.len();
380    }
381    let slot = (vu_index as u64) % total_weight;
382    let mut cumulative: u64 = 0;
383    for (i, s) in scenarios.iter().enumerate() {
384        cumulative = cumulative.saturating_add(s.weight as u64);
385        if slot < cumulative {
386            return i;
387        }
388    }
389    scenarios.len() - 1
390}
391
392pub(crate) fn resolve_tracked_fields(
393    path: Option<PathBuf>,
394) -> Result<Option<Arc<Vec<TrackedField>>>, Box<dyn std::error::Error>> {
395    path.map(|p| {
396        ResponseTemplate::parse(&p)
397            .map(|rt| Arc::new(rt.fields))
398            .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
399    })
400    .transpose()
401}
402
403pub(crate) fn build_request_config(
404    host: String,
405    method: HttpMethod,
406    body: Option<Body>,
407    tracked_fields: Option<Arc<Vec<TrackedField>>>,
408    headers: Vec<(String, SensitiveString)>,
409    concurrency: usize,
410) -> Result<Arc<RequestConfig>, RunError> {
411    let client = reqwest::Client::builder()
412        .pool_max_idle_per_host(concurrency)
413        .build()?;
414    Ok(Arc::new(RequestConfig {
415        client,
416        host: Arc::new(host),
417        method,
418        body: Arc::new(body),
419        tracked_fields,
420        headers: Arc::new(headers),
421    }))
422}
423
424#[cfg(test)]
425mod tests {
426    use super::*;
427    use std::sync::Arc;
428    use std::time::{Duration, Instant};
429
430    // ── ScenarioMetricsAccumulator ────────────────────────────────────────────
431
432    #[test]
433    fn scenario_record_none_stays_empty() {
434        let mut acc = ScenarioMetricsAccumulator::default();
435        acc.record(
436            None,
437            None,
438            Duration::from_millis(10),
439            true,
440            Some(200),
441            false,
442        );
443        assert!(acc.into_stats().is_none());
444    }
445
446    #[test]
447    fn scenario_empty_into_stats_returns_none() {
448        let acc = ScenarioMetricsAccumulator::default();
449        assert!(acc.into_stats().is_none());
450    }
451
452    #[test]
453    fn two_scenarios_same_step_name_independent() {
454        let mut acc = ScenarioMetricsAccumulator::default();
455
456        let scenario_a: Arc<str> = Arc::from("A");
457        let scenario_b: Arc<str> = Arc::from("B");
458        let step_login: Arc<str> = Arc::from("login");
459
460        // 2 requests for scenario A / step login
461        acc.record(
462            Some(&scenario_a),
463            Some(&step_login),
464            Duration::from_millis(10),
465            true,
466            Some(200),
467            false,
468        );
469        acc.record(
470            Some(&scenario_a),
471            Some(&step_login),
472            Duration::from_millis(15),
473            true,
474            Some(200),
475            false,
476        );
477
478        // 3 requests for scenario B / step login
479        acc.record(
480            Some(&scenario_b),
481            Some(&step_login),
482            Duration::from_millis(20),
483            true,
484            Some(200),
485            false,
486        );
487        acc.record(
488            Some(&scenario_b),
489            Some(&step_login),
490            Duration::from_millis(25),
491            true,
492            Some(200),
493            false,
494        );
495        acc.record(
496            Some(&scenario_b),
497            Some(&step_login),
498            Duration::from_millis(30),
499            true,
500            Some(200),
501            false,
502        );
503
504        let stats = acc.into_stats().expect("should have scenario stats");
505        assert_eq!(stats.len(), 2);
506
507        // into_stats() sorts by name, so A comes before B
508        assert_eq!(stats[0].name, "A");
509        assert_eq!(stats[1].name, "B");
510
511        let a_login = stats[0]
512            .steps
513            .iter()
514            .find(|s| s.name == "login")
515            .expect("A should have login step");
516        assert_eq!(a_login.requests.total_requests, 2);
517
518        let b_login = stats[1]
519            .steps
520            .iter()
521            .find(|s| s.name == "login")
522            .expect("B should have login step");
523        assert_eq!(b_login.requests.total_requests, 3);
524    }
525
526    // ── DrainMetricsAccumulator ───────────────────────────────────────────────
527
528    #[test]
529    fn drain_accumulator_no_scenario_labels() {
530        let mut acc = DrainMetricsAccumulator::new(false);
531
532        let record = RequestRecord {
533            duration: Duration::from_millis(50),
534            completed_at: Instant::now(),
535            success: true,
536            status_code: Some(200),
537            extraction: None,
538            scenario: None,
539            step: None,
540            skipped: false,
541        };
542
543        acc.record_request(&record);
544
545        assert_eq!(acc.total_requests, 1);
546        assert!(acc.finalize_scenario_stats().is_none());
547    }
548
549    #[test]
550    fn drain_accumulator_skipped_records() {
551        let mut acc = DrainMetricsAccumulator::new(false);
552
553        let scenario: Arc<str> = Arc::from("checkout");
554        let step: Arc<str> = Arc::from("pay");
555
556        // Normal record
557        let normal = RequestRecord {
558            duration: Duration::from_millis(50),
559            completed_at: Instant::now(),
560            success: true,
561            status_code: Some(200),
562            extraction: None,
563            scenario: Some(Arc::clone(&scenario)),
564            step: Some(Arc::clone(&step)),
565            skipped: false,
566        };
567        acc.record_request(&normal);
568
569        // Skipped record
570        let skipped = RequestRecord {
571            duration: Duration::ZERO,
572            completed_at: Instant::now(),
573            success: false,
574            status_code: None,
575            extraction: None,
576            scenario: Some(Arc::clone(&scenario)),
577            step: Some(Arc::clone(&step)),
578            skipped: true,
579        };
580        acc.record_request(&skipped);
581
582        assert_eq!(acc.total_requests, 2);
583        assert_eq!(acc.total_failures, 0, "skipped records are not failures");
584        assert_eq!(acc.total_skipped, 1);
585
586        let scenarios = acc.finalize_scenario_stats().unwrap();
587        let checkout = &scenarios[0];
588        assert_eq!(checkout.requests.total_requests, 2);
589        assert_eq!(checkout.requests.total_skipped, 1);
590        assert_eq!(checkout.requests.total_failures, 0);
591
592        let pay_step = checkout.steps.iter().find(|s| s.name == "pay").unwrap();
593        assert_eq!(pay_step.requests.total_requests, 2);
594        assert_eq!(pay_step.requests.total_skipped, 1);
595    }
596
597    // ── assign_scenario ───────────────────────────────────────────────────────
598
599    fn make_scenario(name: &str, weight: u32) -> ResolvedScenario {
600        ResolvedScenario {
601            name: Arc::from(name),
602            weight,
603            on_step_failure: OnStepFailure::Continue,
604            steps: vec![],
605        }
606    }
607
608    #[test]
609    fn assign_scenario_weighted() {
610        // Weights [3, 1] → total 4. With 8 VUs the pattern repeats twice:
611        // slot 0,1,2 → scenario 0; slot 3 → scenario 1.
612        let scenarios = vec![make_scenario("A", 3), make_scenario("B", 1)];
613
614        let assignments: Vec<usize> = (0..8).map(|i| assign_scenario(i, &scenarios)).collect();
615
616        // First cycle (VUs 0–3)
617        assert_eq!(assignments[0], 0);
618        assert_eq!(assignments[1], 0);
619        assert_eq!(assignments[2], 0);
620        assert_eq!(assignments[3], 1);
621        // Second cycle (VUs 4–7) — identical pattern
622        assert_eq!(assignments[4], 0);
623        assert_eq!(assignments[5], 0);
624        assert_eq!(assignments[6], 0);
625        assert_eq!(assignments[7], 1);
626    }
627
628    #[test]
629    fn assign_scenario_single() {
630        let scenarios = vec![make_scenario("only", 5)];
631        for i in 0..10 {
632            assert_eq!(assign_scenario(i, &scenarios), 0);
633        }
634    }
635
636    #[test]
637    fn assign_scenario_equal_weights() {
638        let scenarios = vec![make_scenario("A", 1), make_scenario("B", 1)];
639        // Alternating: 0, 1, 0, 1, ...
640        for i in 0..8 {
641            assert_eq!(assign_scenario(i, &scenarios), i % 2);
642        }
643    }
644
645    #[test]
646    fn assign_scenario_u32_max_weights_does_not_panic() {
647        // Regression for VULN-002: with the old u32 accumulator, summing two
648        // `u32::MAX` weights would wrap to 0xFFFF_FFFE in release (or panic in
649        // debug) and `% 0` could crash. The u64 accumulator handles it safely;
650        // parse_config's cap prevents this shape of input in practice, but the
651        // function itself is public and callable by library users bypassing
652        // parse_config.
653        let scenarios = vec![make_scenario("A", u32::MAX), make_scenario("B", u32::MAX)];
654        // Should not panic. Any index in [0, 2) is acceptable — we just need
655        // the function to terminate cleanly.
656        let assignment = assign_scenario(0, &scenarios);
657        assert!(assignment < 2);
658        let assignment = assign_scenario(usize::MAX, &scenarios);
659        assert!(assignment < 2);
660    }
661
662    #[test]
663    fn assign_scenario_zero_weights_falls_back_to_round_robin() {
664        // Pathological input: every scenario has weight 0 (not reachable via
665        // parse_config, but possible via programmatic ResolvedScenario
666        // construction). Must not divide-by-zero.
667        let scenarios = vec![make_scenario("A", 0), make_scenario("B", 0)];
668        assert_eq!(assign_scenario(0, &scenarios), 0);
669        assert_eq!(assign_scenario(1, &scenarios), 1);
670        assert_eq!(assign_scenario(2, &scenarios), 0);
671        assert_eq!(assign_scenario(3, &scenarios), 1);
672    }
673}