Skip to main content

lmn_core/execution/fixed/
mod.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3
4use tokio::sync::mpsc;
5use tokio_util::sync::CancellationToken;
6use tracing::Instrument;
7use tracing::info_span;
8
9use crate::execution::{
10    DrainMetricsAccumulator, ResolvedScenario, RpsLimiter, ScenarioStats, assign_scenario,
11};
12use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
13use crate::http::{RequestConfig, RequestRecord};
14use crate::monitoring::SpanName;
15use crate::request_template::Template;
16use crate::response_template::stats::ResponseStats;
17use crate::vu::Vu;
18use crate::vu::scenario::{ScenarioVu, StepExec};
19
20// ── FixedExecutorParams ───────────────────────────────────────────────────────
21
22/// Parameters for constructing a `FixedExecutor`.
23pub struct FixedExecutorParams {
24    pub request_config: Arc<RequestConfig>,
25    pub template: Option<Arc<Template>>,
26    pub total: usize,
27    pub concurrency: usize,
28    /// Optional upper bound on aggregate requests-per-second across all VUs.
29    /// `None` means no rate limit. Values that overflow `u32` or equal zero
30    /// are treated as unset.
31    pub rps: Option<usize>,
32    pub cancellation_token: CancellationToken,
33    /// When present, the executor spawns `ScenarioVu` instances instead of
34    /// plain `Vu` instances. Each VU is assigned a scenario via weighted
35    /// round-robin. Budget counts iterations (one per full scenario loop).
36    pub scenarios: Option<Vec<ResolvedScenario>>,
37}
38
39// ── FixedExecutionResult ──────────────────────────────────────────────────────
40
41/// Result returned by `FixedExecutor::execute`.
42pub struct FixedExecutionResult {
43    pub latency: LatencyHistogram,
44    pub status_codes: StatusCodeHistogram,
45    pub total_requests: u64,
46    pub total_failures: u64,
47    pub total_skipped: u64,
48    pub response_stats: Option<ResponseStats>,
49    pub scenario_stats: Option<Vec<ScenarioStats>>,
50}
51
52// ── FixedExecutor ─────────────────────────────────────────────────────────────
53
54/// Executes a fixed-count load test using a worker-pool model: spawns exactly
55/// `concurrency` long-lived VU tasks that share an atomic request budget and
56/// self-terminate when the budget is exhausted.
57pub struct FixedExecutor {
58    params: FixedExecutorParams,
59}
60
61impl FixedExecutor {
62    pub fn new(params: FixedExecutorParams) -> Self {
63        Self { params }
64    }
65
66    /// Runs the fixed load test. Spawns `concurrency` VU tasks sharing a budget
67    /// of `total` requests. Returns a `FixedExecutionResult` when all requests
68    /// complete or a cancellation signal is received.
69    pub async fn execute(self) -> Result<FixedExecutionResult, crate::execution::RunError> {
70        let FixedExecutorParams {
71            request_config,
72            template,
73            total,
74            concurrency,
75            rps,
76            cancellation_token,
77            scenarios,
78        } = self.params;
79
80        // Pre-convert headers once before spawning VUs to avoid per-request allocation.
81        let plain_headers: Arc<Vec<(String, String)>> = Arc::new(
82            request_config
83                .headers
84                .iter()
85                .map(|(k, v)| (k.clone(), v.to_string()))
86                .collect(),
87        );
88
89        // `has_tracked_fields` is true when the single-request path uses a
90        // response template, OR when any step across any scenario does.
91        let has_tracked_fields = if let Some(ref sc) = scenarios {
92            sc.iter()
93                .flat_map(|s| s.steps.iter())
94                .any(|step| step.response_template.is_some())
95        } else {
96            request_config.tracked_fields.is_some()
97        };
98
99        // Build the shared RPS limiter up front so every VU receives the same
100        // `Arc` clone. `None` means no rate limit configured.
101        let rate_limiter = rps.and_then(RpsLimiter::new);
102
103        async {
104            let budget = Arc::new(AtomicUsize::new(total));
105            let (tx, rx) = mpsc::unbounded_channel::<RequestRecord>();
106
107            // Spawn a dedicated drain task that owns the receiver and all
108            // accumulator state. It returns a `FixedExecutionResult` once the
109            // channel closes (all VU senders dropped).
110            let drain_handle = tokio::spawn(async move {
111                let mut rx = rx;
112                let mut acc = DrainMetricsAccumulator::new(has_tracked_fields);
113
114                while let Some(record) = rx.recv().await {
115                    acc.record_request(&record);
116                    acc.record_extraction(record.extraction);
117                }
118                let scenario_stats = acc.finalize_scenario_stats();
119
120                FixedExecutionResult {
121                    latency: acc.latency,
122                    status_codes: acc.status_codes,
123                    total_requests: acc.total_requests,
124                    total_failures: acc.total_failures,
125                    total_skipped: acc.total_skipped,
126                    response_stats: acc.response_stats,
127                    scenario_stats,
128                }
129            });
130
131            let vu_handles: Vec<_> = if let Some(ref scenarios) = scenarios {
132                // Scenario mode: spawn `ScenarioVu` instances. Each VU is
133                // assigned a scenario via weighted round-robin and claims one
134                // budget unit per full iteration (not per step).
135                (0..concurrency)
136                    .map(|vu_idx| {
137                        let scenario = &scenarios[assign_scenario(vu_idx, scenarios)];
138                        let steps = scenario
139                            .steps
140                            .iter()
141                            .map(|step| StepExec {
142                                step_name: Arc::clone(&step.name),
143                                request_config: Arc::clone(&step.request_config),
144                                plain_headers: Arc::clone(&step.plain_headers),
145                                request_template: step.request_template.as_ref().map(Arc::clone),
146                                response_template: step.response_template.as_ref().map(Arc::clone),
147                                captures: step.captures.clone(),
148                                inline_body: step.inline_body.clone(),
149                                has_capture_headers: step.has_capture_headers,
150                            })
151                            .collect();
152                        ScenarioVu {
153                            scenario_name: Arc::clone(&scenario.name),
154                            steps,
155                            on_step_failure: scenario.on_step_failure,
156                            cancellation_token: cancellation_token.clone(),
157                            result_tx: tx.clone(),
158                            budget: Some(Arc::clone(&budget)),
159                            rate_limiter: rate_limiter.as_ref().map(Arc::clone),
160                        }
161                        .spawn()
162                    })
163                    .collect()
164            } else {
165                // Single-request mode: spawn plain `Vu` instances sharing the budget.
166                (0..concurrency)
167                    .map(|_| {
168                        Vu {
169                            request_config: Arc::clone(&request_config),
170                            plain_headers: Arc::clone(&plain_headers),
171                            template: template.as_ref().map(Arc::clone),
172                            scenario_label: None,
173                            step_label: None,
174                            cancellation_token: cancellation_token.clone(),
175                            result_tx: tx.clone(),
176                            budget: Some(Arc::clone(&budget)),
177                            rate_limiter: rate_limiter.as_ref().map(Arc::clone),
178                        }
179                        .spawn()
180                    })
181                    .collect()
182            };
183
184            // Drop the coordinator's sender so the channel closes once all VU
185            // senders are also dropped (they are, once each VU task exits).
186            drop(tx);
187
188            // Await all VU tasks to ensure they have finished sending.
189            for handle in vu_handles {
190                let _ = handle.await;
191            }
192
193            // All VU senders are now dropped — channel is closed. Await the
194            // drain task to get the accumulated result.
195            Ok(drain_handle.await?)
196        }
197        .instrument(info_span!(SpanName::REQUESTS, total))
198        .await
199    }
200}
201
202// ── Tests ─────────────────────────────────────────────────────────────────────
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::HttpMethod;

    /// Builds the minimal GET configuration shared by the params/executor
    /// tests: localhost host, no body, no tracked fields, no headers.
    fn localhost_get_config() -> Arc<RequestConfig> {
        Arc::new(RequestConfig {
            client: reqwest::Client::new(),
            host: Arc::new("http://localhost".to_string()),
            method: HttpMethod::Get,
            body: Arc::new(None),
            tracked_fields: None,
            headers: Arc::new(vec![]),
        })
    }

    /// `FixedExecutionResult` can be built field-by-field and read back.
    #[test]
    fn struct_shape_fixed_execution_result() {
        let outcome = FixedExecutionResult {
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 10,
            total_failures: 1,
            total_skipped: 0,
            response_stats: None,
            scenario_stats: None,
        };

        assert_eq!(outcome.total_requests, 10);
        assert_eq!(outcome.total_failures, 1);
        assert!(outcome.latency.is_empty());
        assert!(outcome.response_stats.is_none());
        assert!(outcome.scenario_stats.is_none());
    }

    /// `FixedExecutorParams` can be built field-by-field and read back.
    #[test]
    fn struct_shape_fixed_executor_params() {
        let config = localhost_get_config();

        let built = FixedExecutorParams {
            request_config: Arc::clone(&config),
            template: None,
            total: 5,
            concurrency: 2,
            rps: None,
            cancellation_token: CancellationToken::new(),
            scenarios: None,
        };

        assert_eq!(built.total, 5);
        assert_eq!(built.concurrency, 2);
        assert!(built.template.is_none());
        assert!(built.scenarios.is_none());
        assert!(built.rps.is_none());
    }

    /// `FixedExecutor::new` keeps the supplied parameters unchanged.
    #[test]
    fn fixed_executor_new_stores_params() {
        let runner = FixedExecutor::new(FixedExecutorParams {
            request_config: localhost_get_config(),
            template: None,
            total: 1,
            concurrency: 1,
            rps: Some(50),
            cancellation_token: CancellationToken::new(),
            scenarios: None,
        });

        assert_eq!(runner.params.total, 1);
        assert_eq!(runner.params.concurrency, 1);
        assert_eq!(runner.params.rps, Some(50));
    }
}