// lmn_core/execution/fixed/mod.rs
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing::info_span;
use tracing::warn;

use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
use crate::http::{RequestConfig, RequestRecord};
use crate::monitoring::SpanName;
use crate::request_template::Template;
use crate::response_template::stats::ResponseStats;
use crate::vu::Vu;

16pub struct FixedExecutorParams {
20 pub request_config: Arc<RequestConfig>,
21 pub template: Option<Arc<Template>>,
22 pub total: usize,
23 pub concurrency: usize,
24 pub cancellation_token: CancellationToken,
25}
26
27pub struct FixedExecutionResult {
31 pub latency: LatencyHistogram,
32 pub status_codes: StatusCodeHistogram,
33 pub total_requests: u64,
34 pub total_failures: u64,
35 pub response_stats: Option<ResponseStats>,
36}
37
38pub struct FixedExecutor {
44 params: FixedExecutorParams,
45}
46
47impl FixedExecutor {
48 pub fn new(params: FixedExecutorParams) -> Self {
49 Self { params }
50 }
51
52 pub async fn execute(self) -> Result<FixedExecutionResult, crate::execution::RunError> {
56 let FixedExecutorParams {
57 request_config,
58 template,
59 total,
60 concurrency,
61 cancellation_token,
62 } = self.params;
63
64 let plain_headers: Arc<Vec<(String, String)>> = Arc::new(
66 request_config
67 .headers
68 .iter()
69 .map(|(k, v)| (k.clone(), v.to_string()))
70 .collect(),
71 );
72
73 let has_tracked_fields = request_config.tracked_fields.is_some();
74
75 async {
76 let budget = Arc::new(AtomicUsize::new(total));
77 let (tx, rx) = mpsc::unbounded_channel::<RequestRecord>();
78
79 let drain_handle = tokio::spawn(async move {
83 let mut rx = rx;
84 let mut latency = LatencyHistogram::new();
85 let mut status_codes = StatusCodeHistogram::new();
86 let mut total_requests: u64 = 0;
87 let mut total_failures: u64 = 0;
88 let mut response_stats: Option<ResponseStats> = if has_tracked_fields {
89 Some(ResponseStats::new())
90 } else {
91 None
92 };
93
94 while let Some(record) = rx.recv().await {
95 total_requests += 1;
96 if !record.success {
97 total_failures += 1;
98 }
99 latency.record(record.duration);
100 status_codes.record(record.status_code);
101 if let Some(extraction) = record.extraction
102 && let Some(ref mut rs) = response_stats
103 {
104 rs.record(extraction);
105 }
106 }
107
108 FixedExecutionResult {
109 latency,
110 status_codes,
111 total_requests,
112 total_failures,
113 response_stats,
114 }
115 });
116
117 let vu_handles: Vec<_> = (0..concurrency)
120 .map(|_| {
121 Vu {
122 request_config: Arc::clone(&request_config),
123 plain_headers: Arc::clone(&plain_headers),
124 template: template.as_ref().map(Arc::clone),
125 cancellation_token: cancellation_token.clone(),
126 result_tx: tx.clone(),
127 budget: Some(Arc::clone(&budget)),
128 }
129 .spawn()
130 })
131 .collect();
132
133 drop(tx);
136
137 for handle in vu_handles {
139 let _ = handle.await;
140 }
141
142 Ok(drain_handle.await?)
145 }
146 .instrument(info_span!(SpanName::REQUESTS, total))
147 .await
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::HttpMethod;

    /// Minimal GET configuration against localhost with no body, headers or
    /// tracked fields; shared by the tests that need a `RequestConfig`.
    fn localhost_config() -> Arc<RequestConfig> {
        Arc::new(RequestConfig {
            client: reqwest::Client::new(),
            host: Arc::new("http://localhost".to_string()),
            method: HttpMethod::Get,
            body: Arc::new(None),
            tracked_fields: None,
            headers: Arc::new(vec![]),
        })
    }

    /// The result struct can be built field by field and read back.
    #[test]
    fn struct_shape_fixed_execution_result() {
        let result = FixedExecutionResult {
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 10,
            total_failures: 1,
            response_stats: None,
        };
        assert_eq!(result.total_requests, 10);
        assert_eq!(result.total_failures, 1);
        assert!(result.latency.is_empty());
        assert!(result.response_stats.is_none());
    }

    /// The params struct can be built field by field and read back.
    #[test]
    fn struct_shape_fixed_executor_params() {
        let params = FixedExecutorParams {
            request_config: localhost_config(),
            template: None,
            total: 5,
            concurrency: 2,
            cancellation_token: CancellationToken::new(),
        };

        assert_eq!(params.total, 5);
        assert_eq!(params.concurrency, 2);
        assert!(params.template.is_none());
    }

    /// `FixedExecutor::new` keeps the parameters it was given unchanged.
    #[test]
    fn fixed_executor_new_stores_params() {
        let executor = FixedExecutor::new(FixedExecutorParams {
            request_config: localhost_config(),
            template: None,
            total: 1,
            concurrency: 1,
            cancellation_token: CancellationToken::new(),
        });

        assert_eq!(executor.params.total, 1);
        assert_eq!(executor.params.concurrency, 1);
    }
}