lmn_core/execution/curve/mod.rs
1use std::sync::Arc;
2use std::time::{Duration, Instant};
3
4use tokio::sync::mpsc;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7use tracing::debug;
8
9use crate::execution::{
10 DrainMetricsAccumulator, ResolvedScenario, RpsLimiter, ScenarioStats, StageStats,
11 assign_scenario,
12};
13use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
14use crate::http::{RequestConfig, RequestRecord};
15use crate::load_curve::{LoadCurve, Stage};
16use crate::request_template::Template;
17use crate::response_template::stats::ResponseStats;
18use crate::vu::Vu;
19use crate::vu::scenario::{ScenarioVu, StepExec};
20
21// ── CurveExecutorParams ───────────────────────────────────────────────────────
22
23/// Parameters for constructing a `CurveExecutor`.
24pub struct CurveExecutorParams {
25 pub curve: LoadCurve,
26 pub request_config: Arc<RequestConfig>,
27 pub template: Option<Arc<Template>>,
28 pub cancellation_token: CancellationToken,
29 /// Optional upper bound on aggregate requests-per-second across all VUs.
30 /// `None` means no rate limit. Values that overflow `u32` or equal zero
31 /// are treated as unset.
32 pub rps: Option<usize>,
33 /// When present, the executor spawns `ScenarioVu` instances instead of
34 /// plain `Vu` instances. VUs are assigned via weighted round-robin using a
35 /// monotonically increasing counter. Budget is always `None` in curve mode.
36 pub scenarios: Option<Vec<ResolvedScenario>>,
37}
38
39// ── CurveExecutionResult ──────────────────────────────────────────────────────
40
41/// Result returned by `CurveExecutor::execute`.
42pub struct CurveExecutionResult {
43 pub latency: LatencyHistogram,
44 pub status_codes: StatusCodeHistogram,
45 pub total_requests: u64,
46 pub total_failures: u64,
47 pub total_skipped: u64,
48 pub response_stats: Option<ResponseStats>,
49 pub stage_stats: Vec<StageStats>,
50 pub scenario_stats: Option<Vec<ScenarioStats>>,
51}
52
53// ── stage_index_at ────────────────────────────────────────────────────────────
54
55/// Returns the 0-based stage index for a given elapsed duration.
56fn stage_index_at(stages: &[Stage], elapsed: Duration) -> usize {
57 let mut offset = Duration::ZERO;
58 for (i, stage) in stages.iter().enumerate() {
59 offset += stage.duration;
60 if elapsed < offset {
61 return i;
62 }
63 }
64 stages.len().saturating_sub(1)
65}
66
67// ── CurveExecutor ─────────────────────────────────────────────────────────────
68
69/// Executes a load test driven by a `LoadCurve`, dynamically scaling VUs.
70pub struct CurveExecutor {
71 params: CurveExecutorParams,
72}
73
74impl CurveExecutor {
75 pub fn new(params: CurveExecutorParams) -> Self {
76 Self { params }
77 }
78
79 /// Runs the load curve, spawning and cancelling VU tasks as the curve
80 /// dictates. Returns a `CurveExecutionResult` when the curve completes or a
81 /// cancellation signal is received.
82 pub async fn execute(self) -> Result<CurveExecutionResult, crate::execution::RunError> {
83 let CurveExecutorParams {
84 curve,
85 request_config,
86 template,
87 cancellation_token,
88 rps,
89 scenarios,
90 } = self.params;
91
92 // Build the shared RPS limiter once. Every VU spawned during the curve
93 // receives the same `Arc` clone so the cap is genuinely aggregate.
94 let rate_limiter = rps.and_then(RpsLimiter::new);
95
96 let total_duration = curve.total_duration();
97 let run_start = Instant::now();
98
99 // Pre-convert headers once before spawning any VUs to avoid per-VU allocation.
100 let plain_headers: Arc<Vec<(String, String)>> = Arc::new(
101 request_config
102 .headers
103 .iter()
104 .map(|(k, v)| (k.clone(), v.to_string()))
105 .collect(),
106 );
107
108 // `has_tracked_fields` is true when the single-request path uses a
109 // response template, OR when any step across any scenario does.
110 let has_tracked_fields = if let Some(ref sc) = scenarios {
111 sc.iter()
112 .flat_map(|s| s.steps.iter())
113 .any(|step| step.response_template.is_some())
114 } else {
115 request_config.tracked_fields.is_some()
116 };
117
118 let n_stages = curve.stages.len();
119
120 // Clone the stages vec so the drain task can own it without holding onto `curve`.
121 let drain_stages = curve.stages.clone();
122
123 // Unbounded channel; VUs push results as they complete without risk of blocking.
124 let (tx, rx) = mpsc::unbounded_channel::<RequestRecord>();
125
126 // Spawn a dedicated drain task that owns the receiver and all accumulator
127 // state. It attributes each record to the correct stage via `completed_at`.
128 let drain_handle = tokio::spawn(async move {
129 let mut rx = rx;
130 let mut acc = DrainMetricsAccumulator::new(has_tracked_fields);
131
132 // Pre-allocate per-stage accumulators.
133 let mut stage_stats: Vec<StageStats> = (0..n_stages)
134 .map(|_| StageStats {
135 latency: LatencyHistogram::new(),
136 status_codes: StatusCodeHistogram::new(),
137 total_requests: 0,
138 total_failures: 0,
139 })
140 .collect();
141
142 while let Some(record) = rx.recv().await {
143 acc.record_request(&record);
144
145 // Determine which stage this record belongs to using its
146 // wall-clock completion time relative to the run start.
147 let elapsed = record
148 .completed_at
149 .checked_duration_since(run_start)
150 .unwrap_or_default();
151 let stage_idx = stage_index_at(&drain_stages, elapsed);
152
153 stage_stats[stage_idx].total_requests += 1;
154 if !record.skipped {
155 stage_stats[stage_idx].latency.record(record.duration);
156 stage_stats[stage_idx]
157 .status_codes
158 .record(record.status_code);
159 if !record.success {
160 stage_stats[stage_idx].total_failures += 1;
161 }
162 }
163
164 acc.record_extraction(record.extraction);
165 }
166 let scenario_stats = acc.finalize_scenario_stats();
167
168 CurveExecutionResult {
169 latency: acc.latency,
170 status_codes: acc.status_codes,
171 total_requests: acc.total_requests,
172 total_failures: acc.total_failures,
173 total_skipped: acc.total_skipped,
174 response_stats: acc.response_stats,
175 stage_stats,
176 scenario_stats,
177 }
178 });
179
180 // Track active VU handles and their per-VU cancellation tokens.
181 let mut vu_handles: Vec<(JoinHandle<()>, CancellationToken)> = Vec::new();
182
183 // Monotonically increasing counter used for deterministic scenario
184 // assignment. Each spawned VU gets the next index so the weighted
185 // round-robin assignment is stable regardless of despawn/respawn.
186 let mut vu_counter: usize = 0;
187
188 let mut ticker = tokio::time::interval(tokio::time::Duration::from_millis(100));
189
190 loop {
191 tokio::select! {
192 _ = cancellation_token.cancelled() => {
193 debug!("curve executor: parent cancellation received");
194 break;
195 }
196 _ = ticker.tick() => {
197 let elapsed = run_start.elapsed();
198
199 if elapsed >= total_duration {
200 debug!("curve executor: total duration elapsed, shutting down");
201 break;
202 }
203
204 let target = curve.target_vus_at(elapsed) as usize;
205 let current = vu_handles.len();
206
207 match target.cmp(¤t) {
208 std::cmp::Ordering::Greater => {
209 // Spawn additional VUs
210 let to_add = target - current;
211 for _ in 0..to_add {
212 let vu_token = CancellationToken::new();
213 let handle = if let Some(ref scenarios) = scenarios {
214 // Scenario mode: assign scenario by weighted round-robin
215 // over the monotonic vu_counter.
216 let scenario = &scenarios[assign_scenario(vu_counter, scenarios)];
217 let steps = scenario
218 .steps
219 .iter()
220 .map(|step| StepExec {
221 step_name: Arc::clone(&step.name),
222 request_config: Arc::clone(&step.request_config),
223 plain_headers: Arc::clone(&step.plain_headers),
224 request_template: step
225 .request_template
226 .as_ref()
227 .map(Arc::clone),
228 response_template: step
229 .response_template
230 .as_ref()
231 .map(Arc::clone),
232 captures: step.captures.clone(),
233 inline_body: step.inline_body.clone(),
234 has_capture_headers: step.has_capture_headers,
235 })
236 .collect();
237 ScenarioVu {
238 scenario_name: Arc::clone(&scenario.name),
239 steps,
240 on_step_failure: scenario.on_step_failure,
241 cancellation_token: vu_token.clone(),
242 result_tx: tx.clone(),
243 budget: None, // curve mode: no budget
244 rate_limiter: rate_limiter.as_ref().map(Arc::clone),
245 }
246 .spawn()
247 } else {
248 Vu {
249 request_config: Arc::clone(&request_config),
250 plain_headers: Arc::clone(&plain_headers),
251 template: template.as_ref().map(Arc::clone),
252 scenario_label: None,
253 step_label: None,
254 cancellation_token: vu_token.clone(),
255 result_tx: tx.clone(),
256 budget: None,
257 rate_limiter: rate_limiter.as_ref().map(Arc::clone),
258 }
259 .spawn()
260 };
261 vu_counter += 1;
262 vu_handles.push((handle, vu_token));
263 }
264 }
265 std::cmp::Ordering::Less => {
266 // Cancel excess VUs (cancel from the end of the list)
267 let to_remove = current - target;
268 let drain_start = vu_handles.len() - to_remove;
269 let excess: Vec<_> = vu_handles.drain(drain_start..).collect();
270 // Cancel all tokens first so all VUs begin exiting simultaneously
271 for (_, token) in &excess {
272 token.cancel();
273 }
274 // Await sequentially — VUs are already exiting in parallel on the runtime
275 for (handle, _) in excess {
276 let _ = handle.await;
277 }
278 }
279 std::cmp::Ordering::Equal => {}
280 }
281 }
282 }
283 }
284
285 // Cancel all remaining VU tasks — cancel all tokens first, then await.
286 for (_, token) in &vu_handles {
287 token.cancel();
288 }
289 for (handle, _) in vu_handles {
290 let _ = handle.await;
291 }
292
293 // Drop the coordinator's sender so the channel closes once all VU
294 // senders (clones) are also dropped (they are, since tasks ended).
295 drop(tx);
296
297 // Await the drain task to get the fully accumulated result.
298 Ok(drain_handle.await?)
299 }
300}