lmn_core/execution/fixed/
mod.rs1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3
4use tokio::sync::mpsc;
5use tokio_util::sync::CancellationToken;
6use tracing::Instrument;
7use tracing::info_span;
8
9use crate::execution::{
10 DrainMetricsAccumulator, ResolvedScenario, RpsLimiter, ScenarioStats, assign_scenario,
11};
12use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
13use crate::http::{RequestConfig, RequestRecord};
14use crate::monitoring::SpanName;
15use crate::request_template::Template;
16use crate::response_template::stats::ResponseStats;
17use crate::vu::Vu;
18use crate::vu::scenario::{ScenarioVu, StepExec};
19
20pub struct FixedExecutorParams {
24 pub request_config: Arc<RequestConfig>,
25 pub template: Option<Arc<Template>>,
26 pub total: usize,
27 pub concurrency: usize,
28 pub rps: Option<usize>,
32 pub cancellation_token: CancellationToken,
33 pub scenarios: Option<Vec<ResolvedScenario>>,
37}
38
39pub struct FixedExecutionResult {
43 pub latency: LatencyHistogram,
44 pub status_codes: StatusCodeHistogram,
45 pub total_requests: u64,
46 pub total_failures: u64,
47 pub total_skipped: u64,
48 pub response_stats: Option<ResponseStats>,
49 pub scenario_stats: Option<Vec<ScenarioStats>>,
50}
51
52pub struct FixedExecutor {
58 params: FixedExecutorParams,
59}
60
61impl FixedExecutor {
62 pub fn new(params: FixedExecutorParams) -> Self {
63 Self { params }
64 }
65
66 pub async fn execute(self) -> Result<FixedExecutionResult, crate::execution::RunError> {
70 let FixedExecutorParams {
71 request_config,
72 template,
73 total,
74 concurrency,
75 rps,
76 cancellation_token,
77 scenarios,
78 } = self.params;
79
80 let plain_headers: Arc<Vec<(String, String)>> = Arc::new(
82 request_config
83 .headers
84 .iter()
85 .map(|(k, v)| (k.clone(), v.to_string()))
86 .collect(),
87 );
88
89 let has_tracked_fields = if let Some(ref sc) = scenarios {
92 sc.iter()
93 .flat_map(|s| s.steps.iter())
94 .any(|step| step.response_template.is_some())
95 } else {
96 request_config.tracked_fields.is_some()
97 };
98
99 let rate_limiter = rps.and_then(RpsLimiter::new);
102
103 async {
104 let budget = Arc::new(AtomicUsize::new(total));
105 let (tx, rx) = mpsc::unbounded_channel::<RequestRecord>();
106
107 let drain_handle = tokio::spawn(async move {
111 let mut rx = rx;
112 let mut acc = DrainMetricsAccumulator::new(has_tracked_fields);
113
114 while let Some(record) = rx.recv().await {
115 acc.record_request(&record);
116 acc.record_extraction(record.extraction);
117 }
118 let scenario_stats = acc.finalize_scenario_stats();
119
120 FixedExecutionResult {
121 latency: acc.latency,
122 status_codes: acc.status_codes,
123 total_requests: acc.total_requests,
124 total_failures: acc.total_failures,
125 total_skipped: acc.total_skipped,
126 response_stats: acc.response_stats,
127 scenario_stats,
128 }
129 });
130
131 let vu_handles: Vec<_> = if let Some(ref scenarios) = scenarios {
132 (0..concurrency)
136 .map(|vu_idx| {
137 let scenario = &scenarios[assign_scenario(vu_idx, scenarios)];
138 let steps = scenario
139 .steps
140 .iter()
141 .map(|step| StepExec {
142 step_name: Arc::clone(&step.name),
143 request_config: Arc::clone(&step.request_config),
144 plain_headers: Arc::clone(&step.plain_headers),
145 request_template: step.request_template.as_ref().map(Arc::clone),
146 response_template: step.response_template.as_ref().map(Arc::clone),
147 captures: step.captures.clone(),
148 inline_body: step.inline_body.clone(),
149 has_capture_headers: step.has_capture_headers,
150 })
151 .collect();
152 ScenarioVu {
153 scenario_name: Arc::clone(&scenario.name),
154 steps,
155 on_step_failure: scenario.on_step_failure,
156 cancellation_token: cancellation_token.clone(),
157 result_tx: tx.clone(),
158 budget: Some(Arc::clone(&budget)),
159 rate_limiter: rate_limiter.as_ref().map(Arc::clone),
160 }
161 .spawn()
162 })
163 .collect()
164 } else {
165 (0..concurrency)
167 .map(|_| {
168 Vu {
169 request_config: Arc::clone(&request_config),
170 plain_headers: Arc::clone(&plain_headers),
171 template: template.as_ref().map(Arc::clone),
172 scenario_label: None,
173 step_label: None,
174 cancellation_token: cancellation_token.clone(),
175 result_tx: tx.clone(),
176 budget: Some(Arc::clone(&budget)),
177 rate_limiter: rate_limiter.as_ref().map(Arc::clone),
178 }
179 .spawn()
180 })
181 .collect()
182 };
183
184 drop(tx);
187
188 for handle in vu_handles {
190 let _ = handle.await;
191 }
192
193 Ok(drain_handle.await?)
196 }
197 .instrument(info_span!(SpanName::REQUESTS, total))
198 .await
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::HttpMethod;

    /// Builds the minimal GET configuration shared by the tests below.
    ///
    /// Nothing here calls `execute`, so the client is only constructed, never used.
    fn sample_request_config() -> Arc<RequestConfig> {
        Arc::new(RequestConfig {
            client: reqwest::Client::new(),
            host: Arc::new("http://localhost".to_string()),
            method: HttpMethod::Get,
            body: Arc::new(None),
            tracked_fields: None,
            headers: Arc::new(vec![]),
        })
    }

    /// Guards the public field set of `FixedExecutionResult`: the struct literal
    /// stops compiling if a field is added, removed, or changes type.
    #[test]
    fn struct_shape_fixed_execution_result() {
        let result = FixedExecutionResult {
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 10,
            total_failures: 1,
            total_skipped: 0,
            response_stats: None,
            scenario_stats: None,
        };

        assert_eq!(result.total_requests, 10);
        assert_eq!(result.total_failures, 1);
        assert_eq!(result.total_skipped, 0);
        assert!(result.latency.is_empty());
        assert!(result.response_stats.is_none());
        assert!(result.scenario_stats.is_none());
    }

    /// Guards the public field set of `FixedExecutorParams`.
    #[test]
    fn struct_shape_fixed_executor_params() {
        let params = FixedExecutorParams {
            request_config: sample_request_config(),
            template: None,
            total: 5,
            concurrency: 2,
            rps: None,
            cancellation_token: CancellationToken::new(),
            scenarios: None,
        };

        assert_eq!(params.total, 5);
        assert_eq!(params.concurrency, 2);
        assert!(params.template.is_none());
        assert!(params.scenarios.is_none());
        assert!(params.rps.is_none());
    }

    /// `FixedExecutor::new` must keep the parameters exactly as given.
    #[test]
    fn fixed_executor_new_stores_params() {
        let executor = FixedExecutor::new(FixedExecutorParams {
            request_config: sample_request_config(),
            template: None,
            total: 1,
            concurrency: 1,
            rps: Some(50),
            cancellation_token: CancellationToken::new(),
            scenarios: None,
        });

        assert_eq!(executor.params.total, 1);
        assert_eq!(executor.params.concurrency, 1);
        assert_eq!(executor.params.rps, Some(50));
    }
}