Skip to main content

lmn_core/command/
run.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use tokio_util::sync::CancellationToken;
5
6use crate::command::Command;
7use crate::execution::curve::{CurveExecutor, CurveExecutorParams};
8use crate::execution::fixed::{FixedExecutor, FixedExecutorParams};
9use crate::execution::{
10    CurveStats, ExecutionMode, RequestSpec, RunMode, RunStats, build_request_config,
11    resolve_tracked_fields,
12};
13use crate::load_curve::LoadCurve;
14use crate::request_template::Template;
15
16// ── RunCommand ────────────────────────────────────────────────────────────────
17
/// A runnable command that pairs the request to send with the way it is executed.
///
/// The `Command::execute` implementation for this type dispatches on
/// `execution`: `ExecutionMode::Fixed` is handled by `execute_fixed` and
/// `ExecutionMode::Curve` by `execute_curve`.
pub struct RunCommand {
    /// Request specification (host, method, body, template paths, headers).
    pub request: RequestSpec,
    /// Selects the execution path: a fixed request count with a concurrency
    /// value, or a load curve.
    pub execution: ExecutionMode,
}
22
23impl Command for RunCommand {
24    async fn execute(self) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
25        match self.execution {
26            ExecutionMode::Fixed {
27                request_count,
28                concurrency,
29            } => execute_fixed(self.request, request_count, concurrency).await,
30            ExecutionMode::Curve(curve) => execute_curve(self.request, curve).await,
31        }
32    }
33}
34
35// ── execute_fixed ─────────────────────────────────────────────────────────────
36
37/// Fixed-count semaphore-based execution path.
38async fn execute_fixed(
39    request_spec: RequestSpec,
40    total: usize,
41    concurrency: usize,
42) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
43    let RequestSpec {
44        host,
45        method,
46        body,
47        template_path,
48        response_template_path,
49        headers,
50    } = request_spec;
51
52    // Parse template for on-demand body generation (no pre-generation).
53    let template: Option<Arc<Template>> = template_path
54        .map(|path| Template::parse(&path).map(Arc::new))
55        .transpose()
56        .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
57
58    let tracked_fields = resolve_tracked_fields(response_template_path)?;
59    let request_config =
60        build_request_config(host, method, body, tracked_fields, headers, concurrency)?;
61
62    let cancellation_token = CancellationToken::new();
63    let cancel = cancellation_token.clone();
64    tokio::spawn(async move {
65        match tokio::signal::ctrl_c().await {
66            Ok(()) => {
67                eprintln!(
68                    "\nShutdown signal received — waiting for in-flight requests to finish..."
69                );
70                cancel.cancel();
71            }
72            Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
73        }
74    });
75
76    let started_at = Instant::now();
77
78    let result = FixedExecutor::new(FixedExecutorParams {
79        request_config: Arc::clone(&request_config),
80        template,
81        total,
82        concurrency,
83        cancellation_token,
84    })
85    .execute()
86    .await?;
87
88    Ok(Some(RunStats {
89        elapsed: started_at.elapsed(),
90        mode: RunMode::Fixed,
91        latency: result.latency,
92        status_codes: result.status_codes,
93        total_requests: result.total_requests,
94        total_failures: result.total_failures,
95        template_stats: None,
96        response_stats: result.response_stats,
97        curve_stats: None,
98    }))
99}
100
101// ── execute_curve ─────────────────────────────────────────────────────────────
102
103/// Curve-based dynamic VU execution path.
104async fn execute_curve(
105    request_spec: RequestSpec,
106    curve: LoadCurve,
107) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
108    let RequestSpec {
109        host,
110        method,
111        body,
112        template_path,
113        response_template_path,
114        headers,
115    } = request_spec;
116    let curve_duration = curve.total_duration();
117    let curve_stages = curve.stages.clone();
118
119    // Parse template for on-demand body generation (no pre-generation in curve mode)
120    let template: Option<Arc<Template>> = template_path
121        .map(|path| Template::parse(&path).map(Arc::new))
122        .transpose()
123        .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
124
125    let tracked_fields = resolve_tracked_fields(response_template_path)?;
126    let peak_vus = curve
127        .stages
128        .iter()
129        .map(|s| s.target_vus as usize)
130        .max()
131        .unwrap_or(1);
132    let request_config =
133        build_request_config(host, method, body, tracked_fields, headers, peak_vus)?;
134
135    let cancellation_token = CancellationToken::new();
136    let cancel = cancellation_token.clone();
137    tokio::spawn(async move {
138        match tokio::signal::ctrl_c().await {
139            Ok(()) => {
140                eprintln!("\nShutdown signal received — cancelling curve execution...");
141                cancel.cancel();
142            }
143            Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
144        }
145    });
146
147    let started_at = Instant::now();
148
149    let executor = CurveExecutor::new(CurveExecutorParams {
150        curve,
151        request_config: Arc::clone(&request_config),
152        template,
153        cancellation_token,
154    });
155
156    let curve_result = executor.execute().await?;
157
158    Ok(Some(RunStats {
159        elapsed: started_at.elapsed(),
160        mode: RunMode::Curve,
161        latency: curve_result.latency,
162        status_codes: curve_result.status_codes,
163        total_requests: curve_result.total_requests,
164        total_failures: curve_result.total_failures,
165        template_stats: None,
166        response_stats: curve_result.response_stats,
167        curve_stats: Some(CurveStats {
168            duration: curve_duration,
169            stages: curve_stages,
170            stage_stats: curve_result.stage_stats,
171        }),
172    }))
173}
174
175// ── Tests ─────────────────────────────────────────────────────────────────────
176
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::execution::{CurveStats, RunMode, RunStats, StageStats};
    use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
    use crate::load_curve::{RampType, Stage};

    /// Fixture: a `RunStats` value filled in the way the fixed path fills it
    /// (fixed mode, no curve statistics).
    fn make_stats_fixed() -> RunStats {
        let elapsed = Duration::from_secs(1);
        RunStats {
            elapsed,
            mode: RunMode::Fixed,
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 10,
            total_failures: 0,
            template_stats: None,
            response_stats: None,
            curve_stats: None,
        }
    }

    /// Fixture: a `RunStats` value filled in the way the curve path fills it,
    /// carrying `stages` and one empty `StageStats` entry per stage.
    fn make_stats_curve(stages: Vec<Stage>) -> RunStats {
        let run_length = Duration::from_secs(10);
        let blank_stage = || StageStats {
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 0,
            total_failures: 0,
        };
        let stage_stats = std::iter::repeat_with(blank_stage)
            .take(stages.len())
            .collect();

        RunStats {
            elapsed: run_length,
            mode: RunMode::Curve,
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 100,
            total_failures: 2,
            template_stats: None,
            response_stats: None,
            curve_stats: Some(CurveStats {
                duration: run_length,
                stages,
                stage_stats,
            }),
        }
    }

    // ── curve_stages_none_for_fixed_mode ──────────────────────────────────────

    #[test]
    fn curve_stages_none_for_fixed_mode() {
        let fixed = make_stats_fixed();
        assert!(
            fixed.curve_stats.is_none(),
            "fixed-mode RunStats must have curve_stats == None"
        );
    }

    // ── curve_stages_some_for_curve_mode ──────────────────────────────────────

    #[test]
    fn curve_stages_some_for_curve_mode() {
        let ramp_up = Stage {
            duration: Duration::from_secs(5),
            target_vus: 50,
            ramp: RampType::Linear,
        };
        let jump = Stage {
            duration: Duration::from_secs(5),
            target_vus: 100,
            ramp: RampType::Step,
        };
        let stats = make_stats_curve(vec![ramp_up, jump]);

        let stored = stats
            .curve_stats
            .expect("curve_stats must be Some in curve mode")
            .stages;

        // The length check comes first so the pairwise comparison below cannot
        // silently skip a missing stage.
        assert_eq!(stored.len(), 2);
        let expected = [(50, RampType::Linear), (100, RampType::Step)];
        for (stage, (target_vus, ramp)) in stored.iter().zip(expected) {
            assert_eq!(stage.target_vus, target_vus);
            assert_eq!(stage.ramp, ramp);
        }
    }

    // ── curve_stages_count_matches_original ───────────────────────────────────

    #[test]
    fn curve_stages_count_matches_original() {
        let stages: Vec<Stage> = (1..=5)
            .map(|step| Stage {
                duration: Duration::from_secs(10),
                target_vus: step * 20,
                ramp: RampType::Linear,
            })
            .collect();
        let original_len = stages.len();

        let stats = make_stats_curve(stages);
        let stored_len = stats.curve_stats.unwrap().stages.len();

        assert_eq!(
            stored_len, original_len,
            "stored stage count must equal original stage count"
        );
    }

    // ── run_mode_fixed_variant ────────────────────────────────────────────────

    #[test]
    fn run_mode_fixed_variant() {
        let fixed = make_stats_fixed();
        assert_eq!(fixed.mode, RunMode::Fixed);
    }

    // ── run_mode_curve_variant ────────────────────────────────────────────────

    #[test]
    fn run_mode_curve_variant() {
        let single_stage = Stage {
            duration: Duration::from_secs(5),
            target_vus: 10,
            ramp: RampType::Linear,
        };
        let curved = make_stats_curve(vec![single_stage]);
        assert_eq!(curved.mode, RunMode::Curve);
    }
}