// lmn_core/command/run.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use tokio_util::sync::CancellationToken;
5
6use crate::command::Command;
7use crate::execution::curve::{CurveExecutor, CurveExecutorParams};
8use crate::execution::fixed::{FixedExecutor, FixedExecutorParams};
9use crate::execution::{
10    CurveStats, ExecutionMode, RequestSpec, ResolvedScenario, RunMode, RunStats,
11    build_request_config, resolve_tracked_fields,
12};
13use crate::load_curve::LoadCurve;
14use crate::request_template::Template;
15
16// ── RunCommand ────────────────────────────────────────────────────────────────
17
18pub struct RunCommand {
19    pub request: RequestSpec,
20    pub execution: ExecutionMode,
21}
22
23impl Command for RunCommand {
24    async fn execute(self) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
25        match self.execution {
26            ExecutionMode::Fixed {
27                request_count,
28                concurrency,
29                rps,
30            } => execute_fixed(self.request, request_count, concurrency, rps).await,
31            ExecutionMode::Curve { curve, rps } => execute_curve(self.request, curve, rps).await,
32        }
33    }
34}
35
36// ── execute_fixed ─────────────────────────────────────────────────────────────
37
38/// Fixed-count semaphore-based execution path.
39async fn execute_fixed(
40    request_spec: RequestSpec,
41    total: usize,
42    concurrency: usize,
43    rps: Option<usize>,
44) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
45    match request_spec {
46        RequestSpec::Single {
47            host,
48            method,
49            body,
50            template_path,
51            response_template_path,
52            headers,
53        } => {
54            // Parse template for on-demand body generation (no pre-generation).
55            let template: Option<Arc<Template>> = template_path
56                .map(|path| Template::parse(&path).map(Arc::new))
57                .transpose()
58                .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
59
60            let tracked_fields = resolve_tracked_fields(response_template_path)?;
61            let request_config =
62                build_request_config(host, method, body, tracked_fields, headers, concurrency)?;
63
64            let cancellation_token = CancellationToken::new();
65            let cancel = cancellation_token.clone();
66            tokio::spawn(async move {
67                match tokio::signal::ctrl_c().await {
68                    Ok(()) => {
69                        eprintln!(
70                            "\nShutdown signal received — waiting for in-flight requests to finish..."
71                        );
72                        cancel.cancel();
73                    }
74                    Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
75                }
76            });
77
78            let started_at = Instant::now();
79
80            let result = FixedExecutor::new(FixedExecutorParams {
81                request_config: Arc::clone(&request_config),
82                template,
83                total,
84                concurrency,
85                rps,
86                cancellation_token,
87                scenarios: None,
88            })
89            .execute()
90            .await?;
91
92            Ok(Some(RunStats {
93                elapsed: started_at.elapsed(),
94                mode: RunMode::Fixed,
95                latency: result.latency,
96                status_codes: result.status_codes,
97                total_requests: result.total_requests,
98                total_failures: result.total_failures,
99                total_skipped: result.total_skipped,
100                template_stats: None,
101                response_stats: result.response_stats,
102                curve_stats: None,
103                scenario_stats: result.scenario_stats,
104            }))
105        }
106        RequestSpec::Scenarios(scenarios) => {
107            execute_fixed_scenarios(scenarios, total, concurrency, rps).await
108        }
109    }
110}
111
112/// Fixed-count execution path for scenario-based runs.
113async fn execute_fixed_scenarios(
114    scenarios: Vec<ResolvedScenario>,
115    total: usize,
116    concurrency: usize,
117    rps: Option<usize>,
118) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
119    use crate::command::HttpMethod;
120
121    // Build a placeholder RequestConfig — scenarios use per-step configs stored
122    // in ResolvedScenario.steps. The FixedExecutor routes to ScenarioVu when
123    // scenarios is Some, ignoring this placeholder.
124    let placeholder_config = build_request_config(
125        String::new(),
126        HttpMethod::Get,
127        None,
128        None,
129        vec![],
130        concurrency,
131    )?;
132
133    let cancellation_token = CancellationToken::new();
134    let cancel = cancellation_token.clone();
135    tokio::spawn(async move {
136        match tokio::signal::ctrl_c().await {
137            Ok(()) => {
138                eprintln!(
139                    "\nShutdown signal received — waiting for in-flight requests to finish..."
140                );
141                cancel.cancel();
142            }
143            Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
144        }
145    });
146
147    let started_at = Instant::now();
148
149    let result = FixedExecutor::new(FixedExecutorParams {
150        request_config: placeholder_config,
151        template: None,
152        total,
153        concurrency,
154        rps,
155        cancellation_token,
156        scenarios: Some(scenarios),
157    })
158    .execute()
159    .await?;
160
161    Ok(Some(RunStats {
162        elapsed: started_at.elapsed(),
163        mode: RunMode::Fixed,
164        latency: result.latency,
165        status_codes: result.status_codes,
166        total_requests: result.total_requests,
167        total_failures: result.total_failures,
168        total_skipped: result.total_skipped,
169        template_stats: None,
170        response_stats: result.response_stats,
171        curve_stats: None,
172        scenario_stats: result.scenario_stats,
173    }))
174}
175
176// ── execute_curve ─────────────────────────────────────────────────────────────
177
178/// Curve-based dynamic VU execution path.
179async fn execute_curve(
180    request_spec: RequestSpec,
181    curve: LoadCurve,
182    rps: Option<usize>,
183) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
184    match request_spec {
185        RequestSpec::Single {
186            host,
187            method,
188            body,
189            template_path,
190            response_template_path,
191            headers,
192        } => {
193            let curve_duration = curve.total_duration();
194            let curve_stages = curve.stages.clone();
195
196            // Parse template for on-demand body generation (no pre-generation in curve mode)
197            let template: Option<Arc<Template>> = template_path
198                .map(|path| Template::parse(&path).map(Arc::new))
199                .transpose()
200                .map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
201
202            let tracked_fields = resolve_tracked_fields(response_template_path)?;
203            let peak_vus = curve
204                .stages
205                .iter()
206                .map(|s| s.target_vus as usize)
207                .max()
208                .unwrap_or(1);
209            let request_config =
210                build_request_config(host, method, body, tracked_fields, headers, peak_vus)?;
211
212            let cancellation_token = CancellationToken::new();
213            let cancel = cancellation_token.clone();
214            tokio::spawn(async move {
215                match tokio::signal::ctrl_c().await {
216                    Ok(()) => {
217                        eprintln!("\nShutdown signal received — cancelling curve execution...");
218                        cancel.cancel();
219                    }
220                    Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
221                }
222            });
223
224            let started_at = Instant::now();
225
226            let executor = CurveExecutor::new(CurveExecutorParams {
227                curve,
228                request_config: Arc::clone(&request_config),
229                template,
230                cancellation_token,
231                rps,
232                scenarios: None,
233            });
234
235            let curve_result = executor.execute().await?;
236
237            Ok(Some(RunStats {
238                elapsed: started_at.elapsed(),
239                mode: RunMode::Curve,
240                latency: curve_result.latency,
241                status_codes: curve_result.status_codes,
242                total_requests: curve_result.total_requests,
243                total_failures: curve_result.total_failures,
244                total_skipped: curve_result.total_skipped,
245                template_stats: None,
246                response_stats: curve_result.response_stats,
247                curve_stats: Some(CurveStats {
248                    duration: curve_duration,
249                    stages: curve_stages,
250                    stage_stats: curve_result.stage_stats,
251                }),
252                scenario_stats: curve_result.scenario_stats,
253            }))
254        }
255        RequestSpec::Scenarios(scenarios) => execute_curve_scenarios(scenarios, curve, rps).await,
256    }
257}
258
259/// Curve-based execution path for scenario-based runs.
260async fn execute_curve_scenarios(
261    scenarios: Vec<ResolvedScenario>,
262    curve: LoadCurve,
263    rps: Option<usize>,
264) -> Result<Option<RunStats>, Box<dyn std::error::Error>> {
265    use crate::command::HttpMethod;
266
267    let curve_duration = curve.total_duration();
268    let curve_stages = curve.stages.clone();
269
270    let peak_vus = curve
271        .stages
272        .iter()
273        .map(|s| s.target_vus as usize)
274        .max()
275        .unwrap_or(1);
276
277    // Build a placeholder RequestConfig — CurveExecutor routes to ScenarioVu when
278    // scenarios is Some, ignoring this placeholder.
279    let placeholder_config =
280        build_request_config(String::new(), HttpMethod::Get, None, None, vec![], peak_vus)?;
281
282    let cancellation_token = CancellationToken::new();
283    let cancel = cancellation_token.clone();
284    tokio::spawn(async move {
285        match tokio::signal::ctrl_c().await {
286            Ok(()) => {
287                eprintln!("\nShutdown signal received — cancelling curve execution...");
288                cancel.cancel();
289            }
290            Err(e) => eprintln!("warning: failed to listen for ctrl_c: {e}"),
291        }
292    });
293
294    let started_at = Instant::now();
295
296    let executor = CurveExecutor::new(CurveExecutorParams {
297        curve,
298        request_config: placeholder_config,
299        template: None,
300        cancellation_token,
301        rps,
302        scenarios: Some(scenarios),
303    });
304
305    let curve_result = executor.execute().await?;
306
307    Ok(Some(RunStats {
308        elapsed: started_at.elapsed(),
309        mode: RunMode::Curve,
310        latency: curve_result.latency,
311        status_codes: curve_result.status_codes,
312        total_requests: curve_result.total_requests,
313        total_failures: curve_result.total_failures,
314        total_skipped: curve_result.total_skipped,
315        template_stats: None,
316        response_stats: curve_result.response_stats,
317        curve_stats: Some(CurveStats {
318            duration: curve_duration,
319            stages: curve_stages,
320            stage_stats: curve_result.stage_stats,
321        }),
322        scenario_stats: curve_result.scenario_stats,
323    }))
324}
325
326// ── Tests ─────────────────────────────────────────────────────────────────────
327
#[cfg(test)]
mod tests {
    use std::iter;
    use std::time::Duration;

    use crate::execution::{CurveStats, RunMode, RunStats, StageStats};
    use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
    use crate::load_curve::{RampType, Stage};

    /// `RunStats` fixture shaped like the result of a fixed-mode run:
    /// ten requests, no failures, no curve data.
    fn fixed_run_stats() -> RunStats {
        RunStats {
            mode: RunMode::Fixed,
            elapsed: Duration::from_secs(1),
            total_requests: 10,
            total_failures: 0,
            total_skipped: 0,
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            template_stats: None,
            response_stats: None,
            scenario_stats: None,
            curve_stats: None,
        }
    }

    /// `RunStats` fixture shaped like the result of a curve-mode run over
    /// `stages`, with one empty `StageStats` entry per stage.
    fn curve_run_stats(stages: Vec<Stage>) -> RunStats {
        // Taken before `stages` is moved into the fixture below.
        let stage_count = stages.len();

        RunStats {
            mode: RunMode::Curve,
            elapsed: Duration::from_secs(10),
            total_requests: 100,
            total_failures: 2,
            total_skipped: 0,
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            template_stats: None,
            response_stats: None,
            scenario_stats: None,
            curve_stats: Some(CurveStats {
                duration: Duration::from_secs(10),
                stages,
                stage_stats: iter::repeat_with(|| StageStats {
                    latency: LatencyHistogram::new(),
                    status_codes: StatusCodeHistogram::new(),
                    total_requests: 0,
                    total_failures: 0,
                })
                .take(stage_count)
                .collect(),
            }),
        }
    }

    // ── curve_stages_none_for_fixed_mode ──────────────────────────────────────

    /// A fixed-mode result carries no curve statistics.
    #[test]
    fn curve_stages_none_for_fixed_mode() {
        let RunStats { curve_stats, .. } = fixed_run_stats();
        assert!(
            curve_stats.is_none(),
            "fixed-mode RunStats must have curve_stats == None"
        );
    }

    // ── curve_stages_some_for_curve_mode ──────────────────────────────────────

    /// A curve-mode result stores the stages it was built from, in order.
    #[test]
    fn curve_stages_some_for_curve_mode() {
        let linear_stage = Stage {
            duration: Duration::from_secs(5),
            target_vus: 50,
            ramp: RampType::Linear,
        };
        let step_stage = Stage {
            duration: Duration::from_secs(5),
            target_vus: 100,
            ramp: RampType::Step,
        };

        let curve_stats = curve_run_stats(vec![linear_stage, step_stage])
            .curve_stats
            .expect("curve_stats must be Some in curve mode");
        let stored = curve_stats.stages;

        assert_eq!(stored.len(), 2);
        let (first, second) = (&stored[0], &stored[1]);
        assert_eq!(first.target_vus, 50);
        assert_eq!(first.ramp, RampType::Linear);
        assert_eq!(second.target_vus, 100);
        assert_eq!(second.ramp, RampType::Step);
    }

    // ── curve_stages_count_matches_original ───────────────────────────────────

    /// The number of stored stages equals the number of stages supplied.
    #[test]
    fn curve_stages_count_matches_original() {
        // Five stages targeting 20, 40, 60, 80 and 100 VUs.
        let stages: Vec<Stage> = (1..=5)
            .map(|step| Stage {
                duration: Duration::from_secs(10),
                target_vus: step * 20,
                ramp: RampType::Linear,
            })
            .collect();
        let expected = stages.len();

        let stored = curve_run_stats(stages).curve_stats.unwrap().stages;
        assert_eq!(
            stored.len(),
            expected,
            "stored stage count must equal original stage count"
        );
    }

    // ── run_mode_fixed_variant ────────────────────────────────────────────────

    /// The fixed fixture reports `RunMode::Fixed`.
    #[test]
    fn run_mode_fixed_variant() {
        assert_eq!(fixed_run_stats().mode, RunMode::Fixed);
    }

    // ── run_mode_curve_variant ────────────────────────────────────────────────

    /// The curve fixture reports `RunMode::Curve`.
    #[test]
    fn run_mode_curve_variant() {
        let single_stage = Stage {
            duration: Duration::from_secs(5),
            target_vus: 10,
            ramp: RampType::Linear,
        };
        assert_eq!(curve_run_stats(vec![single_stage]).mode, RunMode::Curve);
    }
}