Skip to main content

// lmn_core/vu/scenario.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Instant;
4
5use tokio::sync::mpsc;
6use tokio::task::JoinHandle;
7use tokio_util::sync::CancellationToken;
8
9use crate::capture::{
10    CaptureDefinition, CaptureState, inject_captures, inject_captures_into_headers, value_to_string,
11};
12use crate::execution::{OnStepFailure, RpsLimiter};
13use crate::http::{Request, RequestConfig, RequestRecord};
14use crate::request_template::Template;
15use crate::response_template::extractor::resolve_path;
16use crate::response_template::field::TrackedField;
17
18// ── StepExec ──────────────────────────────────────────────────────────────────
19
20/// All execution-ready data for a single step within a scenario.
21pub struct StepExec {
22    pub step_name: Arc<str>,
23    pub request_config: Arc<RequestConfig>,
24    /// Pre-converted header pairs — avoids per-request allocation.
25    pub plain_headers: Arc<Vec<(String, String)>>,
26    pub request_template: Option<Arc<Template>>,
27    /// Tracked fields parsed from the response template, if present.
28    pub response_template: Option<Arc<Vec<TrackedField>>>,
29    /// Capture definitions to extract from the response body.
30    pub captures: Vec<CaptureDefinition>,
31    /// Inline request body (mutually exclusive with `request_template`).
32    pub inline_body: Option<Arc<str>>,
33    /// True if any header value contains `{{capture.` references.
34    pub has_capture_headers: bool,
35}
36
37// ── ScenarioVu ────────────────────────────────────────────────────────────────
38
39/// A virtual user that executes a fixed sequence of steps on each iteration.
40///
41/// Each loop iteration claims a single budget unit (covering all steps), then
42/// executes every step sequentially. Step failures are handled per
43/// `on_step_failure`: either the remaining steps are skipped (`AbortIteration`)
44/// or execution continues (`Continue`).
45pub struct ScenarioVu {
46    pub scenario_name: Arc<str>,
47    pub steps: Vec<StepExec>,
48    pub on_step_failure: OnStepFailure,
49    pub cancellation_token: CancellationToken,
50    pub result_tx: mpsc::UnboundedSender<RequestRecord>,
51    /// Optional request budget shared across all VUs in fixed-count mode.
52    ///
53    /// One unit is claimed per full iteration (not per step). `None` means the
54    /// VU runs until the cancellation token fires (curve mode).
55    pub budget: Option<Arc<AtomicUsize>>,
56    /// Optional shared RPS limiter. When present, one permit is awaited per
57    /// HTTP step (not per iteration), so the cap reflects requests-per-second
58    /// regardless of how many steps a scenario has.
59    pub rate_limiter: Option<Arc<RpsLimiter>>,
60}
61
62impl ScenarioVu {
63    /// Attempts to claim one iteration unit from the shared budget.
64    ///
65    /// Returns `true` if the VU should proceed with the next iteration, `false`
66    /// if the budget is exhausted and the VU should stop.
67    ///
68    /// Uses `fetch_update` instead of `fetch_sub` to prevent underflow: the
69    /// decrement is only committed when the value is still `> 0`.
70    fn claim_budget(&self) -> bool {
71        match &self.budget {
72            None => true,
73            Some(b) => b
74                .fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
75                    if n > 0 { Some(n - 1) } else { None }
76                })
77                .is_ok(),
78        }
79    }
80
81    /// Emits a skipped `RequestRecord` for the given step.
82    fn emit_skipped(&self, step: &StepExec) {
83        let _ = self.result_tx.send(RequestRecord {
84            duration: std::time::Duration::ZERO,
85            completed_at: Instant::now(),
86            success: false,
87            status_code: None,
88            extraction: None,
89            scenario: Some(Arc::clone(&self.scenario_name)),
90            step: Some(Arc::clone(&step.step_name)),
91            skipped: true,
92        });
93    }
94
95    /// Consumes the `ScenarioVu` and spawns it as a Tokio task.
96    ///
97    /// Returns a `JoinHandle` that resolves when the VU exits — either because
98    /// the budget is exhausted, the cancellation token fires, or the result
99    /// channel is closed.
100    pub fn spawn(self) -> JoinHandle<()> {
101        tokio::spawn(async move {
102            let mut captures = CaptureState::new();
103
104            loop {
105                // Check cancellation before claiming the budget so we don't
106                // consume a unit we won't use.
107                if self.cancellation_token.is_cancelled() {
108                    break;
109                }
110
111                // Claim one iteration unit. In curve mode this always succeeds.
112                if !self.claim_budget() {
113                    break;
114                }
115
116                // Clear capture state at the start of each iteration.
117                captures.clear();
118
119                // Execute steps sequentially.
120                let mut abort_remaining = false;
121                for (step_idx, step) in self.steps.iter().enumerate() {
122                    if self.cancellation_token.is_cancelled() {
123                        return;
124                    }
125
126                    // If a previous step triggered abort, emit skipped for remaining steps.
127                    if abort_remaining {
128                        self.emit_skipped(step);
129                        continue;
130                    }
131
132                    // 1. Generate body (from request_template, inline_body, or none).
133                    let mut body_string: Option<String> = match step
134                        .request_template
135                        .as_ref()
136                        .map(|t| t.generate_one())
137                    {
138                        None => step.inline_body.as_ref().map(|b| b.to_string()),
139                        Some(Ok(s)) => Some(s),
140                        Some(Err(e)) => {
141                            eprintln!(
142                                "error: template serialization failed in {}/{}, aborting iteration: {e}",
143                                self.scenario_name, step.step_name
144                            );
145                            for remaining in &self.steps[step_idx..] {
146                                self.emit_skipped(remaining);
147                            }
148                            break;
149                        }
150                    };
151
152                    // 2. Inject {{capture.KEY}} into body.
153                    if let Some(ref body) = body_string {
154                        match inject_captures(body, &captures) {
155                            Ok(injected) => body_string = Some(injected),
156                            Err(e) => {
157                                eprintln!(
158                                    "warning: capture injection into body failed in {}/{}, aborting iteration: {e}",
159                                    self.scenario_name, step.step_name
160                                );
161                                // Emit skipped for this step and all remaining, then
162                                // break out of the step loop (no abort_remaining needed).
163                                for remaining in &self.steps[step_idx..] {
164                                    self.emit_skipped(remaining);
165                                }
166                                break;
167                            }
168                        }
169                    }
170
171                    // 3. Inject {{capture.KEY}} into headers (if needed).
172                    let headers = if step.has_capture_headers {
173                        match inject_captures_into_headers(&step.plain_headers, &captures) {
174                            Ok(injected) => {
175                                if injected.is_empty() {
176                                    None
177                                } else {
178                                    Some(Arc::new(injected))
179                                }
180                            }
181                            Err(e) => {
182                                eprintln!(
183                                    "warning: capture injection into headers failed in {}/{}, aborting iteration: {e}",
184                                    self.scenario_name, step.step_name
185                                );
186                                for remaining in &self.steps[step_idx..] {
187                                    self.emit_skipped(remaining);
188                                }
189                                break;
190                            }
191                        }
192                    } else if step.plain_headers.is_empty() {
193                        None
194                    } else {
195                        Some(Arc::clone(&step.plain_headers))
196                    };
197
198                    // Wait for an RPS permit if a limiter is configured.
199                    // Awaited per-step so the cap is requests-per-second, not
200                    // iterations-per-second.
201                    if let Some(ref rl) = self.rate_limiter {
202                        tokio::select! {
203                            _ = self.cancellation_token.cancelled() => return,
204                            _ = rl.acquire() => {}
205                        }
206                    }
207
208                    // 4. Build and execute the HTTP request.
209                    // Resolve body through request_config (handles Body::Formatted).
210                    let resolved = step.request_config.resolve_body(body_string);
211
212                    let client = step.request_config.client.clone();
213                    let url = Arc::clone(&step.request_config.host);
214                    let method = step.request_config.method;
215                    let tracked_fields = step.response_template.clone();
216                    let needs_response_body = tracked_fields.is_some() || !step.captures.is_empty();
217
218                    let result_fut = async {
219                        let mut req = Request::new(client, url, method);
220                        if let Some((content, content_type)) = resolved {
221                            req = req.body(content, content_type);
222                        }
223                        if needs_response_body {
224                            req = req.read_response();
225                        }
226                        if let Some(h) = headers {
227                            req = req.headers(h);
228                        }
229                        req.execute().await
230                    };
231
232                    tokio::select! {
233                        _ = self.cancellation_token.cancelled() => return,
234                        result = result_fut => {
235                            // Parse response body once — shared by extraction and capture.
236                            let parsed_body: Option<serde_json::Value> =
237                                result.response_body.as_deref().and_then(|s| {
238                                    serde_json::from_str(s).ok()
239                                });
240
241                            // 5. Extract response_template fields (existing, for stats).
242                            let extraction = if let Some(ref fields) = tracked_fields {
243                                parsed_body
244                                    .as_ref()
245                                    .map(|val| crate::response_template::extractor::extract(val, fields))
246                            } else {
247                                None
248                            };
249
250                            // 6. Extract captures from response JSON.
251                            if !step.captures.is_empty()
252                                && let Some(ref body_val) = parsed_body {
253                                    for cap in &step.captures {
254                                        if let Some(matched) = resolve_path(body_val, &cap.path)
255                                            && let Some(s) = value_to_string(matched) {
256                                                captures.insert(cap.alias.clone(), s);
257                                            }
258                                    }
259                                }
260
261                            let step_failed = !result.success;
262
263                            // 7. Send RequestRecord through channel.
264                            let record = RequestRecord {
265                                duration: result.duration,
266                                completed_at: result.completed_at,
267                                success: result.success,
268                                status_code: result.status_code,
269                                extraction,
270                                scenario: Some(Arc::clone(&self.scenario_name)),
271                                step: Some(Arc::clone(&step.step_name)),
272                                skipped: false,
273                            };
274
275                            if self.result_tx.send(record).is_err() {
276                                return;
277                            }
278
279                            // 8. If step failed + on_step_failure == AbortIteration → skip remaining.
280                            if step_failed
281                                && matches!(self.on_step_failure, OnStepFailure::AbortIteration)
282                            {
283                                abort_remaining = true;
284                            }
285                        }
286                    }
287                }
288            }
289        })
290    }
291}
292
293// ── Tests ─────────────────────────────────────────────────────────────────────
294
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    use crate::command::HttpMethod;
    use crate::http::RequestConfig;

    /// Builds a minimal GET configuration pointing at localhost.
    fn localhost_config() -> Arc<RequestConfig> {
        Arc::new(RequestConfig {
            client: reqwest::Client::new(),
            host: Arc::new("http://localhost".to_string()),
            method: HttpMethod::Get,
            body: Arc::new(None),
            tracked_fields: None,
            headers: Arc::new(vec![]),
        })
    }

    /// Builds a step with no headers, templates, captures or body.
    fn bare_step(name: &str, config: &Arc<RequestConfig>) -> StepExec {
        StepExec {
            step_name: Arc::from(name),
            request_config: Arc::clone(config),
            plain_headers: Arc::new(vec![]),
            request_template: None,
            response_template: None,
            captures: vec![],
            inline_body: None,
            has_capture_headers: false,
        }
    }

    /// Builds a step-less VU with the given budget; the receiver is dropped,
    /// which is fine because these tests never send records.
    fn stepless_vu(budget: Option<Arc<AtomicUsize>>) -> ScenarioVu {
        let (tx, _rx) = mpsc::unbounded_channel();
        ScenarioVu {
            scenario_name: Arc::from("checkout"),
            steps: vec![],
            on_step_failure: OnStepFailure::AbortIteration,
            cancellation_token: CancellationToken::new(),
            result_tx: tx,
            budget,
            rate_limiter: None,
        }
    }

    // ── struct_shape_step_exec ────────────────────────────────────────────────

    #[test]
    fn struct_shape_step_exec() {
        let config = localhost_config();
        let step = bare_step("login", &config);

        assert_eq!(&*step.step_name, "login");
        assert!(step.request_template.is_none());
        assert!(step.response_template.is_none());
    }

    // ── struct_shape_scenario_vu ──────────────────────────────────────────────

    #[test]
    fn struct_shape_scenario_vu() {
        let config = localhost_config();
        let (tx, _rx) = mpsc::unbounded_channel();

        let vu = ScenarioVu {
            scenario_name: Arc::from("checkout"),
            steps: vec![bare_step("add_to_cart", &config)],
            on_step_failure: OnStepFailure::Continue,
            cancellation_token: CancellationToken::new(),
            result_tx: tx,
            budget: None,
            rate_limiter: None,
        };

        assert_eq!(&*vu.scenario_name, "checkout");
        assert_eq!(vu.steps.len(), 1);
        assert!(vu.budget.is_none());
    }

    // ── struct_shape_scenario_vu_with_budget ──────────────────────────────────

    #[test]
    fn struct_shape_scenario_vu_with_budget() {
        let budget = Arc::new(AtomicUsize::new(50));
        let vu = stepless_vu(Some(Arc::clone(&budget)));

        assert_eq!(vu.budget.unwrap().load(Ordering::Relaxed), 50);
    }

    // ── budget_claim_exhausts_correctly ──────────────────────────────────────

    /// Drives the real `claim_budget` method so a regression in it is caught,
    /// rather than re-implementing its closure inside the test.
    #[test]
    fn budget_claim_exhausts_correctly() {
        let budget = Arc::new(AtomicUsize::new(2));
        let vu = stepless_vu(Some(Arc::clone(&budget)));

        // First claim
        assert!(vu.claim_budget());
        assert_eq!(budget.load(Ordering::Relaxed), 1);

        // Second claim
        assert!(vu.claim_budget());
        assert_eq!(budget.load(Ordering::Relaxed), 0);

        // Third claim — exhausted, no underflow
        assert!(!vu.claim_budget());
        assert_eq!(budget.load(Ordering::Relaxed), 0);
    }

    // ── budget_claim_without_budget_always_succeeds ──────────────────────────

    #[test]
    fn budget_claim_without_budget_always_succeeds() {
        let vu = stepless_vu(None);

        for _ in 0..3 {
            assert!(vu.claim_budget());
        }
    }
}