1use std::sync::Arc;
2use std::time::Instant;
3
4use tokio_util::sync::CancellationToken;
5
6use crate::command::Command;
7use crate::execution::curve::{CurveExecutor, CurveExecutorParams};
8use crate::execution::fixed::{FixedExecutor, FixedExecutorParams};
9use crate::execution::{
10 CurveStats, ExecutionMode, RequestSpec, RunMode, RunStats, build_request_config,
11 resolve_tracked_fields,
12};
13use crate::load_curve::LoadCurve;
14use crate::request_template::Template;
15
16pub struct RunCommand {
19 pub request: RequestSpec,
20 pub execution: ExecutionMode,
21}
22
23impl Command for RunCommand {
24 async fn execute(self) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
25 match self.execution {
26 ExecutionMode::Fixed {
27 request_count,
28 concurrency,
29 } => execute_fixed(self.request, request_count, concurrency).await,
30 ExecutionMode::Curve(curve) => execute_curve(self.request, curve).await,
31 }
32 }
33}
34
35async fn execute_fixed(
39 request_spec: RequestSpec,
40 total: usize,
41 concurrency: usize,
42) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
43 let RequestSpec {
44 host,
45 method,
46 body,
47 template_path,
48 response_template_path,
49 headers,
50 } = request_spec;
51
52 let template: Option<Arc<Template>> = template_path
54 .map(|path| Template::parse(&path).map(Arc::new))
55 .transpose()
56 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
57
58 let tracked_fields = resolve_tracked_fields(response_template_path)?;
59 let request_config =
60 build_request_config(host, method, body, tracked_fields, headers, concurrency)?;
61
62 let cancellation_token = CancellationToken::new();
63 let cancel = cancellation_token.clone();
64 tokio::spawn(async move {
65 match tokio::signal::ctrl_c().await {
66 Ok(()) => {
67 eprintln!(
68 "\nShutdown signal received — waiting for in-flight requests to finish..."
69 );
70 cancel.cancel();
71 }
72 Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
73 }
74 });
75
76 let started_at = Instant::now();
77
78 let result = FixedExecutor::new(FixedExecutorParams {
79 request_config: Arc::clone(&request_config),
80 template,
81 total,
82 concurrency,
83 cancellation_token,
84 })
85 .execute()
86 .await?;
87
88 Ok(Some(RunStats {
89 elapsed: started_at.elapsed(),
90 mode: RunMode::Fixed,
91 latency: result.latency,
92 status_codes: result.status_codes,
93 total_requests: result.total_requests,
94 total_failures: result.total_failures,
95 template_stats: None,
96 response_stats: result.response_stats,
97 curve_stats: None,
98 }))
99}
100
101async fn execute_curve(
105 request_spec: RequestSpec,
106 curve: LoadCurve,
107) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
108 let RequestSpec {
109 host,
110 method,
111 body,
112 template_path,
113 response_template_path,
114 headers,
115 } = request_spec;
116 let curve_duration = curve.total_duration();
117 let curve_stages = curve.stages.clone();
118
119 let template: Option<Arc<Template>> = template_path
121 .map(|path| Template::parse(&path).map(Arc::new))
122 .transpose()
123 .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
124
125 let tracked_fields = resolve_tracked_fields(response_template_path)?;
126 let peak_vus = curve
127 .stages
128 .iter()
129 .map(|s| s.target_vus as usize)
130 .max()
131 .unwrap_or(1);
132 let request_config =
133 build_request_config(host, method, body, tracked_fields, headers, peak_vus)?;
134
135 let cancellation_token = CancellationToken::new();
136 let cancel = cancellation_token.clone();
137 tokio::spawn(async move {
138 match tokio::signal::ctrl_c().await {
139 Ok(()) => {
140 eprintln!("\nShutdown signal received — cancelling curve execution...");
141 cancel.cancel();
142 }
143 Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
144 }
145 });
146
147 let started_at = Instant::now();
148
149 let executor = CurveExecutor::new(CurveExecutorParams {
150 curve,
151 request_config: Arc::clone(&request_config),
152 template,
153 cancellation_token,
154 });
155
156 let curve_result = executor.execute().await?;
157
158 Ok(Some(RunStats {
159 elapsed: started_at.elapsed(),
160 mode: RunMode::Curve,
161 latency: curve_result.latency,
162 status_codes: curve_result.status_codes,
163 total_requests: curve_result.total_requests,
164 total_failures: curve_result.total_failures,
165 template_stats: None,
166 response_stats: curve_result.response_stats,
167 curve_stats: Some(CurveStats {
168 duration: curve_duration,
169 stages: curve_stages,
170 stage_stats: curve_result.stage_stats,
171 }),
172 }))
173}
174
175#[cfg(test)]
178mod tests {
179 use std::time::Duration;
180
181 use crate::execution::{CurveStats, RunMode, RunStats};
182 use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
183 use crate::load_curve::{RampType, Stage};
184
185 fn make_stats_fixed() -> RunStats {
186 RunStats {
187 elapsed: Duration::from_secs(1),
188 mode: RunMode::Fixed,
189 latency: LatencyHistogram::new(),
190 status_codes: StatusCodeHistogram::new(),
191 total_requests: 10,
192 total_failures: 0,
193 template_stats: None,
194 response_stats: None,
195 curve_stats: None,
196 }
197 }
198
199 fn make_stats_curve(stages: Vec<Stage>) -> RunStats {
200 use crate::execution::StageStats;
201 let n = stages.len();
202 RunStats {
203 elapsed: Duration::from_secs(10),
204 mode: RunMode::Curve,
205 latency: LatencyHistogram::new(),
206 status_codes: StatusCodeHistogram::new(),
207 total_requests: 100,
208 total_failures: 2,
209 template_stats: None,
210 response_stats: None,
211 curve_stats: Some(CurveStats {
212 duration: Duration::from_secs(10),
213 stages,
214 stage_stats: (0..n)
215 .map(|_| StageStats {
216 latency: LatencyHistogram::new(),
217 status_codes: StatusCodeHistogram::new(),
218 total_requests: 0,
219 total_failures: 0,
220 })
221 .collect(),
222 }),
223 }
224 }
225
226 #[test]
229 fn curve_stages_none_for_fixed_mode() {
230 let stats = make_stats_fixed();
231 assert!(
232 stats.curve_stats.is_none(),
233 "fixed-mode RunStats must have curve_stats == None"
234 );
235 }
236
237 #[test]
240 fn curve_stages_some_for_curve_mode() {
241 let stages = vec![
242 Stage {
243 duration: Duration::from_secs(5),
244 target_vus: 50,
245 ramp: RampType::Linear,
246 },
247 Stage {
248 duration: Duration::from_secs(5),
249 target_vus: 100,
250 ramp: RampType::Step,
251 },
252 ];
253 let stats = make_stats_curve(stages.clone());
254
255 let stored = stats
256 .curve_stats
257 .expect("curve_stats must be Some in curve mode")
258 .stages;
259 assert_eq!(stored.len(), 2);
260 assert_eq!(stored[0].target_vus, 50);
261 assert_eq!(stored[0].ramp, RampType::Linear);
262 assert_eq!(stored[1].target_vus, 100);
263 assert_eq!(stored[1].ramp, RampType::Step);
264 }
265
266 #[test]
269 fn curve_stages_count_matches_original() {
270 let stages: Vec<Stage> = (0..5)
271 .map(|i| Stage {
272 duration: Duration::from_secs(10),
273 target_vus: (i + 1) * 20,
274 ramp: RampType::Linear,
275 })
276 .collect();
277 let count = stages.len();
278 let stats = make_stats_curve(stages);
279 assert_eq!(
280 stats.curve_stats.unwrap().stages.len(),
281 count,
282 "stored stage count must equal original stage count"
283 );
284 }
285
286 #[test]
289 fn run_mode_fixed_variant() {
290 let stats = make_stats_fixed();
291 assert_eq!(stats.mode, RunMode::Fixed);
292 }
293
294 #[test]
297 fn run_mode_curve_variant() {
298 let stages = vec![Stage {
299 duration: Duration::from_secs(5),
300 target_vus: 10,
301 ramp: RampType::Linear,
302 }];
303 let stats = make_stats_curve(stages);
304 assert_eq!(stats.mode, RunMode::Curve);
305 }
306}