1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Instant;
4
5use tokio::sync::mpsc;
6use tokio::task::JoinHandle;
7use tokio_util::sync::CancellationToken;
8
9use crate::capture::{
10 CaptureDefinition, CaptureState, inject_captures, inject_captures_into_headers, value_to_string,
11};
12use crate::execution::{OnStepFailure, RpsLimiter};
13use crate::http::{Request, RequestConfig, RequestRecord};
14use crate::request_template::Template;
15use crate::response_template::extractor::resolve_path;
16use crate::response_template::field::TrackedField;
17
18pub struct StepExec {
22 pub step_name: Arc<str>,
23 pub request_config: Arc<RequestConfig>,
24 pub plain_headers: Arc<Vec<(String, String)>>,
26 pub request_template: Option<Arc<Template>>,
27 pub response_template: Option<Arc<Vec<TrackedField>>>,
29 pub captures: Vec<CaptureDefinition>,
31 pub inline_body: Option<Arc<str>>,
33 pub has_capture_headers: bool,
35}
36
37pub struct ScenarioVu {
46 pub scenario_name: Arc<str>,
47 pub steps: Vec<StepExec>,
48 pub on_step_failure: OnStepFailure,
49 pub cancellation_token: CancellationToken,
50 pub result_tx: mpsc::UnboundedSender<RequestRecord>,
51 pub budget: Option<Arc<AtomicUsize>>,
56 pub rate_limiter: Option<Arc<RpsLimiter>>,
60}
61
62impl ScenarioVu {
63 fn claim_budget(&self) -> bool {
71 match &self.budget {
72 None => true,
73 Some(b) => b
74 .fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
75 if n > 0 { Some(n - 1) } else { None }
76 })
77 .is_ok(),
78 }
79 }
80
81 fn emit_skipped(&self, step: &StepExec) {
83 let _ = self.result_tx.send(RequestRecord {
84 duration: std::time::Duration::ZERO,
85 completed_at: Instant::now(),
86 success: false,
87 status_code: None,
88 extraction: None,
89 scenario: Some(Arc::clone(&self.scenario_name)),
90 step: Some(Arc::clone(&step.step_name)),
91 skipped: true,
92 });
93 }
94
95 pub fn spawn(self) -> JoinHandle<()> {
101 tokio::spawn(async move {
102 let mut captures = CaptureState::new();
103
104 loop {
105 if self.cancellation_token.is_cancelled() {
108 break;
109 }
110
111 if !self.claim_budget() {
113 break;
114 }
115
116 captures.clear();
118
119 let mut abort_remaining = false;
121 for (step_idx, step) in self.steps.iter().enumerate() {
122 if self.cancellation_token.is_cancelled() {
123 return;
124 }
125
126 if abort_remaining {
128 self.emit_skipped(step);
129 continue;
130 }
131
132 let mut body_string: Option<String> = match step
134 .request_template
135 .as_ref()
136 .map(|t| t.generate_one())
137 {
138 None => step.inline_body.as_ref().map(|b| b.to_string()),
139 Some(Ok(s)) => Some(s),
140 Some(Err(e)) => {
141 eprintln!(
142 "error: template serialization failed in {}/{}, aborting iteration: {e}",
143 self.scenario_name, step.step_name
144 );
145 for remaining in &self.steps[step_idx..] {
146 self.emit_skipped(remaining);
147 }
148 break;
149 }
150 };
151
152 if let Some(ref body) = body_string {
154 match inject_captures(body, &captures) {
155 Ok(injected) => body_string = Some(injected),
156 Err(e) => {
157 eprintln!(
158 "warning: capture injection into body failed in {}/{}, aborting iteration: {e}",
159 self.scenario_name, step.step_name
160 );
161 for remaining in &self.steps[step_idx..] {
164 self.emit_skipped(remaining);
165 }
166 break;
167 }
168 }
169 }
170
171 let headers = if step.has_capture_headers {
173 match inject_captures_into_headers(&step.plain_headers, &captures) {
174 Ok(injected) => {
175 if injected.is_empty() {
176 None
177 } else {
178 Some(Arc::new(injected))
179 }
180 }
181 Err(e) => {
182 eprintln!(
183 "warning: capture injection into headers failed in {}/{}, aborting iteration: {e}",
184 self.scenario_name, step.step_name
185 );
186 for remaining in &self.steps[step_idx..] {
187 self.emit_skipped(remaining);
188 }
189 break;
190 }
191 }
192 } else if step.plain_headers.is_empty() {
193 None
194 } else {
195 Some(Arc::clone(&step.plain_headers))
196 };
197
198 if let Some(ref rl) = self.rate_limiter {
202 tokio::select! {
203 _ = self.cancellation_token.cancelled() => return,
204 _ = rl.acquire() => {}
205 }
206 }
207
208 let resolved = step.request_config.resolve_body(body_string);
211
212 let client = step.request_config.client.clone();
213 let url = Arc::clone(&step.request_config.host);
214 let method = step.request_config.method;
215 let tracked_fields = step.response_template.clone();
216 let needs_response_body = tracked_fields.is_some() || !step.captures.is_empty();
217
218 let result_fut = async {
219 let mut req = Request::new(client, url, method);
220 if let Some((content, content_type)) = resolved {
221 req = req.body(content, content_type);
222 }
223 if needs_response_body {
224 req = req.read_response();
225 }
226 if let Some(h) = headers {
227 req = req.headers(h);
228 }
229 req.execute().await
230 };
231
232 tokio::select! {
233 _ = self.cancellation_token.cancelled() => return,
234 result = result_fut => {
235 let parsed_body: Option<serde_json::Value> =
237 result.response_body.as_deref().and_then(|s| {
238 serde_json::from_str(s).ok()
239 });
240
241 let extraction = if let Some(ref fields) = tracked_fields {
243 parsed_body
244 .as_ref()
245 .map(|val| crate::response_template::extractor::extract(val, fields))
246 } else {
247 None
248 };
249
250 if !step.captures.is_empty()
252 && let Some(ref body_val) = parsed_body {
253 for cap in &step.captures {
254 if let Some(matched) = resolve_path(body_val, &cap.path)
255 && let Some(s) = value_to_string(matched) {
256 captures.insert(cap.alias.clone(), s);
257 }
258 }
259 }
260
261 let step_failed = !result.success;
262
263 let record = RequestRecord {
265 duration: result.duration,
266 completed_at: result.completed_at,
267 success: result.success,
268 status_code: result.status_code,
269 extraction,
270 scenario: Some(Arc::clone(&self.scenario_name)),
271 step: Some(Arc::clone(&step.step_name)),
272 skipped: false,
273 };
274
275 if self.result_tx.send(record).is_err() {
276 return;
277 }
278
279 if step_failed
281 && matches!(self.on_step_failure, OnStepFailure::AbortIteration)
282 {
283 abort_remaining = true;
284 }
285 }
286 }
287 }
288 }
289 })
290 }
291}
292
293#[cfg(test)]
296mod tests {
297 use super::*;
298 use std::sync::atomic::AtomicUsize;
299
300 #[test]
303 fn struct_shape_step_exec() {
304 use crate::command::HttpMethod;
305 use crate::http::RequestConfig;
306
307 let config = Arc::new(RequestConfig {
308 client: reqwest::Client::new(),
309 host: Arc::new("http://localhost".to_string()),
310 method: HttpMethod::Get,
311 body: Arc::new(None),
312 tracked_fields: None,
313 headers: Arc::new(vec![]),
314 });
315
316 let step = StepExec {
317 step_name: Arc::from("login"),
318 request_config: Arc::clone(&config),
319 plain_headers: Arc::new(vec![]),
320 request_template: None,
321 response_template: None,
322 captures: vec![],
323 inline_body: None,
324 has_capture_headers: false,
325 };
326
327 assert_eq!(&*step.step_name, "login");
328 assert!(step.request_template.is_none());
329 assert!(step.response_template.is_none());
330 }
331
332 #[test]
335 fn struct_shape_scenario_vu() {
336 use crate::command::HttpMethod;
337 use crate::http::RequestConfig;
338
339 let config = Arc::new(RequestConfig {
340 client: reqwest::Client::new(),
341 host: Arc::new("http://localhost".to_string()),
342 method: HttpMethod::Get,
343 body: Arc::new(None),
344 tracked_fields: None,
345 headers: Arc::new(vec![]),
346 });
347
348 let (tx, _rx) = mpsc::unbounded_channel();
349
350 let vu = ScenarioVu {
351 scenario_name: Arc::from("checkout"),
352 steps: vec![StepExec {
353 step_name: Arc::from("add_to_cart"),
354 request_config: Arc::clone(&config),
355 plain_headers: Arc::new(vec![]),
356 request_template: None,
357 response_template: None,
358 captures: vec![],
359 inline_body: None,
360 has_capture_headers: false,
361 }],
362 on_step_failure: OnStepFailure::Continue,
363 cancellation_token: CancellationToken::new(),
364 result_tx: tx,
365 budget: None,
366 rate_limiter: None,
367 };
368
369 assert_eq!(&*vu.scenario_name, "checkout");
370 assert_eq!(vu.steps.len(), 1);
371 assert!(vu.budget.is_none());
372 }
373
374 #[test]
377 fn struct_shape_scenario_vu_with_budget() {
378 let (tx, _rx) = mpsc::unbounded_channel();
379 let budget = Arc::new(AtomicUsize::new(50));
380
381 let vu = ScenarioVu {
382 scenario_name: Arc::from("checkout"),
383 steps: vec![],
384 on_step_failure: OnStepFailure::AbortIteration,
385 cancellation_token: CancellationToken::new(),
386 result_tx: tx,
387 budget: Some(Arc::clone(&budget)),
388 rate_limiter: None,
389 };
390
391 assert_eq!(vu.budget.unwrap().load(Ordering::Relaxed), 50);
392 }
393
394 #[test]
397 fn budget_claim_exhausts_correctly() {
398 let budget = Arc::new(AtomicUsize::new(2));
399
400 let first = budget.fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
402 if n > 0 { Some(n - 1) } else { None }
403 });
404 assert!(first.is_ok());
405 assert_eq!(budget.load(Ordering::Relaxed), 1);
406
407 let second = budget.fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
409 if n > 0 { Some(n - 1) } else { None }
410 });
411 assert!(second.is_ok());
412 assert_eq!(budget.load(Ordering::Relaxed), 0);
413
414 let third = budget.fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
416 if n > 0 { Some(n - 1) } else { None }
417 });
418 assert!(third.is_err());
419 assert_eq!(budget.load(Ordering::Relaxed), 0);
420 }
421}