Skip to main content

lmn_core/execution/fixed/
mod.rs

1use std::sync::Arc;
2use std::sync::atomic::AtomicUsize;
3
4use tokio::sync::mpsc;
5use tokio_util::sync::CancellationToken;
6use tracing::Instrument;
7use tracing::info_span;
8
9use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
10use crate::http::{RequestConfig, RequestRecord};
11use crate::monitoring::SpanName;
12use crate::request_template::Template;
13use crate::response_template::stats::ResponseStats;
14use crate::vu::Vu;
15
16// ── FixedExecutorParams ───────────────────────────────────────────────────────
17
18/// Parameters for constructing a `FixedExecutor`.
19pub struct FixedExecutorParams {
20    pub request_config: Arc<RequestConfig>,
21    pub template: Option<Arc<Template>>,
22    pub total: usize,
23    pub concurrency: usize,
24    pub cancellation_token: CancellationToken,
25}
26
27// ── FixedExecutionResult ──────────────────────────────────────────────────────
28
29/// Result returned by `FixedExecutor::execute`.
30pub struct FixedExecutionResult {
31    pub latency: LatencyHistogram,
32    pub status_codes: StatusCodeHistogram,
33    pub total_requests: u64,
34    pub total_failures: u64,
35    pub response_stats: Option<ResponseStats>,
36}
37
38// ── FixedExecutor ─────────────────────────────────────────────────────────────
39
40/// Executes a fixed-count load test using a worker-pool model: spawns exactly
41/// `concurrency` long-lived VU tasks that share an atomic request budget and
42/// self-terminate when the budget is exhausted.
43pub struct FixedExecutor {
44    params: FixedExecutorParams,
45}
46
47impl FixedExecutor {
48    pub fn new(params: FixedExecutorParams) -> Self {
49        Self { params }
50    }
51
52    /// Runs the fixed load test. Spawns `concurrency` VU tasks sharing a budget
53    /// of `total` requests. Returns a `FixedExecutionResult` when all requests
54    /// complete or a cancellation signal is received.
55    pub async fn execute(self) -> Result<FixedExecutionResult, crate::execution::RunError> {
56        let FixedExecutorParams {
57            request_config,
58            template,
59            total,
60            concurrency,
61            cancellation_token,
62        } = self.params;
63
64        // Pre-convert headers once before spawning VUs to avoid per-request allocation.
65        let plain_headers: Arc<Vec<(String, String)>> = Arc::new(
66            request_config
67                .headers
68                .iter()
69                .map(|(k, v)| (k.clone(), v.to_string()))
70                .collect(),
71        );
72
73        let has_tracked_fields = request_config.tracked_fields.is_some();
74
75        async {
76            let budget = Arc::new(AtomicUsize::new(total));
77            let (tx, rx) = mpsc::unbounded_channel::<RequestRecord>();
78
79            // Spawn a dedicated drain task that owns the receiver and all
80            // accumulator state. It returns a `FixedExecutionResult` once the
81            // channel closes (all VU senders dropped).
82            let drain_handle = tokio::spawn(async move {
83                let mut rx = rx;
84                let mut latency = LatencyHistogram::new();
85                let mut status_codes = StatusCodeHistogram::new();
86                let mut total_requests: u64 = 0;
87                let mut total_failures: u64 = 0;
88                let mut response_stats: Option<ResponseStats> = if has_tracked_fields {
89                    Some(ResponseStats::new())
90                } else {
91                    None
92                };
93
94                while let Some(record) = rx.recv().await {
95                    total_requests += 1;
96                    if !record.success {
97                        total_failures += 1;
98                    }
99                    latency.record(record.duration);
100                    status_codes.record(record.status_code);
101                    if let Some(extraction) = record.extraction
102                        && let Some(ref mut rs) = response_stats
103                    {
104                        rs.record(extraction);
105                    }
106                }
107
108                FixedExecutionResult {
109                    latency,
110                    status_codes,
111                    total_requests,
112                    total_failures,
113                    response_stats,
114                }
115            });
116
117            // Spawn exactly `concurrency` VU tasks. Each claims requests from the
118            // shared budget and self-terminates when the budget is exhausted.
119            let vu_handles: Vec<_> = (0..concurrency)
120                .map(|_| {
121                    Vu {
122                        request_config: Arc::clone(&request_config),
123                        plain_headers: Arc::clone(&plain_headers),
124                        template: template.as_ref().map(Arc::clone),
125                        cancellation_token: cancellation_token.clone(),
126                        result_tx: tx.clone(),
127                        budget: Some(Arc::clone(&budget)),
128                    }
129                    .spawn()
130                })
131                .collect();
132
133            // Drop the coordinator's sender so the channel closes once all VU
134            // senders are also dropped (they are, once each VU task exits).
135            drop(tx);
136
137            // Await all VU tasks to ensure they have finished sending.
138            for handle in vu_handles {
139                let _ = handle.await;
140            }
141
142            // All VU senders are now dropped — channel is closed. Await the
143            // drain task to get the accumulated result.
144            Ok(drain_handle.await?)
145        }
146        .instrument(info_span!(SpanName::REQUESTS, total))
147        .await
148    }
149}
150
151// ── Tests ─────────────────────────────────────────────────────────────────────
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::HttpMethod;

    /// Builds a GET request configuration for `http://localhost` with no
    /// body, no headers, and no tracked fields.
    fn localhost_config() -> Arc<RequestConfig> {
        Arc::new(RequestConfig {
            client: reqwest::Client::new(),
            host: Arc::new("http://localhost".to_string()),
            method: HttpMethod::Get,
            body: Arc::new(None),
            tracked_fields: None,
            headers: Arc::new(vec![]),
        })
    }

    // ── struct_shape_fixed_execution_result ───────────────────────────────────

    /// A hand-built result exposes exactly the values it was constructed with.
    #[test]
    fn struct_shape_fixed_execution_result() {
        let outcome = FixedExecutionResult {
            latency: LatencyHistogram::new(),
            status_codes: StatusCodeHistogram::new(),
            total_requests: 10,
            total_failures: 1,
            response_stats: None,
        };

        assert_eq!(outcome.total_requests, 10);
        assert_eq!(outcome.total_failures, 1);
        assert!(outcome.latency.is_empty());
        assert!(outcome.response_stats.is_none());
    }

    // ── struct_shape_fixed_executor_params ────────────────────────────────────

    /// The params struct can be built field by field and read back.
    #[test]
    fn struct_shape_fixed_executor_params() {
        let shared_config = localhost_config();

        let built = FixedExecutorParams {
            request_config: Arc::clone(&shared_config),
            template: None,
            total: 5,
            concurrency: 2,
            cancellation_token: CancellationToken::new(),
        };

        assert_eq!(built.total, 5);
        assert_eq!(built.concurrency, 2);
        assert!(built.template.is_none());
    }

    // ── fixed_executor_new_stores_params ─────────────────────────────────────

    /// `FixedExecutor::new` keeps the params it was given.
    #[test]
    fn fixed_executor_new_stores_params() {
        let under_test = FixedExecutor::new(FixedExecutorParams {
            request_config: localhost_config(),
            template: None,
            total: 1,
            concurrency: 1,
            cancellation_token: CancellationToken::new(),
        });

        assert_eq!(under_test.params.total, 1);
        assert_eq!(under_test.params.concurrency, 1);
    }
}