1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3
4use tokio::sync::mpsc;
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use crate::http::{Request, RequestConfig, RequestRecord};
9use crate::request_template::Template;
10
11pub struct Vu {
19 pub request_config: Arc<RequestConfig>,
20 pub plain_headers: Arc<Vec<(String, String)>>,
22 pub template: Option<Arc<Template>>,
23 pub cancellation_token: CancellationToken,
24 pub result_tx: mpsc::UnboundedSender<RequestRecord>,
25 pub budget: Option<Arc<AtomicUsize>>,
31}
32
33impl Vu {
34 fn claim_budget(&self) -> bool {
46 match &self.budget {
47 None => true,
48 Some(b) => b
49 .fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| {
50 if n > 0 { Some(n - 1) } else { None }
51 })
52 .is_ok(),
53 }
54 }
55
56 pub fn spawn(self) -> JoinHandle<()> {
58 tokio::spawn(async move {
59 loop {
60 if !self.claim_budget() {
61 break;
62 }
63
64 let body = match self.template.as_ref().map(|t| t.generate_one()) {
65 None => None,
66 Some(Ok(s)) => Some(s),
67 Some(Err(e)) => {
68 tracing::error!(error = %e, "template serialization failed, skipping request");
69 continue;
70 }
71 };
72 let resolved = self.request_config.resolve_body(body);
73
74 let client = self.request_config.client.clone();
75 let url = Arc::clone(&self.request_config.host);
76 let method = self.request_config.method;
77 let tracked_fields = self.request_config.tracked_fields.clone();
78 let capture_body = tracked_fields.is_some();
79
80 let headers = if self.plain_headers.is_empty() {
83 None
84 } else {
85 Some(Arc::clone(&self.plain_headers))
86 };
87
88 let result_fut = async {
89 let mut req = Request::new(client, url, method);
90 if let Some((content, content_type)) = resolved {
91 req = req.body(content, content_type);
92 }
93 if capture_body {
94 req = req.read_response();
95 }
96 if let Some(h) = headers {
97 req = req.headers(h);
98 }
99 req.execute().await
100 };
101
102 tokio::select! {
103 _ = self.cancellation_token.cancelled() => break,
104 result = result_fut => {
105 let extraction = if let Some(ref fields) = tracked_fields {
109 if let Some(ref body_str) = result.response_body {
110 if let Ok(val) = serde_json::from_str::<serde_json::Value>(body_str) {
111 Some(crate::response_template::extractor::extract(&val, fields))
112 } else {
113 None
114 }
115 } else {
116 None
117 }
118 } else {
119 None
120 };
121
122 let record = RequestRecord {
123 duration: result.duration,
124 completed_at: result.completed_at,
125 success: result.success,
126 status_code: result.status_code,
127 extraction,
128 };
129
130 if self.result_tx.send(record).is_err() {
131 break;
132 }
133 }
134 }
135 }
136 })
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::HttpMethod;
    use crate::http::RequestConfig;

    /// Builds a minimal GET configuration with no body, headers or tracked
    /// fields.
    fn test_config() -> Arc<RequestConfig> {
        Arc::new(RequestConfig {
            client: reqwest::Client::new(),
            host: Arc::new("http://localhost".to_string()),
            method: HttpMethod::Get,
            body: Arc::new(None),
            tracked_fields: None,
            headers: Arc::new(vec![]),
        })
    }

    /// Builds a `Vu` with no template or headers and the given budget.
    ///
    /// The receiving half of the result channel is dropped on return; none of
    /// the tests here send on it.
    fn test_vu(budget: Option<Arc<AtomicUsize>>) -> Vu {
        let (tx, _rx) = mpsc::unbounded_channel();
        Vu {
            request_config: test_config(),
            plain_headers: Arc::new(vec![]),
            template: None,
            cancellation_token: CancellationToken::new(),
            result_tx: tx,
            budget,
        }
    }

    #[test]
    fn struct_shape_vu() {
        let vu = test_vu(None);

        assert!(vu.template.is_none());
        assert!(vu.budget.is_none());
    }

    #[test]
    fn struct_shape_vu_with_budget() {
        let budget = Arc::new(AtomicUsize::new(100));
        let vu = test_vu(Some(Arc::clone(&budget)));

        assert_eq!(vu.budget.unwrap().load(Ordering::Relaxed), 100);
    }

    /// Exercises `Vu::claim_budget` itself rather than a copy of its closure,
    /// so a regression in the production code is caught here.
    #[test]
    fn budget_fetch_update_prevents_underflow() {
        let budget = Arc::new(AtomicUsize::new(1));
        let vu = test_vu(Some(Arc::clone(&budget)));

        // The single available slot can be claimed once.
        assert!(vu.claim_budget());
        assert_eq!(budget.load(Ordering::Relaxed), 0);

        // A second claim fails and must not wrap the counter below zero.
        assert!(!vu.claim_budget());
        assert_eq!(budget.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn claim_budget_is_unlimited_without_budget() {
        let vu = test_vu(None);

        assert!(vu.claim_budget());
        assert!(vu.claim_budget());
    }
}