use std::collections::HashMap;
use std::sync::Arc;
use crate::client::util::{color_green, color_red, color_yellow};
use std::time::Duration;
use std::io::BufRead;
use crate::cli::cli_parser::{validate_url, validate_method, parse_header, parse_query};
use reqwest::{Client, Method as ReqwestMethod};
use futures::future::join_all;
/// A batch of HTTP requests together with the options that govern how the batch is executed.
pub struct BurstEngine {
    /// The requests in the batch, each wrapped in an `Arc`.
    pub requests: Vec<Arc<BurstRequest>>,
    /// Settings applied to the batch as a whole (timeout, retries, output directory, ...).
    pub options: BurstOptions,
}
/// One HTTP request of a burst batch.
#[derive(Debug, Clone)]
pub struct BurstRequest {
    /// Target URL.
    pub url: String,
    /// HTTP method name, kept as text (e.g. `GET`).
    pub method: String,
    /// Request headers, keyed by header name.
    pub headers: HashMap<String, String>,
    /// Query-string parameters, keyed by parameter name.
    pub query: HashMap<String, String>,
    /// Raw request body, if any.
    pub body: Option<Vec<u8>>,
    /// File name the response body is written to when the request succeeds, if any.
    pub output: Option<String>,
}
impl BurstRequest {
    /// Parses a single whitespace-separated request line.
    ///
    /// The first token is the HTTP method and the second is the URL. Every later token is
    /// interpreted by its prefix:
    ///
    /// * `header:<spec>` – handed to `parse_header`, result stored in `headers`
    /// * `query:<spec>`  – handed to `parse_query`, result stored in `query`
    /// * `body:<text>`   – request body (the text after the prefix, as bytes)
    /// * `output:<name>` – output file name
    ///
    /// Tokens with any other prefix are ignored. When `body:` or `output:` occurs more than
    /// once, the last occurrence wins. Because the line is split on whitespace, a body or
    /// output name cannot contain spaces.
    ///
    /// # Errors
    ///
    /// Returns a message when the method or URL is missing or rejected by its validator, or
    /// when a `header:` / `query:` token cannot be parsed.
    pub fn parse_from_line(line: &str) -> Result<Self, String> {
        let mut tokens = line.split_whitespace();

        // The two positional tokens come first; the method is validated before the URL is
        // even looked at, so a bad method is reported in preference to a missing URL.
        let method = tokens.next().ok_or("Missing HTTP method")?;
        validate_method(method).map_err(|e| format!("Invalid method: {e}"))?;
        let url = tokens.next().ok_or("Missing URL")?;
        validate_url(url).map_err(|e| format!("Invalid URL: {e}"))?;

        let mut headers = HashMap::new();
        let mut query = HashMap::new();
        let mut body = None;
        let mut output = None;
        for token in tokens {
            if let Some(spec) = token.strip_prefix("header:") {
                let (name, value) = parse_header(spec).map_err(|e| format!("Invalid header: {e}"))?;
                headers.insert(name, value);
            } else if let Some(spec) = token.strip_prefix("query:") {
                let (name, value) = parse_query(spec).map_err(|e| format!("Invalid query: {e}"))?;
                query.insert(name, value);
            } else if let Some(text) = token.strip_prefix("body:") {
                body = Some(text.as_bytes().to_vec());
            } else if let Some(name) = token.strip_prefix("output:") {
                output = Some(name.to_string());
            }
        }

        Ok(BurstRequest {
            url: url.to_string(),
            method: method.to_string(),
            headers,
            query,
            body,
            output,
        })
    }

    /// Parses every request line supplied by `reader`.
    ///
    /// Lines are trimmed first; blank lines and lines starting with `#` are skipped.
    ///
    /// # Errors
    ///
    /// Stops at the first line that cannot be read or parsed. Both kinds of error carry the
    /// same 1-based line number, so the number always matches what an editor shows.
    pub fn parse_batch<R: BufRead>(reader: R) -> Result<Vec<BurstRequest>, String> {
        let mut requests = Vec::new();
        for (index, line) in reader.lines().enumerate() {
            let line_no = index + 1;
            let line = line.map_err(|e| format!("IO error on line {line_no}: {e}"))?;
            let line = line.trim();
            if line.is_empty() || line.starts_with('#') {
                continue;
            }
            let request = Self::parse_from_line(line)
                .map_err(|e| format!("Parse error on line {line_no}: {e}"))?;
            requests.push(request);
        }
        Ok(requests)
    }
}
/// Settings that apply to a whole burst run.
pub struct BurstOptions {
    /// Requested concurrency level. `run_with_verbosity` refuses values above 1000.
    pub concurrency: usize,
    /// Requests-per-second value used to derive the delay before a request is sent;
    /// `None` or `Some(0)` disables the delay.
    pub throttle_per_sec: Option<usize>,
    /// How many times a request is retried after a transport-level error.
    pub retries: usize,
    /// Base wait between retries (scaled by the attempt number); `None` means zero.
    pub backoff: Option<Duration>,
    /// Whether a summary is printed once all requests have finished.
    pub summary: bool,
    /// Directory that response bodies are written into, if any.
    pub output_dir: Option<String>,
    /// Per-request client timeout; 30 seconds is used when this is `None`.
    pub timeout: Option<Duration>,
}
/// Outcome of executing one request of a burst.
pub struct BurstResult {
    /// The request this result belongs to.
    pub request: Arc<BurstRequest>,
    /// HTTP status code, when a response was received.
    pub status: Option<u16>,
    /// `true` when the response status was a success status.
    pub success: bool,
    /// Time measured for the request.
    pub duration: Duration,
    /// Description of what went wrong, if anything.
    pub error: Option<String>,
    /// Path of the file the response body was written to, if one was written.
    pub output_path: Option<String>,
}
impl BurstEngine {
pub fn new(requests: Vec<BurstRequest>, mut options: BurstOptions) -> Self {
if options.timeout.is_none() {
options.timeout = Some(Duration::from_secs(30));
}
let requests = requests.into_iter().map(Arc::new).collect();
BurstEngine { requests, options }
}
pub fn new_with_timeout(requests: Vec<BurstRequest>, mut options: BurstOptions, timeout: Duration) -> Self {
options.timeout = Some(timeout);
let requests = requests.into_iter().map(Arc::new).collect();
BurstEngine { requests, options }
}
pub async fn run(&self) -> Vec<BurstResult> {
use tokio::fs;
let client = Client::builder()
.timeout(self.options.timeout.unwrap_or(Duration::from_secs(30)))
.build()
.expect("Failed to build reqwest client with timeout");
let throttle = self.options.throttle_per_sec.unwrap_or(0);
let retries = self.options.retries;
let backoff = self.options.backoff.unwrap_or(Duration::from_millis(0));
let output_dir = self.options.output_dir.clone();
if let Some(ref dir) = output_dir {
if let Err(e) = fs::create_dir_all(dir).await {
eprintln!("[Burst] Failed to create output directory '{}': {}", dir, e);
}
}
let handles: Vec<_> = self.requests.iter().enumerate().map(|(i, req)| {
let req = Arc::clone(req);
let client = client.clone();
let output_dir = output_dir.clone();
async move {
if throttle > 0 && i > 0 {
let delay = Duration::from_millis(1000 / throttle as u64);
tokio::time::sleep(delay).await;
}
let mut attempt = 0;
let start = std::time::Instant::now();
loop {
let mut builder = client.request(
ReqwestMethod::from_bytes(req.method.as_bytes()).unwrap_or(ReqwestMethod::GET),
&req.url,
);
for (k, v) in &req.headers {
builder = builder.header(k, v);
}
if !req.query.is_empty() {
builder = builder.query(&req.query);
}
if let Some(body) = &req.body {
builder = builder.body(body.clone());
}
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let success = resp.status().is_success();
let duration = start.elapsed();
let body = resp.bytes().await.ok();
let (output_path, error) = if success {
if let Some(ref out) = req.output {
let path = if let Some(ref dir) = output_dir {
format!("{}/{}", dir, out)
} else {
out.clone()
};
if let Some(ref b) = body {
match fs::write(&path, b).await {
Ok(_) => (Some(path), None),
Err(e) => (None, Some(format!("Failed to write output: {}", e))),
}
} else {
(None, None)
}
} else if let Some(ref dir) = output_dir {
let path = format!("{}/response_{}.bin", dir, i);
if let Some(ref b) = body {
match fs::write(&path, b).await {
Ok(_) => (Some(path), None),
Err(e) => (None, Some(format!("Failed to write output: {}", e))),
}
} else {
(None, None)
}
} else if let Some(ref b) = body {
if let Ok(s) = std::str::from_utf8(b) {
println!("[Burst][{}] {} {}\n{}", i, req.method, req.url, s);
} else {
println!("[Burst][{}] {} {} (binary {} bytes)", i, req.method, req.url, b.len());
}
(None, None)
} else {
(None, None)
}
} else {
(None, Some(format!("HTTP error: status {}", status)))
};
return BurstResult {
request: req,
status: Some(status),
success,
duration,
error,
output_path,
};
}
Err(e) => {
attempt += 1;
if attempt > retries {
return BurstResult {
request: req,
status: None,
success: false,
duration: start.elapsed(),
error: Some(e.to_string()),
output_path: None,
};
}
let jitter = rand::random::<u64>() % 100;
let sleep = backoff * attempt as u32 + Duration::from_millis(jitter);
tokio::time::sleep(sleep).await;
}
}
}
}
}).collect();
let results = join_all(handles.into_iter().map(tokio::spawn)).await;
let results: Vec<BurstResult> = results.into_iter().filter_map(|r| r.ok()).collect();
if self.options.summary {
let total = results.len();
let successes = results.iter().filter(|r| r.success).count();
let failures = total - successes;
let total_time: Duration = results.iter().map(|r| r.duration).sum();
let avg_time = if total > 0 { total_time / (total as u32) } else { Duration::from_secs(0) };
println!("\n--- Burst Summary ---");
println!("Total: {} | Success: {} | Fail: {}", total, color_green(&successes.to_string()), color_red(&failures.to_string()));
println!("Avg Time: {:.2?} | Total Time: {:.2?}", avg_time, total_time);
for (i, r) in results.iter().enumerate() {
let is_timeout = r.error.as_ref().map_or(false, |e| {
let e = e.to_lowercase();
e.contains("timeout") || e.contains("timed out")
});
let (status_str, color_fn): (&str, Box<dyn Fn(&str) -> String>) = if r.success {
("OK", Box::new(color_green))
} else if is_timeout {
("TIMEOUT", Box::new(color_yellow))
} else {
("FAIL", Box::new(color_red))
};
let status_colored = color_fn(status_str);
println!("[{}] {} {} -> {}{}{}", i, r.request.method, r.request.url, status_colored, r.status.map(|s| format!(" [{}]", s)).unwrap_or_default(), r.output_path.as_ref().map(|p| format!(" -> {}", p)).unwrap_or_default());
if let Some(ref e) = r.error {
let err_colored = if is_timeout {
color_yellow(&format!("TIMEOUT: {}", e))
} else {
color_red(e)
};
println!(" Error: {}", err_colored);
}
}
println!("---------------------\n");
}
results
}
}
impl BurstEngine {
pub async fn run_with_verbosity(&self, verbose: bool, quiet: bool) -> Vec<BurstResult> {
use tokio::fs;
let client = reqwest::Client::builder()
.timeout(self.options.timeout.unwrap_or(std::time::Duration::from_secs(30)))
.build()
.expect("Failed to build reqwest client with timeout");
let throttle = self.options.throttle_per_sec.unwrap_or(0);
let retries = self.options.retries;
let backoff = self.options.backoff.unwrap_or(std::time::Duration::from_millis(0));
let output_dir = self.options.output_dir.clone();
if let Some(ref dir) = output_dir {
if dir == "/" || dir == "C:/" || dir == "C:\\" || dir == "\\" {
panic!("Refusing to use system root as output directory!");
}
if let Err(e) = fs::create_dir_all(dir).await {
eprintln!("[Burst] Failed to create output directory '{}': {}", dir, e);
}
}
if self.options.concurrency > 1000 {
panic!("Concurrency too high ({} > 1000). Refusing to run.", self.options.concurrency);
}
let wall_start = std::time::Instant::now();
let handles: Vec<_> = self.requests.iter().enumerate().map(|(i, req)| {
let req = std::sync::Arc::clone(req);
let client = client.clone();
let output_dir = output_dir.clone();
async move {
if throttle > 0 && i > 0 {
let delay = std::time::Duration::from_millis(1000 / throttle as u64);
tokio::time::sleep(delay).await;
}
let mut attempt = 0;
let start = std::time::Instant::now();
loop {
let mut builder = client.request(
reqwest::Method::from_bytes(req.method.as_bytes()).unwrap_or(reqwest::Method::GET),
&req.url,
);
for (k, v) in &req.headers {
builder = builder.header(k, v);
}
if !req.query.is_empty() {
builder = builder.query(&req.query);
}
if let Some(body) = &req.body {
builder = builder.body(body.clone());
}
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let success = resp.status().is_success();
let duration = start.elapsed();
let body = resp.bytes().await.ok();
let (output_path, error) = if success {
if let Some(ref out) = req.output {
let sanitized = out.replace("..", "_").replace('/', "_").replace('\\', "_");
let sanitized = if sanitized.len() > 128 { sanitized[..128].to_string() } else { sanitized };
let path = if let Some(ref dir) = output_dir {
format!("{}/{}", dir, sanitized)
} else {
sanitized
};
if path == "/" || path == "C:/" || path == "C:\\" || path == "\\" {
return BurstResult {
request: req,
status: None,
success: false,
duration,
error: Some("Refusing to overwrite system file!".to_string()),
output_path: None,
};
}
if let Some(ref b) = body {
if fs::metadata(&path).await.is_ok() {
eprintln!("[Burst][WARN] Output file '{}' already exists, will overwrite.", path);
}
match fs::write(&path, b).await {
Ok(_) => (Some(path), None),
Err(e) => (None, Some(format!("Failed to write output: {}", e))),
}
} else {
(None, None)
}
} else if let Some(ref dir) = output_dir {
let path = format!("{}/response_{}.bin", dir, i);
if let Some(ref b) = body {
if fs::metadata(&path).await.is_ok() {
eprintln!("[Burst][WARN] Output file '{}' already exists, will overwrite.", path);
}
match fs::write(&path, b).await {
Ok(_) => (Some(path), None),
Err(e) => (None, Some(format!("Failed to write output: {}", e))),
}
} else {
(None, None)
}
} else if let Some(ref b) = body {
if verbose && !quiet {
if let Ok(s) = std::str::from_utf8(b) {
println!("[Burst][{}] {} {}\n{}", i, req.method, req.url, s);
} else {
println!("[Burst][{}] {} {} (binary {} bytes)", i, req.method, req.url, b.len());
}
}
(None, None)
} else {
(None, None)
}
} else {
(None, Some(format!("HTTP error: status {}", status)))
};
return BurstResult {
request: req,
status: Some(status),
success,
duration,
error,
output_path,
};
}
Err(e) => {
attempt += 1;
if attempt > retries {
if verbose && !quiet {
println!("[Burst][{}] {} {} -> ERROR: {}", i, req.method, req.url, e);
}
return BurstResult {
request: req,
status: None,
success: false,
duration: start.elapsed(),
error: Some(e.to_string()),
output_path: None,
};
}
let jitter = rand::random::<u64>() % 100;
let sleep = backoff * attempt as u32 + std::time::Duration::from_millis(jitter);
tokio::time::sleep(sleep).await;
}
}
}
}
}).collect();
let results = futures::future::join_all(handles.into_iter().map(tokio::spawn)).await;
let results: Vec<BurstResult> = results.into_iter().filter_map(|r| r.ok()).collect();
if self.options.summary && !quiet {
let total = results.len();
let successes = results.iter().filter(|r| r.success).count();
let failures = total - successes;
let wall_elapsed = wall_start.elapsed();
let mut times: Vec<_> = results.iter().map(|r| r.duration).collect();
times.sort();
let avg_time = if total > 0 { times.iter().sum::<std::time::Duration>() / (total as u32) } else { std::time::Duration::from_secs(0) };
let min_time = times.first().cloned().unwrap_or_default();
let max_time = times.last().cloned().unwrap_or_default();
let median_time = if total == 0 {
std::time::Duration::from_secs(0)
} else if total % 2 == 1 {
times[total / 2]
} else {
let t1 = times[total / 2 - 1];
let t2 = times[total / 2];
t1 + (t2 - t1) / 2
};
let throughput = if wall_elapsed.as_secs_f64() > 0.0 {
total as f64 / wall_elapsed.as_secs_f64()
} else {
0.0
};
println!("\n--- Burst Summary ---");
println!("Total Requests: {} | Success: {} | Fail: {}", total, color_green(&successes.to_string()), color_red(&failures.to_string()));
println!("Elapsed (wall): {:.2?} | Throughput: {:.2} req/s", wall_elapsed, throughput);
println!("Avg: {:.2?} | Median: {:.2?} | Min: {:.2?} | Max: {:.2?}", avg_time, median_time, min_time, max_time);
println!(" (All times are per-request duration)");
println!(" Elapsed = total wall-clock time for all requests");
println!(" Throughput = requests per second");
for (i, r) in results.iter().enumerate() {
let is_timeout = r.error.as_ref().map_or(false, |e| {
let e = e.to_lowercase();
e.contains("timeout") || e.contains("timed out")
});
let (status_str, color_fn): (&str, Box<dyn Fn(&str) -> String>) = if r.success {
("OK", Box::new(color_green))
} else if is_timeout {
("TIMEOUT", Box::new(color_yellow))
} else {
("FAIL", Box::new(color_red))
};
let status_colored = color_fn(status_str);
println!("[{}] {} {} -> {}{}{}", i, r.request.method, r.request.url, status_colored, r.status.map(|s| format!(" [{}]", s)).unwrap_or_default(), r.output_path.as_ref().map(|p| format!(" -> {}", p)).unwrap_or_default());
if let Some(ref e) = r.error {
let err_colored = if is_timeout {
color_yellow(&format!("TIMEOUT: {}", e))
} else {
color_red(e)
};
println!(" Error: {}", err_colored);
}
}
println!("---------------------\n");
}
results
}
}