pub mod scenario;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::execution::RpsLimiter;
use crate::http::{Request, RequestConfig, RequestRecord};
use crate::request_template::Template;
/// A single load-generating worker (presumably a "virtual user") that issues
/// HTTP requests in a loop once [`Vu::spawn`] is called.
///
/// The worker owns everything it needs, so it can be moved into a spawned
/// task.
pub struct Vu {
/// Shared request configuration; supplies the client, host, method,
/// tracked response fields and body resolution used for every request.
pub request_config: Arc<RequestConfig>,
/// Headers attached to every request. When the list is empty no headers are
/// set on the request at all.
pub plain_headers: Arc<Vec<(String, String)>>,
/// Optional body template. When present, `generate_one` is called once per
/// request and its output is passed to `RequestConfig::resolve_body`.
pub template: Option<Arc<Template>>,
/// Copied into the `scenario` field of every emitted [`RequestRecord`].
pub scenario_label: Option<Arc<str>>,
/// Copied into the `step` field of every emitted [`RequestRecord`].
pub step_label: Option<Arc<str>>,
/// Cancelling this token ends the request loop; it is raced against both
/// the rate limiter wait and the in-flight request.
pub cancellation_token: CancellationToken,
/// Channel on which one [`RequestRecord`] is sent per completed request.
/// The loop ends when the receiving side has been closed.
pub result_tx: mpsc::UnboundedSender<RequestRecord>,
/// Optional shared counter of remaining requests. One unit is claimed per
/// loop iteration and the loop ends once it reaches zero. `None` means the
/// loop is not limited by a budget.
pub budget: Option<Arc<AtomicUsize>>,
/// Optional limiter whose `acquire` is awaited before each request.
pub rate_limiter: Option<Arc<RpsLimiter>>,
}
impl Vu {
    /// Attempts to reserve one request from the shared budget.
    ///
    /// Returns `true` when no budget is configured, or when the counter was
    /// above zero and has been decremented by one. Returns `false` once the
    /// counter is zero; the counter is never decremented below zero.
    fn claim_budget(&self) -> bool {
        match &self.budget {
            None => true,
            Some(remaining) => remaining
                // `checked_sub` yields `None` at zero, which makes
                // `fetch_update` leave the counter untouched and return `Err`.
                .fetch_update(Ordering::Acquire, Ordering::Relaxed, |n| n.checked_sub(1))
                .is_ok(),
        }
    }

    /// Consumes the worker and runs its request loop on a new tokio task.
    ///
    /// Each iteration:
    /// 1. stops if the cancellation token has been cancelled;
    /// 2. claims one unit of budget, stopping when none is left;
    /// 3. waits for the rate limiter, if one is configured;
    /// 4. generates a body from the template, if one is configured. A
    ///    generation error is logged to stderr and the iteration is skipped;
    ///    the budget unit claimed in step 2 is not returned;
    /// 5. executes the request, racing it against cancellation;
    /// 6. sends a [`RequestRecord`] on `result_tx`, stopping if the receiver
    ///    has been dropped.
    ///
    /// The returned handle resolves when the loop ends.
    pub fn spawn(self) -> JoinHandle<()> {
        tokio::spawn(async move {
            // Set when an iteration was skipped before reaching any await
            // point. Only a `bool` is kept across the yield below, so no error
            // or body value has to live across an await.
            let mut skipped_without_await = false;

            loop {
                if skipped_without_await {
                    skipped_without_await = false;
                    // Without this, a template that keeps failing while no
                    // rate limiter is configured would spin on the executor
                    // thread and starve every other task, including the one
                    // that would cancel this worker.
                    tokio::task::yield_now().await;
                }

                // Check before claiming budget so a cancelled worker does not
                // consume a slot it will never use.
                if self.cancellation_token.is_cancelled() {
                    break;
                }

                if !self.claim_budget() {
                    break;
                }

                if let Some(ref limiter) = self.rate_limiter {
                    tokio::select! {
                        _ = self.cancellation_token.cancelled() => break,
                        _ = limiter.acquire() => {}
                    }
                }

                let body = match self.template.as_ref().map(|t| t.generate_one()) {
                    None => None,
                    Some(Ok(generated)) => Some(generated),
                    Some(Err(e)) => {
                        eprintln!("error: template serialization failed, skipping request: {e}");
                        skipped_without_await = true;
                        continue;
                    }
                };

                let resolved = self.request_config.resolve_body(body);
                let client = self.request_config.client.clone();
                let url = Arc::clone(&self.request_config.host);
                let method = self.request_config.method;
                // Borrowed rather than cloned: the configuration outlives the
                // request because `self` owns the `Arc` for the whole task.
                let tracked_fields = self.request_config.tracked_fields.as_ref();
                // The response body is only read when there are fields to
                // extract from it.
                let capture_body = tracked_fields.is_some();
                let headers =
                    (!self.plain_headers.is_empty()).then(|| Arc::clone(&self.plain_headers));

                let result_fut = async {
                    let mut req = Request::new(client, url, method);
                    if let Some((content, content_type)) = resolved {
                        req = req.body(content, content_type);
                    }
                    if capture_body {
                        req = req.read_response();
                    }
                    if let Some(h) = headers {
                        req = req.headers(h);
                    }
                    req.execute().await
                };

                tokio::select! {
                    // Cancellation abandons the in-flight request; no record
                    // is emitted for it.
                    _ = self.cancellation_token.cancelled() => break,
                    result = result_fut => {
                        // Extraction is best-effort: a missing body or a body
                        // that is not valid JSON yields `None`.
                        let extraction = tracked_fields.and_then(|fields| {
                            let body_str = result.response_body.as_ref()?;
                            let val =
                                serde_json::from_str::<serde_json::Value>(body_str).ok()?;
                            Some(crate::response_template::extractor::extract(&val, fields))
                        });
                        let record = RequestRecord {
                            duration: result.duration,
                            completed_at: result.completed_at,
                            success: result.success,
                            status_code: result.status_code,
                            extraction,
                            scenario: self.scenario_label.as_ref().map(Arc::clone),
                            step: self.step_label.as_ref().map(Arc::clone),
                            skipped: false,
                        };
                        // A send error means the receiver is gone, so nobody
                        // is collecting results any more.
                        if self.result_tx.send(record).is_err() {
                            break;
                        }
                    }
                }
            }
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::command::HttpMethod;

    /// Builds a `Vu` with a minimal GET configuration against localhost and
    /// the given budget.
    ///
    /// The receiver is returned so the channel stays open for the duration of
    /// the test.
    fn make_vu(
        budget: Option<Arc<AtomicUsize>>,
    ) -> (Vu, mpsc::UnboundedReceiver<RequestRecord>) {
        let config = Arc::new(RequestConfig {
            client: reqwest::Client::new(),
            host: Arc::new("http://localhost".to_string()),
            method: HttpMethod::Get,
            body: Arc::new(None),
            tracked_fields: None,
            headers: Arc::new(vec![]),
        });
        let (tx, rx) = mpsc::unbounded_channel();
        let vu = Vu {
            request_config: config,
            plain_headers: Arc::new(vec![]),
            template: None,
            scenario_label: None,
            step_label: None,
            cancellation_token: CancellationToken::new(),
            result_tx: tx,
            budget,
            rate_limiter: None,
        };
        (vu, rx)
    }

    /// A `Vu` can be built with every optional field left unset.
    #[test]
    fn struct_shape_vu() {
        let (vu, _rx) = make_vu(None);
        assert!(vu.template.is_none());
        assert!(vu.scenario_label.is_none());
        assert!(vu.step_label.is_none());
        assert!(vu.budget.is_none());
    }

    /// The budget handed to a `Vu` is the same shared counter the caller holds.
    #[test]
    fn struct_shape_vu_with_budget() {
        let budget = Arc::new(AtomicUsize::new(100));
        let (vu, _rx) = make_vu(Some(Arc::clone(&budget)));
        let shared = vu.budget.as_ref().expect("budget was supplied");
        assert!(Arc::ptr_eq(shared, &budget));
        assert_eq!(shared.load(Ordering::Relaxed), 100);
    }

    /// `claim_budget` decrements down to zero and then refuses, without
    /// wrapping the counter around.
    #[test]
    fn budget_fetch_update_prevents_underflow() {
        let budget = Arc::new(AtomicUsize::new(1));
        let (vu, _rx) = make_vu(Some(Arc::clone(&budget)));

        assert!(vu.claim_budget());
        assert_eq!(budget.load(Ordering::Relaxed), 0);

        assert!(!vu.claim_budget());
        assert_eq!(budget.load(Ordering::Relaxed), 0);
    }

    /// Without a budget every claim succeeds.
    #[test]
    fn claim_budget_is_unlimited_without_budget() {
        let (vu, _rx) = make_vu(None);
        assert!((0..3).all(|_| vu.claim_budget()));
    }
}