use crate::core::exponential_moving_average;
use crate::core::fixed_size_queue;
use crate::models::assert_error_stats::AssertErrorStats;
use crate::models::http_error_stats::HttpErrorStats;
use crate::models::result::{ApiResult, BatchResult};
use histogram::Histogram;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver;
use tokio::sync::Mutex;
use tokio::time::interval;
use url::Url;
pub(crate) async fn collect_results(
result_channel: Sender<Option<BatchResult>>,
should_stop_rx: Receiver<()>,
total_requests: Arc<AtomicUsize>,
successful_requests: Arc<AtomicUsize>,
histogram: Arc<Mutex<Histogram>>,
total_response_size: Arc<AtomicUsize>,
http_errors: Arc<Mutex<HttpErrorStats>>,
err_count: Arc<AtomicUsize>,
max_resp_time: Arc<Mutex<u64>>,
min_resp_time: Arc<Mutex<u64>>,
assert_error: Arc<Mutex<AssertErrorStats>>,
api_results: Arc<Mutex<Vec<ApiResult>>>,
concurrent_number: Arc<AtomicUsize>,
dura: Arc<Mutex<f64>>,
number_of_last_requests: Arc<AtomicUsize>,
number_of_last_errors: Arc<AtomicUsize>,
rps_queue: Arc<Mutex<fixed_size_queue::FixedSizeQueue<f64>>>,
api_rps_queue_arc: Arc<Mutex<BTreeMap<String, fixed_size_queue::FixedSizeQueue<f64>>>>,
queue_cap: usize,
verbose: bool,
test_start: Instant,
ema_alpha: f64,
) {
let mut api_res_number_map: HashMap<String, usize> = HashMap::new();
let mut interval = interval(Duration::from_secs(1));
let mut api_rps_queue_map = api_rps_queue_arc.lock().await.clone();
let ema: Option<exponential_moving_average::ExponentialMovingAverage> = match ema_alpha > 0f64 {
true => Some(exponential_moving_average::ExponentialMovingAverage::new(
ema_alpha,
)),
false => None,
};
select! {
_ = should_stop_rx => {
println!("收到停止信号");
return;
}
_ = async {
loop{
interval.tick().await;
let err_count = err_count.load(Ordering::SeqCst) as i32;
let max_response_time_c = *max_resp_time.lock().await;
let min_response_time_c = *min_resp_time.lock().await;
let total_duration = (Instant::now() - test_start).as_secs_f64();
let mut d = dura.lock().await;
let this_duration = total_duration - *d;
*d = total_duration;
let total_requests = total_requests.load(Ordering::SeqCst) as f64;
let successful_requests = successful_requests.load(Ordering::SeqCst) as f64;
let success_rate = match total_requests == 0f64 {
true => 0f64,
false => successful_requests / total_requests * 100.0,
};
let error_rate = match total_requests == 0f64 {
true => 0f64,
false => err_count as f64 / total_requests * 100.0,
};
let histogram = histogram.lock().await;
let total_response_size_kb = total_response_size.load(Ordering::SeqCst) as f64 / 1024.0;
let throughput_kb_s = total_response_size_kb / total_duration;
let http_errors = http_errors.lock().await.errors.clone();
let assert_errors = assert_error.lock().await.errors.clone();
let resp_median_line = match histogram.percentile(50.0) {
Ok(bucket) => *bucket.range().start(),
Err(_) => 0,
};
let resp_95_line = match histogram.percentile(95.0) {
Ok(bucket) => *bucket.range().start(),
Err(_) => 0,
};
let resp_99_line = match histogram.percentile(99.0) {
Ok(bucket) => *bucket.range().start(),
Err(_) => 0,
};
let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_millis(),
Err(_) => 0,
};
let mut api_results = api_results.lock().await;
for (index, res) in api_results.clone().into_iter().enumerate() {
let api_latest_request_number = match api_res_number_map.get_mut(&res.name){
None => {
0
}
Some(v) => *{
v
}
};
let api_requests_per_second = res.total_requests as usize - api_latest_request_number;
api_res_number_map.insert(res.name.clone(), api_requests_per_second + api_latest_request_number);
let mut rps = api_requests_per_second as f64 / this_duration;
match api_rps_queue_map.get_mut(&res.name){
None => {
api_rps_queue_map.insert(res.name.clone(), fixed_size_queue::FixedSizeQueue::new(queue_cap));
if let Some(queue) = api_rps_queue_map.get_mut(&res.name){
queue.push(rps).await
};
}
Some(queue) => {
queue.push(rps).await
}
}
rps = match ema.clone(){
None => {
rps
}
Some(mut e) => {
e.add(rps)
}
};
api_results[index].rps = rps;
if let Ok(url) = Url::parse(&*res.url) {
if let Some(host) = url.host() {
api_results[index].host = host.to_string();
};
api_results[index].path = url.path().to_string();
};
}
let total_concurrent_number = concurrent_number.load(Ordering::SeqCst) as i32;
let errors_per_second = err_count as usize - number_of_last_errors.load(Ordering::SeqCst);
number_of_last_errors.fetch_add(errors_per_second, Ordering::Relaxed);
let requests_per_second = total_requests as usize - number_of_last_requests.load(Ordering::SeqCst);
number_of_last_requests.fetch_add(requests_per_second, Ordering::Relaxed);
let mut rps = requests_per_second as f64 / this_duration;
rps = match ema.clone(){
None => {rps}
Some(mut e) => {
e.add(rps)
}
};
let mut rps_queue = rps_queue.lock().await;
rps_queue.push(rps).await;
let result = BatchResult {
total_duration,
success_rate,
error_rate,
median_response_time: resp_median_line,
response_time_95: resp_95_line,
response_time_99: resp_99_line,
total_requests: total_requests as u64,
rps,
max_response_time: max_response_time_c,
min_response_time: min_response_time_c,
err_count,
total_data_kb: total_response_size_kb,
throughput_per_second_kb: throughput_kb_s,
http_errors: http_errors.lock().await.clone(),
timestamp,
assert_errors: assert_errors.lock().await.clone(),
total_concurrent_number,
api_results: api_results.to_vec().clone(),
errors_per_second,
};
let elapsed = test_start.elapsed();
if verbose {
println!("{:?}-{:#?}", elapsed.as_millis(), result.clone());
};
let _ = result_channel.send(Some(result)).await;
}
} => {
eprintln!("推送意外停止")
}
}
}