Skip to main content

lmn_core/execution/curve/
mod.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use tokio::sync::mpsc;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7use tracing::debug;
8
9use crate::execution::{
10    DrainMetricsAccumulator, ResolvedScenario, RpsLimiter, ScenarioStats, StageStats,
11    assign_scenario,
12};
13use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
14use crate::http::{RequestConfig, RequestRecord};
15use crate::load_curve::{LoadCurve, Stage};
16use crate::request_template::Template;
17use crate::response_template::stats::ResponseStats;
18use crate::vu::Vu;
19use crate::vu::scenario::{ScenarioVu, StepExec};
20
21// ── CurveExecutorParams ───────────────────────────────────────────────────────
22
23/// Parameters for constructing a `CurveExecutor`.
24pub struct CurveExecutorParams {
25    pub curve: LoadCurve,
26    pub request_config: Arc<RequestConfig>,
27    pub template: Option<Arc<Template>>,
28    pub cancellation_token: CancellationToken,
29    /// Optional upper bound on aggregate requests-per-second across all VUs.
30    /// `None` means no rate limit. Values that overflow `u32` or equal zero
31    /// are treated as unset.
32    pub rps: Option<usize>,
33    /// When present, the executor spawns `ScenarioVu` instances instead of
34    /// plain `Vu` instances. VUs are assigned via weighted round-robin using a
35    /// monotonically increasing counter. Budget is always `None` in curve mode.
36    pub scenarios: Option<Vec<ResolvedScenario>>,
37}
38
39// ── CurveExecutionResult ──────────────────────────────────────────────────────
40
41/// Result returned by `CurveExecutor::execute`.
42pub struct CurveExecutionResult {
43    pub latency: LatencyHistogram,
44    pub status_codes: StatusCodeHistogram,
45    pub total_requests: u64,
46    pub total_failures: u64,
47    pub total_skipped: u64,
48    pub response_stats: Option<ResponseStats>,
49    pub stage_stats: Vec<StageStats>,
50    pub scenario_stats: Option<Vec<ScenarioStats>>,
51}
52
53// ── stage_index_at ────────────────────────────────────────────────────────────
54
55/// Returns the 0-based stage index for a given elapsed duration.
56fn stage_index_at(stages: &[Stage], elapsed: Duration) -> usize {
57    let mut offset = Duration::ZERO;
58    for (i, stage) in stages.iter().enumerate() {
59        offset += stage.duration;
60        if elapsed < offset {
61            return i;
62        }
63    }
64    stages.len().saturating_sub(1)
65}
66
67// ── CurveExecutor ─────────────────────────────────────────────────────────────
68
69/// Executes a load test driven by a `LoadCurve`, dynamically scaling VUs.
70pub struct CurveExecutor {
71    params: CurveExecutorParams,
72}
73
74impl CurveExecutor {
75    pub fn new(params: CurveExecutorParams) -> Self {
76        Self { params }
77    }
78
79    /// Runs the load curve, spawning and cancelling VU tasks as the curve
80    /// dictates. Returns a `CurveExecutionResult` when the curve completes or a
81    /// cancellation signal is received.
82    pub async fn execute(self) -> Result<CurveExecutionResult, crate::execution::RunError> {
83        let CurveExecutorParams {
84            curve,
85            request_config,
86            template,
87            cancellation_token,
88            rps,
89            scenarios,
90        } = self.params;
91
92        // Build the shared RPS limiter once. Every VU spawned during the curve
93        // receives the same `Arc` clone so the cap is genuinely aggregate.
94        let rate_limiter = rps.and_then(RpsLimiter::new);
95
96        let total_duration = curve.total_duration();
97        let run_start = Instant::now();
98
99        // Pre-convert headers once before spawning any VUs to avoid per-VU allocation.
100        let plain_headers: Arc<Vec<(String, String)>> = Arc::new(
101            request_config
102                .headers
103                .iter()
104                .map(|(k, v)| (k.clone(), v.to_string()))
105                .collect(),
106        );
107
108        // `has_tracked_fields` is true when the single-request path uses a
109        // response template, OR when any step across any scenario does.
110        let has_tracked_fields = if let Some(ref sc) = scenarios {
111            sc.iter()
112                .flat_map(|s| s.steps.iter())
113                .any(|step| step.response_template.is_some())
114        } else {
115            request_config.tracked_fields.is_some()
116        };
117
118        let n_stages = curve.stages.len();
119
120        // Clone the stages vec so the drain task can own it without holding onto `curve`.
121        let drain_stages = curve.stages.clone();
122
123        // Unbounded channel; VUs push results as they complete without risk of blocking.
124        let (tx, rx) = mpsc::unbounded_channel::<RequestRecord>();
125
126        // Spawn a dedicated drain task that owns the receiver and all accumulator
127        // state. It attributes each record to the correct stage via `completed_at`.
128        let drain_handle = tokio::spawn(async move {
129            let mut rx = rx;
130            let mut acc = DrainMetricsAccumulator::new(has_tracked_fields);
131
132            // Pre-allocate per-stage accumulators.
133            let mut stage_stats: Vec<StageStats> = (0..n_stages)
134                .map(|_| StageStats {
135                    latency: LatencyHistogram::new(),
136                    status_codes: StatusCodeHistogram::new(),
137                    total_requests: 0,
138                    total_failures: 0,
139                })
140                .collect();
141
142            while let Some(record) = rx.recv().await {
143                acc.record_request(&record);
144
145                // Determine which stage this record belongs to using its
146                // wall-clock completion time relative to the run start.
147                let elapsed = record
148                    .completed_at
149                    .checked_duration_since(run_start)
150                    .unwrap_or_default();
151                let stage_idx = stage_index_at(&drain_stages, elapsed);
152
153                stage_stats[stage_idx].total_requests += 1;
154                if !record.skipped {
155                    stage_stats[stage_idx].latency.record(record.duration);
156                    stage_stats[stage_idx]
157                        .status_codes
158                        .record(record.status_code);
159                    if !record.success {
160                        stage_stats[stage_idx].total_failures += 1;
161                    }
162                }
163
164                acc.record_extraction(record.extraction);
165            }
166            let scenario_stats = acc.finalize_scenario_stats();
167
168            CurveExecutionResult {
169                latency: acc.latency,
170                status_codes: acc.status_codes,
171                total_requests: acc.total_requests,
172                total_failures: acc.total_failures,
173                total_skipped: acc.total_skipped,
174                response_stats: acc.response_stats,
175                stage_stats,
176                scenario_stats,
177            }
178        });
179
180        // Track active VU handles and their per-VU cancellation tokens.
181        let mut vu_handles: Vec<(JoinHandle<()>, CancellationToken)> = Vec::new();
182
183        // Monotonically increasing counter used for deterministic scenario
184        // assignment. Each spawned VU gets the next index so the weighted
185        // round-robin assignment is stable regardless of despawn/respawn.
186        let mut vu_counter: usize = 0;
187
188        let mut ticker = tokio::time::interval(tokio::time::Duration::from_millis(100));
189
190        loop {
191            tokio::select! {
192                _ = cancellation_token.cancelled() => {
193                    debug!("curve executor: parent cancellation received");
194                    break;
195                }
196                _ = ticker.tick() => {
197                    let elapsed = run_start.elapsed();
198
199                    if elapsed >= total_duration {
200                        debug!("curve executor: total duration elapsed, shutting down");
201                        break;
202                    }
203
204                    let target = curve.target_vus_at(elapsed) as usize;
205                    let current = vu_handles.len();
206
207                    match target.cmp(&current) {
208                        std::cmp::Ordering::Greater => {
209                            // Spawn additional VUs
210                            let to_add = target - current;
211                            for _ in 0..to_add {
212                                let vu_token = CancellationToken::new();
213                                let handle = if let Some(ref scenarios) = scenarios {
214                                    // Scenario mode: assign scenario by weighted round-robin
215                                    // over the monotonic vu_counter.
216                                    let scenario = &scenarios[assign_scenario(vu_counter, scenarios)];
217                                    let steps = scenario
218                                        .steps
219                                        .iter()
220                                        .map(|step| StepExec {
221                                            step_name: Arc::clone(&step.name),
222                                            request_config: Arc::clone(&step.request_config),
223                                            plain_headers: Arc::clone(&step.plain_headers),
224                                            request_template: step
225                                                .request_template
226                                                .as_ref()
227                                                .map(Arc::clone),
228                                            response_template: step
229                                                .response_template
230                                                .as_ref()
231                                                .map(Arc::clone),
232                                            captures: step.captures.clone(),
233                                            inline_body: step.inline_body.clone(),
234                                            has_capture_headers: step.has_capture_headers,
235                                        })
236                                        .collect();
237                                    ScenarioVu {
238                                        scenario_name: Arc::clone(&scenario.name),
239                                        steps,
240                                        on_step_failure: scenario.on_step_failure,
241                                        cancellation_token: vu_token.clone(),
242                                        result_tx: tx.clone(),
243                                        budget: None, // curve mode: no budget
244                                        rate_limiter: rate_limiter.as_ref().map(Arc::clone),
245                                    }
246                                    .spawn()
247                                } else {
248                                    Vu {
249                                        request_config: Arc::clone(&request_config),
250                                        plain_headers: Arc::clone(&plain_headers),
251                                        template: template.as_ref().map(Arc::clone),
252                                        scenario_label: None,
253                                        step_label: None,
254                                        cancellation_token: vu_token.clone(),
255                                        result_tx: tx.clone(),
256                                        budget: None,
257                                        rate_limiter: rate_limiter.as_ref().map(Arc::clone),
258                                    }
259                                    .spawn()
260                                };
261                                vu_counter += 1;
262                                vu_handles.push((handle, vu_token));
263                            }
264                        }
265                        std::cmp::Ordering::Less => {
266                            // Cancel excess VUs (cancel from the end of the list)
267                            let to_remove = current - target;
268                            let drain_start = vu_handles.len() - to_remove;
269                            let excess: Vec<_> = vu_handles.drain(drain_start..).collect();
270                            // Cancel all tokens first so all VUs begin exiting simultaneously
271                            for (_, token) in &excess {
272                                token.cancel();
273                            }
274                            // Await sequentially — VUs are already exiting in parallel on the runtime
275                            for (handle, _) in excess {
276                                let _ = handle.await;
277                            }
278                        }
279                        std::cmp::Ordering::Equal => {}
280                    }
281                }
282            }
283        }
284
285        // Cancel all remaining VU tasks — cancel all tokens first, then await.
286        for (_, token) in &vu_handles {
287            token.cancel();
288        }
289        for (handle, _) in vu_handles {
290            let _ = handle.await;
291        }
292
293        // Drop the coordinator's sender so the channel closes once all VU
294        // senders (clones) are also dropped (they are, since tasks ended).
295        drop(tx);
296
297        // Await the drain task to get the fully accumulated result.
298        Ok(drain_handle.await?)
299    }
300}