use std::str::FromStr;
use std::sync::{Arc};
use histogram::Histogram;
use std::time::{Duration, Instant};
use indicatif::ProgressBar;
use tokio::time::interval;
use anyhow::{Context};
use reqwest::{Method};
use tokio::sync::Mutex;
use reqwest::header::{HeaderMap, HeaderValue, COOKIE, HeaderName};
use serde_json::Value;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::core::parse_form_data;
use crate::core::status_share::{RESULT_QUEUE, SHOULD_STOP};
use crate::models::http_error_stats::HttpErrorStats;
use crate::models::result::TestResult;
pub async fn run(
url: &str,
test_duration_secs: u64,
concurrent_requests: i32,
timeout_secs:u64,
verbose: bool,
method: &str,
json_str: Option<String>,
form_data_str: Option<String>,
headers: Option<Vec<String>>,
cookie: Option<String>
) -> anyhow::Result<TestResult> {
let method = method.to_owned();
let histogram = Arc::new(Mutex::new(Histogram::new(14, 20).unwrap()));
let successful_requests = Arc::new(Mutex::new(0));
let total_requests = Arc::new(Mutex::new(0));
let max_response_time = Arc::new(Mutex::new(0u64));
let min_response_time = Arc::new(Mutex::new(u64::MAX));
let err_count = Arc::new(Mutex::new(0));
let mut handles = Vec::new();
let total_response_size = Arc::new(Mutex::new(0u64));
let http_errors = Arc::new(Mutex::new(HttpErrorStats::new()));
if json_str.is_some() && form_data_str.is_some(){
return Err(anyhow::Error::msg("json和form不允许同时发送"));
}
let mut json_obj: Arc<Option<Value>> = Arc::new(None);
if let Some(ref json_str) = json_str {
let json: Value = serde_json::from_str(json_str).expect("解析json失败");
json_obj = Arc::new(Some(json));
}
let mut header_map = Arc::new(None);
if let Some(headers) = headers{
let mut temp_headers_map = HeaderMap::new();
for header in headers {
let parts: Vec<&str> = header.splitn(2, ':').collect();
if parts.len() == 2 {
match parts[0].trim().parse::<HeaderName>() {
Ok(header_name) =>{
match HeaderValue::from_str(parts[1].trim()) {
Ok(header_value)=>{
temp_headers_map.insert(header_name, header_value);
}
Err(_) => {
return Err(anyhow::Error::msg("无法解析header的值"));
}
}
}
Err(_) => {
return Err(anyhow::Error::msg("无法解析header名称"));
}
}
}
}
header_map = Arc::new(Some(temp_headers_map))
}
let mut form_map = Arc::new(None);
if let Some(ref form_str) = form_data_str{
let form_data = parse_form_data::parse_form_data(&form_str);
form_map = Arc::new(Some(form_data));
}
let test_start = Instant::now();
let test_end = test_start + Duration::from_secs(test_duration_secs);
for _ in 0..concurrent_requests {
let client_builder = reqwest::Client::builder();
let client = if timeout_secs > 0 {
client_builder.timeout(Duration::from_secs(timeout_secs)).build().context("构建带超时的http客户端失败")?
} else {
client_builder.build().context("构建http客户端失败")?
};
let cookie_clone = cookie.clone();
let method_clone = method.clone();
let json_obj_clone = json_obj.clone();
let form_map_clone = form_map.clone();
let url = url.to_string();
let histogram_clone = histogram.clone();
let successful_requests_clone = successful_requests.clone();
let max_response_time_clone = max_response_time.clone();
let min_response_time_clone = min_response_time.clone();
let err_count_clone = err_count.clone();
let total_response_size_clone = total_response_size.clone();
let total_requests_clone = total_requests.clone();
let http_errors_clone = http_errors.clone();
let header_map_clone = header_map.clone();
let handle = tokio::spawn(async move {
while Instant::now() < test_end {
*total_requests_clone.lock().await += 1;
let start = Instant::now();
let method = Method::from_str(&method_clone.to_uppercase()).expect("无效的方法");
let mut request = client.request(method, &url);
let mut headers = HeaderMap::new();
if let Some(header_map) = &*header_map_clone {
headers.extend(header_map.iter().map(|(k, v)| (k.clone(), v.clone())));
}
if let Some(ref cookie_clone) = cookie_clone {
match HeaderValue::from_str(cookie_clone) {
Ok(h) => {
headers.insert(COOKIE, h);
},
Err(e) => {
eprintln!("无法添加cookie:{:?}", e);
}
}
}
request = request.headers(headers);
if let Some(value) = &*json_obj_clone {
request = request.json(value);
}
if let Some(form_map) = &*form_map_clone{
request = request.form(form_map);
}
match request.send().await {
Ok(response) if response.status().is_success() => {
let duration = start.elapsed().as_millis() as u64;
let mut max_rt = max_response_time_clone.lock().await;
*max_rt = (*max_rt).max(duration);
let mut min_rt = min_response_time_clone.lock().await;
*min_rt = (*min_rt).min(duration);
*successful_requests_clone.lock().await += 1;
match histogram_clone.lock().await.increment(duration) {
Ok(_) => {},
Err(err) => eprintln!("错误:{}", err),
}
if let Some(content_length) = response.content_length() {
let mut total_size = total_response_size_clone.lock().await;
*total_size += content_length;
}
if verbose {
match response.bytes().await.context("读取响应体失败"){
Ok(bytes) => {
let buffer = String::from_utf8(bytes.to_vec()).expect("无法转换响应体为字符串");
println!("{:+?}", buffer);
}
Err(e) => {
eprintln!("读取响应失败:{:?}", e.to_string())
}
};
}
},
Err(e) => {
*err_count_clone.lock().await += 1;
let status_code: u16;
match e.status(){
None => {
status_code = 0;
}
Some(code) => {
status_code = u16::from(code);
}
}
let err_msg = e.to_string();
http_errors_clone.lock().await.increment(status_code, err_msg);
}
res => {
*err_count_clone.lock().await += 1;
match res {
Ok(response) => {
let status_code = response.status().as_u16();
match response.bytes().await {
Ok(bytes) => {
let bytes_vec = bytes.to_vec();
match String::from_utf8(bytes_vec) {
Ok(body) => {
http_errors_clone.lock().await.increment(status_code, body);
},
Err(e) => {
http_errors_clone.lock().await.increment(status_code, format!("{:?}", e));
}
}
},
Err(e) => {
eprintln!("错误信息转换失败:{:?}", e)
}
}
}
Err(e) => {
eprintln!("获取错误响应体失败:{:?}", e)
}
}
}
}
}
});
handles.push(handle);
}
{
let total_requests_clone = Arc::clone(&total_requests);
let successful_requests_clone = Arc::clone(&successful_requests);
let histogram_clone = Arc::clone(&histogram);
let total_response_size_clone = Arc::clone(&total_response_size);
let http_errors_clone = Arc::clone(&http_errors);
let err_count_clone = Arc::clone(&err_count);
let max_resp_time_clone = Arc::clone(&max_response_time);
let min_resp_time_clone = Arc::clone(&min_response_time);
let err_count = *err_count_clone.lock().await;
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(1));
let should_stop = *SHOULD_STOP.lock();
while !should_stop {
interval.tick().await;
let max_response_time_c = *max_resp_time_clone.lock().await;
let min_response_time_c = *min_resp_time_clone.lock().await;
let total_duration = (Instant::now() - test_start).as_secs_f64();
let total_requests = *total_requests_clone.lock().await as f64;
let successful_requests = *successful_requests_clone.lock().await as f64;
let success_rate = (total_requests - err_count as f64) / total_requests * 100.0;
let histogram = histogram_clone.lock().await;
let total_response_size_kb = *total_response_size_clone.lock().await as f64 / 1024.0;
let throughput_kb_s = total_response_size_kb / total_duration;
let http_errors = http_errors_clone.lock().await.errors.clone();
let rps = successful_requests / total_duration;
let resp_median_line = match histogram.percentile(50.0){
Ok(bucket) => *bucket.range().start(),
Err(_) =>0
};
let resp_95_line = match histogram.percentile(95.0){
Ok(bucket) => *bucket.range().start(),
Err(_) =>0
};
let resp_99_line = match histogram.percentile(99.0){
Ok(bucket) => *bucket.range().start(),
Err(_) =>0
};
let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_millis(),
Err(_) => 0,
};
let mut queue = RESULT_QUEUE.lock();
queue.push_back(TestResult{
total_duration,
success_rate,
median_response_time: resp_median_line,
response_time_95: resp_95_line,
response_time_99: resp_99_line,
total_requests: total_requests as i32,
rps,
max_response_time: max_response_time_c,
min_response_time:min_response_time_c,
err_count,
total_data_kb:total_response_size_kb,
throughput_per_second_kb: throughput_kb_s,
http_errors: http_errors.lock().unwrap().clone(),
timestamp
});
}
});
}
match verbose{
true => {
for handle in handles {
handle.await.unwrap();
}
}
false => {
let pb = ProgressBar::new(100);
let progress_interval = Duration::from_millis(300);
let mut interval = interval(progress_interval);
tokio::spawn(async move {
while Instant::now() < test_end {
interval.tick().await;
let elapsed = Instant::now().duration_since(test_start).as_secs_f64();
let progress = (elapsed / test_duration_secs as f64) * 100.0;
pb.set_position(progress as u64);
}
pb.finish_and_clear();
}).await.unwrap();
let bar = ProgressBar::new_spinner();
bar.enable_steady_tick(Duration::from_millis(100));
bar.set_message("等待所有请求响应");
for handle in handles {
handle.await.unwrap();
}
bar.finish_with_message("");
bar.finish();
}
}
let total_duration = (Instant::now() - test_start).as_secs_f64();
let total_requests = *total_requests.lock().await as f64;
let successful_requests = *successful_requests.lock().await as f64;
let success_rate = successful_requests / total_requests * 100.0;
let histogram = histogram.lock().await;
let total_response_size_kb = *total_response_size.lock().await as f64 / 1024.0;
let throughput_kb_s = total_response_size_kb / test_duration_secs as f64;
let http_errors = http_errors.lock().await.errors.clone();
let err_count_clone = Arc::clone(&err_count);
let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_millis(),
Err(_) => 0,
};
let test_result = TestResult {
total_duration,
success_rate,
median_response_time: *histogram.percentile(50.0)?.range().start(),
response_time_95: *histogram.percentile(95.0)?.range().start(),
response_time_99: *histogram.percentile(99.0)?.range().start(),
total_requests: total_requests as i32,
rps: successful_requests / test_duration_secs as f64,
max_response_time: *max_response_time.lock().await,
min_response_time: *min_response_time.lock().await,
err_count:*err_count_clone.lock().await,
total_data_kb:total_response_size_kb,
throughput_per_second_kb: throughput_kb_s,
http_errors: http_errors.lock().unwrap().clone(),
timestamp
};
let mut should_stop = SHOULD_STOP.lock();
*should_stop = true;
eprintln!("压测结束");
Ok(test_result)
}