1use std::sync::Arc;
2use std::time::Instant;
3
4use tokio_util::sync::CancellationToken;
5
6use crate::command::Command;
7use crate::execution::curve::{CurveExecutor, CurveExecutorParams};
8use crate::execution::fixed::{FixedExecutor, FixedExecutorParams};
9use crate::execution::{
10 CurveStats, ExecutionMode, RequestSpec, ResolvedScenario, RunMode, RunStats,
11 build_request_config, resolve_tracked_fields,
12};
13use crate::load_curve::LoadCurve;
14use crate::request_template::Template;
15
16pub struct RunCommand {
19 pub request: RequestSpec,
20 pub execution: ExecutionMode,
21}
22
23impl Command for RunCommand {
24 async fn execute(self) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
25 match self.execution {
26 ExecutionMode::Fixed {
27 request_count,
28 concurrency,
29 rps,
30 } => execute_fixed(self.request, request_count, concurrency, rps).await,
31 ExecutionMode::Curve { curve, rps } => execute_curve(self.request, curve, rps).await,
32 }
33 }
34}
35
36async fn execute_fixed(
40 request_spec: RequestSpec,
41 total: usize,
42 concurrency: usize,
43 rps: Option<usize>,
44) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
45 match request_spec {
46 RequestSpec::Single {
47 host,
48 method,
49 body,
50 template_path,
51 response_template_path,
52 headers,
53 } => {
54 let template: Option<Arc<Template>> = template_path
56 .map(|path| Template::parse(&path).map(Arc::new))
57 .transpose()
58 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
59
60 let tracked_fields = resolve_tracked_fields(response_template_path)?;
61 let request_config =
62 build_request_config(host, method, body, tracked_fields, headers, concurrency)?;
63
64 let cancellation_token = CancellationToken::new();
65 let cancel = cancellation_token.clone();
66 tokio::spawn(async move {
67 match tokio::signal::ctrl_c().await {
68 Ok(()) => {
69 eprintln!(
70 "\nShutdown signal received — waiting for in-flight requests to finish..."
71 );
72 cancel.cancel();
73 }
74 Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
75 }
76 });
77
78 let started_at = Instant::now();
79
80 let result = FixedExecutor::new(FixedExecutorParams {
81 request_config: Arc::clone(&request_config),
82 template,
83 total,
84 concurrency,
85 rps,
86 cancellation_token,
87 scenarios: None,
88 })
89 .execute()
90 .await?;
91
92 Ok(Some(RunStats {
93 elapsed: started_at.elapsed(),
94 mode: RunMode::Fixed,
95 latency: result.latency,
96 status_codes: result.status_codes,
97 total_requests: result.total_requests,
98 total_failures: result.total_failures,
99 total_skipped: result.total_skipped,
100 template_stats: None,
101 response_stats: result.response_stats,
102 curve_stats: None,
103 scenario_stats: result.scenario_stats,
104 }))
105 }
106 RequestSpec::Scenarios(scenarios) => {
107 execute_fixed_scenarios(scenarios, total, concurrency, rps).await
108 }
109 }
110}
111
112async fn execute_fixed_scenarios(
114 scenarios: Vec<ResolvedScenario>,
115 total: usize,
116 concurrency: usize,
117 rps: Option<usize>,
118) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
119 use crate::command::HttpMethod;
120
121 let placeholder_config = build_request_config(
125 String::new(),
126 HttpMethod::Get,
127 None,
128 None,
129 vec![],
130 concurrency,
131 )?;
132
133 let cancellation_token = CancellationToken::new();
134 let cancel = cancellation_token.clone();
135 tokio::spawn(async move {
136 match tokio::signal::ctrl_c().await {
137 Ok(()) => {
138 eprintln!(
139 "\nShutdown signal received — waiting for in-flight requests to finish..."
140 );
141 cancel.cancel();
142 }
143 Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
144 }
145 });
146
147 let started_at = Instant::now();
148
149 let result = FixedExecutor::new(FixedExecutorParams {
150 request_config: placeholder_config,
151 template: None,
152 total,
153 concurrency,
154 rps,
155 cancellation_token,
156 scenarios: Some(scenarios),
157 })
158 .execute()
159 .await?;
160
161 Ok(Some(RunStats {
162 elapsed: started_at.elapsed(),
163 mode: RunMode::Fixed,
164 latency: result.latency,
165 status_codes: result.status_codes,
166 total_requests: result.total_requests,
167 total_failures: result.total_failures,
168 total_skipped: result.total_skipped,
169 template_stats: None,
170 response_stats: result.response_stats,
171 curve_stats: None,
172 scenario_stats: result.scenario_stats,
173 }))
174}
175
176async fn execute_curve(
180 request_spec: RequestSpec,
181 curve: LoadCurve,
182 rps: Option<usize>,
183) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
184 match request_spec {
185 RequestSpec::Single {
186 host,
187 method,
188 body,
189 template_path,
190 response_template_path,
191 headers,
192 } => {
193 let curve_duration = curve.total_duration();
194 let curve_stages = curve.stages.clone();
195
196 let template: Option<Arc<Template>> = template_path
198 .map(|path| Template::parse(&path).map(Arc::new))
199 .transpose()
200 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
201
202 let tracked_fields = resolve_tracked_fields(response_template_path)?;
203 let peak_vus = curve
204 .stages
205 .iter()
206 .map(|s| s.target_vus as usize)
207 .max()
208 .unwrap_or(1);
209 let request_config =
210 build_request_config(host, method, body, tracked_fields, headers, peak_vus)?;
211
212 let cancellation_token = CancellationToken::new();
213 let cancel = cancellation_token.clone();
214 tokio::spawn(async move {
215 match tokio::signal::ctrl_c().await {
216 Ok(()) => {
217 eprintln!("\nShutdown signal received — cancelling curve execution...");
218 cancel.cancel();
219 }
220 Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
221 }
222 });
223
224 let started_at = Instant::now();
225
226 let executor = CurveExecutor::new(CurveExecutorParams {
227 curve,
228 request_config: Arc::clone(&request_config),
229 template,
230 cancellation_token,
231 rps,
232 scenarios: None,
233 });
234
235 let curve_result = executor.execute().await?;
236
237 Ok(Some(RunStats {
238 elapsed: started_at.elapsed(),
239 mode: RunMode::Curve,
240 latency: curve_result.latency,
241 status_codes: curve_result.status_codes,
242 total_requests: curve_result.total_requests,
243 total_failures: curve_result.total_failures,
244 total_skipped: curve_result.total_skipped,
245 template_stats: None,
246 response_stats: curve_result.response_stats,
247 curve_stats: Some(CurveStats {
248 duration: curve_duration,
249 stages: curve_stages,
250 stage_stats: curve_result.stage_stats,
251 }),
252 scenario_stats: curve_result.scenario_stats,
253 }))
254 }
255 RequestSpec::Scenarios(scenarios) => execute_curve_scenarios(scenarios, curve, rps).await,
256 }
257}
258
259async fn execute_curve_scenarios(
261 scenarios: Vec<ResolvedScenario>,
262 curve: LoadCurve,
263 rps: Option<usize>,
264) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
265 use crate::command::HttpMethod;
266
267 let curve_duration = curve.total_duration();
268 let curve_stages = curve.stages.clone();
269
270 let peak_vus = curve
271 .stages
272 .iter()
273 .map(|s| s.target_vus as usize)
274 .max()
275 .unwrap_or(1);
276
277 let placeholder_config =
280 build_request_config(String::new(), HttpMethod::Get, None, None, vec![], peak_vus)?;
281
282 let cancellation_token = CancellationToken::new();
283 let cancel = cancellation_token.clone();
284 tokio::spawn(async move {
285 match tokio::signal::ctrl_c().await {
286 Ok(()) => {
287 eprintln!("\nShutdown signal received — cancelling curve execution...");
288 cancel.cancel();
289 }
290 Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
291 }
292 });
293
294 let started_at = Instant::now();
295
296 let executor = CurveExecutor::new(CurveExecutorParams {
297 curve,
298 request_config: placeholder_config,
299 template: None,
300 cancellation_token,
301 rps,
302 scenarios: Some(scenarios),
303 });
304
305 let curve_result = executor.execute().await?;
306
307 Ok(Some(RunStats {
308 elapsed: started_at.elapsed(),
309 mode: RunMode::Curve,
310 latency: curve_result.latency,
311 status_codes: curve_result.status_codes,
312 total_requests: curve_result.total_requests,
313 total_failures: curve_result.total_failures,
314 total_skipped: curve_result.total_skipped,
315 template_stats: None,
316 response_stats: curve_result.response_stats,
317 curve_stats: Some(CurveStats {
318 duration: curve_duration,
319 stages: curve_stages,
320 stage_stats: curve_result.stage_stats,
321 }),
322 scenario_stats: curve_result.scenario_stats,
323 }))
324}
325
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::execution::{CurveStats, RunMode, RunStats, StageStats};
    use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
    use crate::load_curve::{RampType, Stage};

    /// Fixed-mode `RunStats` fixture: empty histograms and no curve data.
    fn fixed_stats() -> RunStats {
        RunStats {
            elapsed: Duration::from_secs(1),
            mode: RunMode::Fixed,
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 10,
            total_failures: 0,
            total_skipped: 0,
            template_stats: None,
            response_stats: None,
            curve_stats: None,
            scenario_stats: None,
        }
    }

    /// Per-stage statistics with empty histograms and zeroed counters.
    fn empty_stage_stats() -> StageStats {
        StageStats {
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 0,
            total_failures: 0,
        }
    }

    /// Curve-mode `RunStats` fixture that stores `stages` together with one
    /// empty `StageStats` entry per stage.
    fn curve_stats_for(stages: Vec<Stage>) -> RunStats {
        // Built before `stages` is moved into `CurveStats`.
        let stage_stats = stages.iter().map(|_| empty_stage_stats()).collect();

        RunStats {
            elapsed: Duration::from_secs(10),
            mode: RunMode::Curve,
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 100,
            total_failures: 2,
            total_skipped: 0,
            template_stats: None,
            response_stats: None,
            curve_stats: Some(CurveStats {
                duration: Duration::from_secs(10),
                stages,
                stage_stats,
            }),
            scenario_stats: None,
        }
    }

    /// Fixed-mode stats carry no curve statistics.
    #[test]
    fn curve_stages_none_for_fixed_mode() {
        let fixed = fixed_stats();
        assert!(
            fixed.curve_stats.is_none(),
            "fixed-mode RunStats must have curve_stats == None"
        );
    }

    /// Curve-mode stats keep the supplied stages, in order, with their fields intact.
    #[test]
    fn curve_stages_some_for_curve_mode() {
        let first = Stage {
            duration: Duration::from_secs(5),
            target_vus: 50,
            ramp: RampType::Linear,
        };
        let second = Stage {
            duration: Duration::from_secs(5),
            target_vus: 100,
            ramp: RampType::Step,
        };
        let stages = vec![first, second];

        let curve = curve_stats_for(stages.clone());
        let stored = curve
            .curve_stats
            .expect("curve_stats must be Some in curve mode")
            .stages;

        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0].target_vus, 50);
        assert_eq!(stored[0].ramp, RampType::Linear);
        assert_eq!(stored[1].target_vus, 100);
        assert_eq!(stored[1].ramp, RampType::Step);
    }

    /// The number of stored stages equals the number of stages supplied.
    #[test]
    fn curve_stages_count_matches_original() {
        let stages: Vec<Stage> = (1..=5)
            .map(|step| Stage {
                duration: Duration::from_secs(10),
                target_vus: step * 20,
                ramp: RampType::Linear,
            })
            .collect();
        let expected = stages.len();

        let curve = curve_stats_for(stages);
        let stored = curve.curve_stats.unwrap().stages;

        assert_eq!(
            stored.len(),
            expected,
            "stored stage count must equal original stage count"
        );
    }

    /// The fixed fixture reports `RunMode::Fixed`.
    #[test]
    fn run_mode_fixed_variant() {
        assert_eq!(fixed_stats().mode, RunMode::Fixed);
    }

    /// The curve fixture reports `RunMode::Curve`.
    #[test]
    fn run_mode_curve_variant() {
        let single_stage = vec![Stage {
            duration: Duration::from_secs(5),
            target_vus: 10,
            ramp: RampType::Linear,
        }];
        assert_eq!(curve_stats_for(single_stage).mode, RunMode::Curve);
    }
}
461}