Skip to main content

lmn_core/vu/
mod.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
4use tokio::sync::mpsc;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use crate::http::{Request, RequestConfig, RequestRecord};
9use crate::request_template::Template;
10
11// ── Vu ────────────────────────────────────────────────────────────────────────
12
13/// A virtual user. Owns its configuration and drives its own execution loop.
14///
15/// Call `spawn` to launch the VU as a Tokio task. The VU loops, making requests
16/// as fast as it can until its budget is exhausted (fixed mode) or its
17/// cancellation token fires (curve mode).
18pub struct Vu {
19    pub request_config: Arc<RequestConfig>,
20    /// Pre-converted header pairs shared across all VUs — avoids per-request allocation.
21    pub plain_headers: Arc<Vec<(String, String)>>,
22    pub template: Option<Arc<Template>>,
23    pub cancellation_token: CancellationToken,
24    pub result_tx: mpsc::UnboundedSender<RequestRecord>,
25    /// Optional request budget shared across all VUs in fixed-count mode.
26    ///
27    /// Each VU atomically claims one unit before dispatching a request and stops
28    /// when the counter reaches zero. `None` means run until the cancellation
29    /// token is triggered (curve mode).
30    pub budget: Option<Arc<AtomicUsize>>,
31}
32
33impl Vu {
34    /// Attempts to claim one unit from the shared request budget.
35    ///
36    /// Returns `true` if the VU should proceed with the next request, `false`
37    /// if the budget is exhausted and the VU should stop.
38    ///
39    /// In curve mode (`budget` is `None`) always returns `true` — the VU runs
40    /// until its cancellation token fires.
41    ///
42    /// `fetch_update` is used instead of `fetch_sub` to prevent `usize` underflow:
43    /// the decrement is only committed when the value is still `> 0`, so exactly
44    /// one VU claims the last unit even when many race simultaneously.
45    fn claim_budget(&self) -> bool {
46        match &self.budget {
47            None => true,
48            Some(b) => b
49                .fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
50                    if n > 0 { Some(n - 1) } else { None }
51                })
52                .is_ok(),
53        }
54    }
55
56    /// Consumes the VU and spawns it as a Tokio task.
57    pub fn spawn(self) -> JoinHandle<()> {
58        tokio::spawn(async move {
59            loop {
60                if !self.claim_budget() {
61                    break;
62                }
63
64                let body = match self.template.as_ref().map(|t| t.generate_one()) {
65                    None => None,
66                    Some(Ok(s)) => Some(s),
67                    Some(Err(e)) => {
68                        tracing::error!(error = %e, "template serialization failed, skipping request");
69                        continue;
70                    }
71                };
72                let resolved = self.request_config.resolve_body(body);
73
74                let client = self.request_config.client.clone();
75                let url = Arc::clone(&self.request_config.host);
76                let method = self.request_config.method;
77                let tracked_fields = self.request_config.tracked_fields.clone();
78                let capture_body = tracked_fields.is_some();
79
80                // Only clone the Arc when there are headers — avoids an atomic op on
81                // the common no-headers path.
82                let headers = if self.plain_headers.is_empty() {
83                    None
84                } else {
85                    Some(Arc::clone(&self.plain_headers))
86                };
87
88                let result_fut = async {
89                    let mut req = Request::new(client, url, method);
90                    if let Some((content, content_type)) = resolved {
91                        req = req.body(content, content_type);
92                    }
93                    if capture_body {
94                        req = req.read_response();
95                    }
96                    if let Some(h) = headers {
97                        req = req.headers(h);
98                    }
99                    req.execute().await
100                };
101
102                tokio::select! {
103                    _ = self.cancellation_token.cancelled() => break,
104                    result = result_fut => {
105                        // Perform response body extraction inline in the VU before
106                        // sending over the channel, so raw response bodies never
107                        // transit the channel.
108                        let extraction = if let Some(ref fields) = tracked_fields {
109                            if let Some(ref body_str) = result.response_body {
110                                if let Ok(val) = serde_json::from_str::<serde_json::Value>(body_str) {
111                                    Some(crate::response_template::extractor::extract(&val, fields))
112                                } else {
113                                    None
114                                }
115                            } else {
116                                None
117                            }
118                        } else {
119                            None
120                        };
121
122                        let record = RequestRecord {
123                            duration: result.duration,
124                            completed_at: result.completed_at,
125                            success: result.success,
126                            status_code: result.status_code,
127                            extraction,
128                        };
129
130                        if self.result_tx.send(record).is_err() {
131                            break;
132                        }
133                    }
134                }
135            }
136        })
137    }
138}
139
140// ── Tests ─────────────────────────────────────────────────────────────────────
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145
146    // ── struct_shape_vu ───────────────────────────────────────────────────────
147
148    #[test]
149    fn struct_shape_vu() {
150        use crate::command::HttpMethod;
151        use crate::http::RequestConfig;
152
153        let config = Arc::new(RequestConfig {
154            client: reqwest::Client::new(),
155            host: Arc::new("http://localhost".to_string()),
156            method: HttpMethod::Get,
157            body: Arc::new(None),
158            tracked_fields: None,
159            headers: Arc::new(vec![]),
160        });
161
162        let (tx, _rx) = mpsc::unbounded_channel();
163        let vu = Vu {
164            request_config: Arc::clone(&config),
165            plain_headers: Arc::new(vec![]),
166            template: None,
167            cancellation_token: CancellationToken::new(),
168            result_tx: tx,
169            budget: None,
170        };
171
172        assert!(vu.template.is_none());
173        assert!(vu.budget.is_none());
174    }
175
176    // ── struct_shape_vu_with_budget ───────────────────────────────────────────
177
178    #[test]
179    fn struct_shape_vu_with_budget() {
180        use crate::command::HttpMethod;
181        use crate::http::RequestConfig;
182
183        let config = Arc::new(RequestConfig {
184            client: reqwest::Client::new(),
185            host: Arc::new("http://localhost".to_string()),
186            method: HttpMethod::Get,
187            body: Arc::new(None),
188            tracked_fields: None,
189            headers: Arc::new(vec![]),
190        });
191
192        let budget = Arc::new(AtomicUsize::new(100));
193        let (tx, _rx) = mpsc::unbounded_channel();
194        let vu = Vu {
195            request_config: Arc::clone(&config),
196            plain_headers: Arc::new(vec![]),
197            template: None,
198            cancellation_token: CancellationToken::new(),
199            result_tx: tx,
200            budget: Some(Arc::clone(&budget)),
201        };
202
203        assert_eq!(vu.budget.unwrap().load(Ordering::Relaxed), 100);
204    }
205
206    // ── budget_fetch_update_prevents_underflow ────────────────────────────────
207
208    #[test]
209    fn budget_fetch_update_prevents_underflow() {
210        let budget = Arc::new(AtomicUsize::new(1));
211
212        // First claim succeeds
213        let first = budget.fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
214            if n > 0 { Some(n - 1) } else { None }
215        });
216        assert!(first.is_ok());
217        assert_eq!(budget.load(Ordering::Relaxed), 0);
218
219        // Second claim fails — counter does not underflow
220        let second = budget.fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
221            if n > 0 { Some(n - 1) } else { None }
222        });
223        assert!(second.is_err());
224        assert_eq!(budget.load(Ordering::Relaxed), 0);
225    }
226}