use crate::executor::context::Context;
use crate::executor::error::ExecutionError;
use crate::transpiler::ir::{HttpMethod, HttpMode, HttpProbe, ProbeResult, Value};
use std::collections::HashMap;
use std::time::Instant;
pub async fn execute(probe: &HttpProbe, ctx: &Context) -> Result<ProbeResult, ExecutionError> {
let path = ctx.interpolate(&probe.path);
let host = &ctx.config.host;
let port = ctx.config.port.unwrap_or(80);
let base_url = format!("http://{host}:{port}");
let url = format!("{base_url}{path}");
match &probe.mode {
HttpMode::Single => execute_single(probe, &url, ctx).await,
HttpMode::Concurrent { clients } => execute_concurrent(probe, &url, ctx, *clients).await,
HttpMode::Keepalive { requests } => execute_keepalive(probe, &url, ctx, *requests).await,
HttpMode::Pipelined { requests } => execute_pipelined(probe, &url, ctx, *requests).await,
HttpMode::Burst { count, window_ms } => {
execute_burst(probe, &url, ctx, *count, *window_ms).await
}
HttpMode::Chunked => execute_single(probe, &url, ctx).await,
}
}
async fn execute_single(
probe: &HttpProbe,
url: &str,
ctx: &Context,
) -> Result<ProbeResult, ExecutionError> {
let start = Instant::now();
let client = reqwest::Client::new();
let mut builder = match probe.method {
HttpMethod::GET => client.get(url),
HttpMethod::POST => client.post(url),
HttpMethod::PUT => client.put(url),
HttpMethod::DELETE => client.delete(url),
HttpMethod::PATCH => client.patch(url),
HttpMethod::HEAD => client.head(url),
HttpMethod::OPTIONS => client.request(reqwest::Method::OPTIONS, url),
};
for (k, v) in &probe.headers {
let v = ctx.interpolate(v);
builder = builder.header(k, v);
}
if let Some(body) = &probe.body {
let body = ctx.interpolate(body);
builder = builder
.header("Content-Type", "application/json")
.body(body);
}
let response = builder
.send()
.await
.map_err(|e| ExecutionError::new(format!("HTTP request failed: {e}")))?;
let duration = start.elapsed();
let status = response.status().as_u16();
let mut fields = HashMap::new();
fields.insert("status".to_string(), Value::Int(status as i64));
fields.insert(
"latency".to_string(),
Value::Int(duration.as_millis() as i64),
);
fields.insert(
"duration".to_string(),
Value::Int(duration.as_millis() as i64),
);
for (key, value) in response.headers() {
if let Ok(v) = value.to_str() {
let header_key = format!("header.{}", key.as_str());
fields.insert(header_key, Value::String(v.to_string()));
}
}
let body = response
.text()
.await
.map_err(|e| ExecutionError::new(format!("failed to read response body: {e}")))?;
fields.insert("body".to_string(), Value::String(body.clone()));
if let Ok(json) = serde_json::from_str::<serde_json::Value>(&body) {
flatten_json("body.json", &json, &mut fields);
}
Ok(ProbeResult {
fields,
raw_stdout: Some(body),
duration_ms: duration.as_millis() as u64,
})
}
async fn execute_concurrent(
probe: &HttpProbe,
url: &str,
ctx: &Context,
clients: u32,
) -> Result<ProbeResult, ExecutionError> {
let start = Instant::now();
let mut handles = Vec::new();
for _ in 0..clients {
let probe = probe.clone();
let url = url.to_string();
let ctx = ctx.clone();
handles.push(tokio::spawn(async move {
execute_single(&probe, &url, &ctx).await
}));
}
let mut all_passed = true;
let mut statuses = Vec::new();
let mut total_fields = HashMap::new();
for handle in handles {
match handle.await {
Ok(Ok(result)) => {
if let Some(Value::Int(s)) = result.fields.get("status") {
statuses.push(*s);
}
}
_ => {
all_passed = false;
}
}
}
let duration = start.elapsed();
total_fields.insert(
"status".to_string(),
Value::Int(statuses.first().copied().unwrap_or(0)),
);
total_fields.insert("all_passed".to_string(), Value::Bool(all_passed));
total_fields.insert(
"response_count".to_string(),
Value::Int(statuses.len() as i64),
);
let all_same = statuses.windows(2).all(|w| w[0] == w[1]);
total_fields.insert("all_status_same".to_string(), Value::Bool(all_same));
if let Some(&first) = statuses.first() {
if all_same {
total_fields.insert("all_status".to_string(), Value::Int(first));
}
}
Ok(ProbeResult {
fields: total_fields,
raw_stdout: None,
duration_ms: duration.as_millis() as u64,
})
}
async fn execute_keepalive(
probe: &HttpProbe,
url: &str,
ctx: &Context,
requests: u32,
) -> Result<ProbeResult, ExecutionError> {
let start = Instant::now();
let client = reqwest::Client::new();
let mut statuses = Vec::new();
for _ in 0..requests {
let mut builder = match probe.method {
HttpMethod::GET => client.get(url),
HttpMethod::POST => client.post(url),
HttpMethod::PUT => client.put(url),
HttpMethod::DELETE => client.delete(url),
HttpMethod::PATCH => client.patch(url),
HttpMethod::HEAD => client.head(url),
HttpMethod::OPTIONS => client.request(reqwest::Method::OPTIONS, url),
};
for (k, v) in &probe.headers {
let v = ctx.interpolate(v);
builder = builder.header(k, v);
}
match builder.send().await {
Ok(resp) => statuses.push(resp.status().as_u16() as i64),
Err(e) => {
return Err(ExecutionError::new(format!(
"keepalive request failed: {e}"
)))
}
}
}
let duration = start.elapsed();
let mut fields = HashMap::new();
let all_same = statuses.windows(2).all(|w| w[0] == w[1]);
fields.insert(
"status".to_string(),
Value::Int(statuses.first().copied().unwrap_or(0)),
);
fields.insert("all_status_same".to_string(), Value::Bool(all_same));
fields.insert(
"response_count".to_string(),
Value::Int(statuses.len() as i64),
);
if all_same {
if let Some(&s) = statuses.first() {
fields.insert("all_status".to_string(), Value::Int(s));
}
}
Ok(ProbeResult {
fields,
raw_stdout: None,
duration_ms: duration.as_millis() as u64,
})
}
async fn execute_pipelined(
probe: &HttpProbe,
url: &str,
ctx: &Context,
requests: u32,
) -> Result<ProbeResult, ExecutionError> {
execute_keepalive(probe, url, ctx, requests).await
}
async fn execute_burst(
probe: &HttpProbe,
url: &str,
ctx: &Context,
count: u32,
window_ms: u64,
) -> Result<ProbeResult, ExecutionError> {
let start = Instant::now();
let mut handles = Vec::new();
let window = std::time::Duration::from_millis(window_ms);
let delay_per_request = window
.checked_div(count)
.unwrap_or(std::time::Duration::ZERO);
for i in 0..count {
let probe = probe.clone();
let url = url.to_string();
let ctx = ctx.clone();
let delay = delay_per_request * i;
handles.push(tokio::spawn(async move {
tokio::time::sleep(delay).await;
execute_single(&probe, &url, &ctx).await
}));
}
let mut accepted = 0i64;
let mut rejected = 0i64;
let mut rejected_status = 0i64;
for handle in handles {
match handle.await {
Ok(Ok(result)) => {
if let Some(Value::Int(s)) = result.fields.get("status") {
if *s == 429 {
rejected += 1;
rejected_status = 429;
} else {
accepted += 1;
}
}
}
_ => rejected += 1,
}
}
let duration = start.elapsed();
let mut fields = HashMap::new();
fields.insert("accepted".to_string(), Value::Int(accepted));
fields.insert("rejected".to_string(), Value::Int(rejected));
fields.insert("rejected-status".to_string(), Value::Int(rejected_status));
fields.insert(
"status-any".to_string(),
Value::Int(if accepted > 0 { 201 } else { 429 }),
);
Ok(ProbeResult {
fields,
raw_stdout: None,
duration_ms: duration.as_millis() as u64,
})
}
fn flatten_json(prefix: &str, value: &serde_json::Value, fields: &mut HashMap<String, Value>) {
match value {
serde_json::Value::Object(obj) => {
for (key, val) in obj {
let path = format!("{prefix}.{key}");
flatten_json(&path, val, fields);
match val {
serde_json::Value::String(s) => {
fields.insert(path, Value::String(s.clone()));
}
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
fields.insert(path, Value::Int(i));
} else if let Some(f) = n.as_f64() {
fields.insert(path, Value::Float(f));
}
}
serde_json::Value::Bool(b) => {
fields.insert(path, Value::Bool(*b));
}
serde_json::Value::Null => {
fields.insert(path, Value::Null);
}
_ => {} }
}
}
serde_json::Value::Array(arr) => {
for (i, val) in arr.iter().enumerate() {
let path = format!("{prefix}[{i}]");
flatten_json(&path, val, fields);
}
}
serde_json::Value::String(s) => {
fields.insert(prefix.to_string(), Value::String(s.clone()));
}
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
fields.insert(prefix.to_string(), Value::Int(i));
} else if let Some(f) = n.as_f64() {
fields.insert(prefix.to_string(), Value::Float(f));
}
}
serde_json::Value::Bool(b) => {
fields.insert(prefix.to_string(), Value::Bool(*b));
}
serde_json::Value::Null => {
fields.insert(prefix.to_string(), Value::Null);
}
}
}