use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing::info_span;
use crate::execution::{
DrainMetricsAccumulator, ResolvedScenario, RpsLimiter, ScenarioStats, assign_scenario,
};
use crate::histogram::{LatencyHistogram, StatusCodeHistogram};
use crate::http::{RequestConfig, RequestRecord};
use crate::monitoring::SpanName;
use crate::request_template::Template;
use crate::response_template::stats::ResponseStats;
use crate::vu::Vu;
use crate::vu::scenario::{ScenarioVu, StepExec};
pub struct FixedExecutorParams {
pub request_config: Arc<RequestConfig>,
pub template: Option<Arc<Template>>,
pub total: usize,
pub concurrency: usize,
pub rps: Option<usize>,
pub cancellation_token: CancellationToken,
pub scenarios: Option<Vec<ResolvedScenario>>,
}
pub struct FixedExecutionResult {
pub latency: LatencyHistogram,
pub status_codes: StatusCodeHistogram,
pub total_requests: u64,
pub total_failures: u64,
pub total_skipped: u64,
pub response_stats: Option<ResponseStats>,
pub scenario_stats: Option<Vec<ScenarioStats>>,
}
pub struct FixedExecutor {
params: FixedExecutorParams,
}
impl FixedExecutor {
pub fn new(params: FixedExecutorParams) -> Self {
Self { params }
}
pub async fn execute(self) -> Result<FixedExecutionResult, crate::execution::RunError> {
let FixedExecutorParams {
request_config,
template,
total,
concurrency,
rps,
cancellation_token,
scenarios,
} = self.params;
let plain_headers: Arc<Vec<(String, String)>> = Arc::new(
request_config
.headers
.iter()
.map(|(k, v)| (k.clone(), v.to_string()))
.collect(),
);
let has_tracked_fields = if let Some(ref sc) = scenarios {
sc.iter()
.flat_map(|s| s.steps.iter())
.any(|step| step.response_template.is_some())
} else {
request_config.tracked_fields.is_some()
};
let rate_limiter = rps.and_then(RpsLimiter::new);
async {
let budget = Arc::new(AtomicUsize::new(total));
let (tx, rx) = mpsc::unbounded_channel::<RequestRecord>();
let drain_handle = tokio::spawn(async move {
let mut rx = rx;
let mut acc = DrainMetricsAccumulator::new(has_tracked_fields);
while let Some(record) = rx.recv().await {
acc.record_request(&record);
acc.record_extraction(record.extraction);
}
let scenario_stats = acc.finalize_scenario_stats();
FixedExecutionResult {
latency: acc.latency,
status_codes: acc.status_codes,
total_requests: acc.total_requests,
total_failures: acc.total_failures,
total_skipped: acc.total_skipped,
response_stats: acc.response_stats,
scenario_stats,
}
});
let vu_handles: Vec<_> = if let Some(ref scenarios) = scenarios {
(0..concurrency)
.map(|vu_idx| {
let scenario = &scenarios[assign_scenario(vu_idx, scenarios)];
let steps = scenario
.steps
.iter()
.map(|step| StepExec {
step_name: Arc::clone(&step.name),
request_config: Arc::clone(&step.request_config),
plain_headers: Arc::clone(&step.plain_headers),
request_template: step.request_template.as_ref().map(Arc::clone),
response_template: step.response_template.as_ref().map(Arc::clone),
captures: step.captures.clone(),
inline_body: step.inline_body.clone(),
has_capture_headers: step.has_capture_headers,
})
.collect();
ScenarioVu {
scenario_name: Arc::clone(&scenario.name),
steps,
on_step_failure: scenario.on_step_failure,
cancellation_token: cancellation_token.clone(),
result_tx: tx.clone(),
budget: Some(Arc::clone(&budget)),
rate_limiter: rate_limiter.as_ref().map(Arc::clone),
}
.spawn()
})
.collect()
} else {
(0..concurrency)
.map(|_| {
Vu {
request_config: Arc::clone(&request_config),
plain_headers: Arc::clone(&plain_headers),
template: template.as_ref().map(Arc::clone),
scenario_label: None,
step_label: None,
cancellation_token: cancellation_token.clone(),
result_tx: tx.clone(),
budget: Some(Arc::clone(&budget)),
rate_limiter: rate_limiter.as_ref().map(Arc::clone),
}
.spawn()
})
.collect()
};
drop(tx);
for handle in vu_handles {
let _ = handle.await;
}
Ok(drain_handle.await?)
}
.instrument(info_span!(SpanName::REQUESTS, total))
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn struct_shape_fixed_execution_result() {
let result = FixedExecutionResult {
latency: LatencyHistogram::new(),
status_codes: StatusCodeHistogram::new(),
total_requests: 10,
total_failures: 1,
total_skipped: 0,
response_stats: None,
scenario_stats: None,
};
assert_eq!(result.total_requests, 10);
assert_eq!(result.total_failures, 1);
assert!(result.latency.is_empty());
assert!(result.response_stats.is_none());
assert!(result.scenario_stats.is_none());
}
#[test]
fn struct_shape_fixed_executor_params() {
use crate::command::HttpMethod;
use crate::http::RequestConfig;
use tokio_util::sync::CancellationToken;
let config = Arc::new(RequestConfig {
client: reqwest::Client::new(),
host: Arc::new("http://localhost".to_string()),
method: HttpMethod::Get,
body: Arc::new(None),
tracked_fields: None,
headers: Arc::new(vec![]),
});
let params = FixedExecutorParams {
request_config: Arc::clone(&config),
template: None,
total: 5,
concurrency: 2,
rps: None,
cancellation_token: CancellationToken::new(),
scenarios: None,
};
assert_eq!(params.total, 5);
assert_eq!(params.concurrency, 2);
assert!(params.template.is_none());
assert!(params.scenarios.is_none());
assert!(params.rps.is_none());
}
#[test]
fn fixed_executor_new_stores_params() {
use crate::command::HttpMethod;
use crate::http::RequestConfig;
use tokio_util::sync::CancellationToken;
let config = Arc::new(RequestConfig {
client: reqwest::Client::new(),
host: Arc::new("http://localhost".to_string()),
method: HttpMethod::Get,
body: Arc::new(None),
tracked_fields: None,
headers: Arc::new(vec![]),
});
let executor = FixedExecutor::new(FixedExecutorParams {
request_config: config,
template: None,
total: 1,
concurrency: 1,
rps: Some(50),
cancellation_token: CancellationToken::new(),
scenarios: None,
});
assert_eq!(executor.params.total, 1);
assert_eq!(executor.params.concurrency, 1);
assert_eq!(executor.params.rps, Some(50));
}
}