use std::collections::HashMap;
use std::error::Error;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering::*};
use std::time::{Duration, Instant};
use num::integer::Roots;
use reqwest::{Response, StatusCode};
use tokio::{sync as tsync, time as ttime};
/// Shared collector for the results of a load-test run.
///
/// Counters are atomics and everything else sits behind a `tokio` mutex, so
/// every method takes `&self`. The raw samples (`req_per_second`,
/// `used_time`) are accumulated while messages are handled; the derived
/// figures (`max_*`, `avg_*`, `stdev_*`, `throughput`, `latencies`) keep their
/// initial values until `summary` is called.
#[derive(Debug)]
pub(crate) struct Statistics {
/// Number of responses classified as 1xx by `statistics_rsp_code`.
pub(crate) rsp1xx: AtomicU64,
/// Number of responses classified as 2xx by `statistics_rsp_code`.
pub(crate) rsp2xx: AtomicU64,
/// Number of responses classified as 3xx by `statistics_rsp_code`.
pub(crate) rsp3xx: AtomicU64,
/// Number of responses classified as 4xx by `statistics_rsp_code`.
pub(crate) rsp4xx: AtomicU64,
/// Number of responses classified as 5xx by `statistics_rsp_code`.
pub(crate) rsp5xx: AtomicU64,
/// Number of responses whose status fell outside every class above.
pub(crate) rsp_others: AtomicU64,
/// Occurrence count per error message, filled by `handle_resp_error`.
pub(crate) errors: tsync::Mutex<HashMap<String, u64>>,
// Set at construction and overwritten by `reset_start_time`.
started_at: tsync::Mutex<Instant>,
// Incremented by `handle_message` for every `Ok` response.
total_success: AtomicU64,
// Incremented by `handle_message` for every message, success or failure.
total: AtomicU64,
/// Largest value in `req_per_second`; written by `summary`.
pub(crate) max_req_per_second: tsync::Mutex<f64>,
/// Mean of the `req_per_second` samples; written by `summary`.
pub(crate) avg_req_per_second: tsync::Mutex<f64>,
/// Population standard deviation of the `req_per_second` samples; written
/// by `summary`.
pub(crate) stdev_per_second: tsync::Mutex<f64>,
// One entry per timer tick, pushed by `timer_per_second`.
req_per_second: tsync::Mutex<Vec<u64>>,
// Successful responses since the last timer tick; reset on every tick.
current_cumulative: AtomicU64,
/// Mean of the samples in `used_time`; written by `summary`.
pub(crate) avg_req_used_time: tsync::Mutex<Duration>,
/// Largest sample in `used_time`; written by `summary`.
pub(crate) max_req_used_time: tsync::Mutex<Duration>,
/// Population standard deviation of `used_time`; written by `summary`.
pub(crate) stdev_req_used_time: tsync::Mutex<Duration>,
// `rsp_at - req_at` for every successful response; emptied at the end of
// `summary`.
used_time: tsync::Mutex<Vec<Duration>>,
// Set by `stop_timer`; polled by `timer_per_second` after each sample.
is_stopped: AtomicBool,
// Time at which `stop_timer` was called, `None` until then.
stopped_at: tsync::Mutex<Option<Instant>>,
/// `connections / avg_req_used_time` (in seconds); written by `summary`.
pub(crate) throughput: tsync::Mutex<f64>,
/// `(fraction, duration)` pairs produced by `calculate_latencies`, where the
/// duration is the mean of the fastest `fraction` of the samples.
pub(crate) latencies: tsync::Mutex<Vec<(f32, Duration)>>,
}
impl Statistics {
pub(crate) fn new() -> Statistics {
Self {
rsp1xx: AtomicU64::new(0),
rsp2xx: AtomicU64::new(0),
rsp3xx: AtomicU64::new(0),
rsp4xx: AtomicU64::new(0),
rsp5xx: AtomicU64::new(0),
rsp_others: AtomicU64::new(0),
errors: tsync::Mutex::new(HashMap::new()),
started_at: tsync::Mutex::new(Instant::now()),
total: AtomicU64::new(0),
total_success: AtomicU64::new(0),
req_per_second: tsync::Mutex::new(Vec::new()),
avg_req_per_second: tsync::Mutex::new(0.0),
max_req_per_second: tsync::Mutex::new(0.0),
stdev_per_second: tsync::Mutex::new(0.0),
is_stopped: AtomicBool::new(false),
current_cumulative: AtomicU64::new(0),
stopped_at: tsync::Mutex::new(None),
latencies: tsync::Mutex::new(Vec::new()),
throughput: tsync::Mutex::new(0.0),
used_time: tsync::Mutex::new(Vec::new()),
avg_req_used_time: tsync::Mutex::new(Duration::from_secs(0)),
max_req_used_time: tsync::Mutex::new(Duration::from_secs(0)),
stdev_req_used_time: tsync::Mutex::new(Duration::from_secs(0)),
}
}
pub(crate) fn get_total(&self) -> u64 {
self.total.load(Acquire)
}
/// Overwrites the recorded start time with the current instant.
///
/// The timestamp is taken after the lock has been acquired, so time spent
/// waiting for the lock is not counted as part of the run.
pub(crate) async fn reset_start_time(&self) {
    let mut start_slot = self.started_at.lock().await;
    let now = Instant::now();
    *start_slot = now;
}
/// Records one requests-per-second sample every second until the collector
/// is stopped.
///
/// On each tick the number of successful responses counted since the previous
/// tick is appended to `req_per_second` and the counter is reset. The stop
/// flag is checked *after* sampling, so one final (possibly partial) sample
/// is always recorded once `stop_timer` has been called.
pub(crate) async fn timer_per_second(&self) {
    let mut timer = ttime::interval(Duration::from_secs(1));
    // The first tick of a tokio interval completes immediately; consume it so
    // that the first sample covers a full second.
    timer.tick().await;
    loop {
        timer.tick().await;
        {
            let mut req_per_second = self.req_per_second.lock().await;
            // `swap` reads and resets in one atomic step. A separate
            // load + store would drop every completion that lands between
            // the two operations.
            req_per_second.push(self.current_cumulative.swap(0, SeqCst));
        }
        if self.is_stopped.load(Acquire) {
            break;
        }
    }
}
fn statistics_rsp_code(&self, status: StatusCode) {
match status {
status
if status >= StatusCode::CONTINUE
&& status < StatusCode::OK =>
{
self.rsp1xx.fetch_add(1, SeqCst);
},
status
if status >= StatusCode::OK
&& status < StatusCode::MULTIPLE_CHOICES =>
{
self.rsp2xx.fetch_add(1, SeqCst);
},
status
if status >= StatusCode::MULTIPLE_CHOICES
&& status < StatusCode::BAD_REQUEST =>
{
self.rsp3xx.fetch_add(1, SeqCst);
},
status
if status >= StatusCode::BAD_REQUEST
&& status < StatusCode::INTERNAL_SERVER_ERROR =>
{
self.rsp4xx.fetch_add(1, SeqCst);
},
status
if status >= StatusCode::INTERNAL_SERVER_ERROR
&& status
<= StatusCode::NETWORK_AUTHENTICATION_REQUIRED =>
{
self.rsp5xx.fetch_add(1, SeqCst);
},
_ => {
self.rsp_others.fetch_add(1, SeqCst);
},
}
}
/// Records a failed request.
///
/// The error is tallied in `errors` under the message of its underlying
/// source; when the error has no source, its own message is used instead of
/// panicking. If the error carries an HTTP status, that status is also
/// counted via `statistics_rsp_code`.
async fn handle_resp_error(&self, err: reqwest::Error) {
    let err_msg = err
        .source()
        .map_or_else(|| err.to_string(), |source| source.to_string());
    {
        // Scoped so the map lock is released before the status is classified.
        let mut errors = self.errors.lock().await;
        *errors.entry(err_msg).or_insert(0) += 1;
    }
    if let Some(status) = err.status() {
        self.statistics_rsp_code(status);
    }
}
/// Folds the outcome of one request into the statistics.
///
/// Every message increments `total`. A failed request is handed to
/// `handle_resp_error` and contributes nothing else. A successful one is
/// classified by status code, counted in `total_success` and in the
/// per-second counter, and its elapsed time is appended to `used_time`.
pub(crate) async fn handle_message(&self, message: Message) {
    let Message {
        rsp_at,
        req_at,
        response,
    } = message;
    self.total.fetch_add(1, SeqCst);

    let response = match response {
        Ok(response) => response,
        Err(err) => {
            self.handle_resp_error(err).await;
            return;
        },
    };

    self.statistics_rsp_code(response.status());
    self.total_success.fetch_add(1, SeqCst);
    self.current_cumulative.fetch_add(1, SeqCst);

    // Explicitly saturating: a response timestamp that is not later than the
    // request timestamp yields zero instead of relying on the behaviour of
    // `Instant - Instant`, which std documents as allowed to panic.
    let elapsed = rsp_at.saturating_duration_since(req_at);
    let mut used_time = self.used_time.lock().await;
    used_time.push(elapsed);
}
/// Signals `timer_per_second` to finish and records when the stop happened.
///
/// The flag is raised before the timestamp lock is taken, so the timer can
/// observe the stop even while this call is still waiting for the lock.
pub(crate) async fn stop_timer(&self) {
    self.is_stopped.store(true, SeqCst);

    let mut stop_slot = self.stopped_at.lock().await;
    let now = Instant::now();
    *stop_slot = Some(now);
}
/// Stores the largest per-second sample in `max_req_per_second`.
///
/// Leaves the stored value untouched when no samples were recorded.
async fn calculate_max_per_second(&self) {
    let samples = self.req_per_second.lock().await;
    let Some(peak) = samples.iter().copied().max() else {
        return;
    };
    let mut max_slot = self.max_req_per_second.lock().await;
    *max_slot = peak as f64;
}
/// Stores the mean of the per-second samples in `avg_req_per_second`.
///
/// With more than two samples the last one is left out of the mean; with one
/// or two samples all of them are used. Nothing is written when there are no
/// samples.
async fn calculate_avg_per_second(&self) {
    let samples = self.req_per_second.lock().await;
    let all = samples.as_slice();
    let window = match all.len() {
        0 => return,
        1 | 2 => all,
        len => &all[..len - 1],
    };

    let mut avg_slot = self.avg_req_per_second.lock().await;
    let requests: u64 = window.iter().sum();
    *avg_slot = requests as f64 / window.len() as f64;
}
/// Stores the population standard deviation of the per-second samples in
/// `stdev_per_second`.
///
/// Uses the same sample window as the average: the last sample is left out
/// when more than two exist. Nothing is written when there are no samples.
async fn calculate_stdev_per_second(&self) {
    let samples = self.req_per_second.lock().await;
    let all = samples.as_slice();
    let window = match all.len() {
        0 => return,
        1 | 2 => all,
        len => &all[..len - 1],
    };

    let n = window.len() as f64;
    let mean = window.iter().sum::<u64>() as f64 / n;
    let squared_deviations = window
        .iter()
        .map(|&sample| {
            let delta = sample as f64 - mean;
            delta * delta
        })
        .sum::<f64>();
    let variance = squared_deviations / n;

    let mut stdev_slot = self.stdev_per_second.lock().await;
    *stdev_slot = variance.sqrt();
}
/// Derives the mean, maximum and population standard deviation of the
/// recorded request durations and stores them in `avg_req_used_time`,
/// `max_req_used_time` and `stdev_req_used_time`.
///
/// As a side effect `used_time` is left sorted in ascending order. Nothing is
/// written when no durations were recorded.
///
/// All arithmetic is done in `u128` nanoseconds, so the sample count is never
/// narrowed to `u32` and the sum is computed only once.
async fn calculate_elapsed_time(&self) {
    /// Converts a nanosecond count back into a `Duration`.
    ///
    /// Callers only pass values bounded by an existing `Duration`, so the
    /// seconds part always fits in `u64`.
    fn duration_from_nanos(nanos: u128) -> Duration {
        const NANOS_PER_SEC: u128 = 1_000_000_000;
        Duration::new(
            (nanos / NANOS_PER_SEC) as u64,
            (nanos % NANOS_PER_SEC) as u32,
        )
    }

    let mut used_time = self.used_time.lock().await;
    if used_time.is_empty() {
        return;
    }
    // Equal durations are indistinguishable, so stability is not needed.
    used_time.sort_unstable();

    let count = used_time.len() as u128;
    let total_nanos: u128 = used_time.iter().map(Duration::as_nanos).sum();
    let mean_nanos = total_nanos / count;

    let variance = used_time
        .iter()
        .map(|sample| {
            let deviation = sample.as_nanos().abs_diff(mean_nanos);
            deviation * deviation
        })
        .sum::<u128>()
        / count;
    // Integer square root from `num::integer::Roots`.
    let stdev_nanos = variance.sqrt();

    *self.avg_req_used_time.lock().await = duration_from_nanos(mean_nanos);
    // The buffer is sorted, so the slowest request is the last element.
    if let Some(slowest) = used_time.last() {
        *self.max_req_used_time.lock().await = *slowest;
    }
    *self.stdev_req_used_time.lock().await = duration_from_nanos(stdev_nanos);
}
/// Stores `connections / average request duration (seconds)` in
/// `throughput`.
///
/// Reads `avg_req_used_time`, so it is only meaningful after
/// `calculate_elapsed_time` has run. When the average is zero the stored
/// value is left unchanged.
async fn calculate_throughput(&self, connections: u16) {
    let avg_latency = self.avg_req_used_time.lock().await;
    let mut throughput_slot = self.throughput.lock().await;

    let avg_secs = avg_latency.as_secs_f64();
    if avg_secs > 0.0 {
        *throughput_slot = f64::from(connections) / avg_secs;
    }
}
/// Appends one `(fraction, duration)` entry to `latencies` for each usable
/// value in `percentiles`.
///
/// For a fraction `p`, the duration is the mean of the fastest
/// `floor(count * p)` recorded request durations (not the value at that
/// rank). A fraction is skipped when that sample count is zero or larger
/// than the number of recorded durations. Sorts `used_time` if it is not
/// already sorted; does nothing when it is empty.
async fn calculate_latencies(&self, percentiles: Vec<f32>) {
    let mut samples = self.used_time.lock().await;
    if samples.is_empty() {
        return;
    }
    if !samples.is_sorted() {
        samples.sort();
    }

    let mut latencies = self.latencies.lock().await;
    let sorted: &[Duration] = &samples;
    let total = sorted.len();

    latencies.extend(percentiles.into_iter().filter_map(|percent| {
        let take = (total as f32 * percent) as usize;
        if take == 0 || take > total {
            return None;
        }
        let fastest_sum: Duration = sorted[..take].iter().sum();
        Some((percent, fastest_sum / take as u32))
    }));
}
/// Discards the raw request durations and gives their memory back.
///
/// Called once the derived figures have been computed, because the raw
/// samples are no longer needed after that.
async fn clear_temporary_data(&self) {
    let mut samples = self.used_time.lock().await;
    samples.truncate(0);
    samples.shrink_to_fit();
}
/// Computes every derived statistic from the raw samples and then discards
/// the raw request durations.
///
/// `connections` is used for the throughput figure and `percentiles` holds
/// the fractions passed on to `calculate_latencies`.
///
/// The order of the calls matters: `calculate_elapsed_time` must run before
/// `calculate_throughput` (which reads the average it stores) and before
/// `calculate_latencies` (which reuses the same sample buffer), and
/// `clear_temporary_data` must run last because it empties that buffer.
pub(crate) async fn summary(
&self,
connections: u16,
percentiles: Vec<f32>,
) {
self.calculate_max_per_second().await;
self.calculate_avg_per_second().await;
self.calculate_elapsed_time().await;
self.calculate_stdev_per_second().await;
self.calculate_throughput(connections).await;
self.calculate_latencies(percentiles).await;
// Must stay last: the steps above read `used_time`.
self.clear_temporary_data().await;
}
}
impl Default for Statistics {
fn default() -> Self {
Statistics::new()
}
}
/// Outcome of a single request, handed to `Statistics::handle_message`.
#[derive(Debug)]
pub(crate) struct Message {
// Instant associated with the response; the request duration is computed as
// `rsp_at - req_at`.
rsp_at: Instant,
// Instant associated with the request.
req_at: Instant,
// The response, or the error the request failed with.
response: Result<Response, reqwest::Error>,
}
impl Message {
pub(crate) fn new(
response: Result<Response, reqwest::Error>,
req_at: Instant,
rsp_at: Instant,
) -> Message {
Self {
rsp_at,
req_at,
response,
}
}
}
// Unit tests for `Statistics` and `Message`.
//
// NOTE(review): `test_message_new`, `test_statistics_handle_message_success`
// and `test_statistics_handle_message_error` issue real HTTP requests, so
// their outcome depends on the network environment they run in.
#[cfg(test)]
mod tests {
use super::*;
// A fresh collector starts with every counter at zero.
#[test]
fn test_statistics_new() {
let stats = Statistics::new();
assert_eq!(stats.rsp1xx.load(Acquire), 0);
assert_eq!(stats.rsp2xx.load(Acquire), 0);
assert_eq!(stats.rsp3xx.load(Acquire), 0);
assert_eq!(stats.rsp4xx.load(Acquire), 0);
assert_eq!(stats.rsp5xx.load(Acquire), 0);
assert_eq!(stats.rsp_others.load(Acquire), 0);
assert_eq!(stats.total.load(Acquire), 0);
assert_eq!(stats.total_success.load(Acquire), 0);
}
// `Default` delegates to `new`, so counters start at zero as well.
#[test]
fn test_statistics_default() {
let stats = Statistics::default();
assert_eq!(stats.rsp1xx.load(Acquire), 0);
assert_eq!(stats.rsp2xx.load(Acquire), 0);
}
// `get_total` reflects direct updates to the `total` counter.
#[test]
fn test_statistics_get_total() {
let stats = Statistics::new();
assert_eq!(stats.get_total(), 0);
stats.total.fetch_add(10, SeqCst);
assert_eq!(stats.get_total(), 10);
}
// Resetting after a short sleep must move the start time forward.
#[tokio::test]
async fn test_statistics_reset_start_time() {
let stats = Statistics::new();
let old_start = *stats.started_at.lock().await;
tokio::time::sleep(Duration::from_millis(10)).await;
stats.reset_start_time().await;
let new_start = *stats.started_at.lock().await;
assert!(new_start > old_start);
}
// `stop_timer` raises the stop flag and records a stop timestamp.
#[tokio::test]
async fn test_statistics_stop_timer() {
let stats = Statistics::new();
assert!(!stats.is_stopped.load(Acquire));
stats.stop_timer().await;
assert!(stats.is_stopped.load(Acquire));
assert!(stats.stopped_at.lock().await.is_some());
}
// The next six tests feed one status per class and check its counter.
#[test]
fn test_statistics_rsp_code_1xx() {
let stats = Statistics::new();
stats.statistics_rsp_code(StatusCode::CONTINUE);
assert_eq!(stats.rsp1xx.load(Acquire), 1);
}
#[test]
fn test_statistics_rsp_code_2xx() {
let stats = Statistics::new();
stats.statistics_rsp_code(StatusCode::OK);
assert_eq!(stats.rsp2xx.load(Acquire), 1);
}
#[test]
fn test_statistics_rsp_code_3xx() {
let stats = Statistics::new();
stats.statistics_rsp_code(StatusCode::MULTIPLE_CHOICES);
assert_eq!(stats.rsp3xx.load(Acquire), 1);
}
#[test]
fn test_statistics_rsp_code_4xx() {
let stats = Statistics::new();
stats.statistics_rsp_code(StatusCode::BAD_REQUEST);
assert_eq!(stats.rsp4xx.load(Acquire), 1);
}
#[test]
fn test_statistics_rsp_code_5xx() {
let stats = Statistics::new();
stats.statistics_rsp_code(StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(stats.rsp5xx.load(Acquire), 1);
}
// 600 is outside every 1xx-5xx class and must land in `rsp_others`.
#[test]
fn test_statistics_rsp_code_others() {
let stats = Statistics::new();
stats.statistics_rsp_code(StatusCode::from_u16(600).unwrap());
assert_eq!(stats.rsp_others.load(Acquire), 1);
}
// Smoke test for `summary`: it must run to completion on hand-filled data.
// No per-second samples are recorded here, so `avg_req_per_second` keeps its
// initial 0.0 and the final assertion is only a sanity check.
#[tokio::test]
async fn test_statistics_summary() {
let stats = Statistics::new();
stats.total.fetch_add(100, SeqCst);
stats.total_success.fetch_add(95, SeqCst);
stats.rsp2xx.fetch_add(90, SeqCst);
stats.rsp4xx.fetch_add(5, SeqCst);
let mut used_time = stats.used_time.lock().await;
for i in 0..10 {
used_time.push(Duration::from_millis(i as u64));
}
// Release the lock before `summary` tries to take it.
drop(used_time);
stats.summary(10, vec![0.5, 0.9]).await;
let avg = *stats.avg_req_per_second.lock().await;
assert!(avg >= 0.0);
}
// Builds a `Message` from a request that is expected to fail.
// NOTE(review): relies on the host name below not being reachable.
#[tokio::test]
async fn test_message_new() {
let req_at = Instant::now();
let rsp_at = Instant::now();
let client = reqwest::Client::new();
let response = client
.get("http://invalid-url-that-does-not-exist-12345.com")
.send()
.await;
let message = Message::new(response, req_at, rsp_at);
assert!(message.response.is_err());
}
// Sends a real request to httpbin.org. Only `total` is asserted, and `total`
// is incremented for failures too, so this passes whether or not the request
// succeeds.
#[tokio::test]
async fn test_statistics_handle_message_success() {
let stats = Statistics::new();
let client = reqwest::Client::new();
let response = client.get("http://httpbin.org/status/200").send().await;
let message = Message::new(response, Instant::now(), Instant::now());
stats.handle_message(message).await;
assert_eq!(stats.total.load(Acquire), 1);
}
// A failed request is counted in `total` but not in `total_success`.
// NOTE(review): relies on the host name below not being reachable.
#[tokio::test]
async fn test_statistics_handle_message_error() {
let stats = Statistics::new();
let client = reqwest::Client::new();
let response = client
.get("http://invalid-url-that-does-not-exist-12345.com")
.send()
.await;
let message = Message::new(response, Instant::now(), Instant::now());
stats.handle_message(message).await;
assert_eq!(stats.total.load(Acquire), 1);
assert_eq!(stats.total_success.load(Acquire), 0);
}
// Samples of 0, 10, ..., 90 ms must give a non-zero average and maximum.
#[tokio::test]
async fn test_statistics_calculate_elapsed_time() {
let stats = Statistics::new();
let mut used_time = stats.used_time.lock().await;
for i in 0..10 {
used_time.push(Duration::from_millis(i as u64 * 10));
}
drop(used_time);
stats.calculate_elapsed_time().await;
let avg = *stats.avg_req_used_time.lock().await;
let max = *stats.max_req_used_time.lock().await;
let stdev = *stats.stdev_req_used_time.lock().await;
assert!(avg > Duration::ZERO);
assert!(max > Duration::ZERO);
assert!(stdev >= Duration::ZERO);
}
// With a non-zero average duration the throughput must be positive.
#[tokio::test]
async fn test_statistics_calculate_throughput() {
let stats = Statistics::new();
let mut avg_time = stats.avg_req_used_time.lock().await;
*avg_time = Duration::from_millis(100);
drop(avg_time);
stats.calculate_throughput(10).await;
let throughput = *stats.throughput.lock().await;
assert!(throughput > 0.0);
}
// Two usable fractions over 100 samples must yield two latency entries.
#[tokio::test]
async fn test_statistics_calculate_latencies() {
let stats = Statistics::new();
let mut used_time = stats.used_time.lock().await;
for i in 0..100 {
used_time.push(Duration::from_millis(i as u64));
}
drop(used_time);
stats.calculate_latencies(vec![0.5, 0.9]).await;
let latencies = stats.latencies.lock().await;
assert!(!latencies.is_empty());
assert_eq!(latencies.len(), 2);
}
// Runs the real one-second timer and stops it from a background task after
// 1.5 s, so this test takes roughly two seconds of wall-clock time.
#[tokio::test]
async fn test_statistics_timer_per_second() {
use std::sync::Arc;
let stats = Arc::new(Statistics::new());
stats.current_cumulative.fetch_add(10, SeqCst);
let stats_clone = Arc::clone(&stats);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(1500)).await;
stats_clone.stop_timer().await;
});
stats.timer_per_second().await;
let req_per_second = stats.req_per_second.lock().await;
assert!(!req_per_second.is_empty());
}
// Lowest code of classes 1xx-4xx plus 511 for 5xx: each must hit its own
// counter exactly once.
#[test]
fn test_statistics_rsp_code_boundary_cases() {
let stats = Statistics::new();
stats.statistics_rsp_code(StatusCode::CONTINUE); stats.statistics_rsp_code(StatusCode::OK); stats.statistics_rsp_code(StatusCode::MULTIPLE_CHOICES); stats.statistics_rsp_code(StatusCode::BAD_REQUEST); stats.statistics_rsp_code(StatusCode::NETWORK_AUTHENTICATION_REQUIRED);
assert_eq!(stats.rsp1xx.load(Acquire), 1);
assert_eq!(stats.rsp2xx.load(Acquire), 1);
assert_eq!(stats.rsp3xx.load(Acquire), 1);
assert_eq!(stats.rsp4xx.load(Acquire), 1);
assert_eq!(stats.rsp5xx.load(Acquire), 1);
}
// After clearing, the raw duration buffer must be empty.
#[tokio::test]
async fn test_statistics_clear_temporary_data() {
let stats = Statistics::new();
let mut used_time = stats.used_time.lock().await;
for i in 0..10 {
used_time.push(Duration::from_millis(i as u64));
}
drop(used_time);
stats.clear_temporary_data().await;
let used_time = stats.used_time.lock().await;
assert!(used_time.is_empty());
}
// The remaining tests feed per-second samples 0, 10, ..., 90 directly.
#[tokio::test]
async fn test_statistics_calculate_avg_per_second() {
let stats = Statistics::new();
let mut req_per_second = stats.req_per_second.lock().await;
for i in 0..10 {
req_per_second.push(i * 10);
}
drop(req_per_second);
stats.calculate_avg_per_second().await;
let avg = *stats.avg_req_per_second.lock().await;
assert!(avg > 0.0);
}
// The maximum is taken over all samples, including the last one.
#[tokio::test]
async fn test_statistics_calculate_max_per_second() {
let stats = Statistics::new();
let mut req_per_second = stats.req_per_second.lock().await;
for i in 0..10 {
req_per_second.push(i * 10);
}
drop(req_per_second);
stats.calculate_max_per_second().await;
let max = *stats.max_req_per_second.lock().await;
assert_eq!(max, 90.0);
}
#[tokio::test]
async fn test_statistics_calculate_stdev_per_second() {
let stats = Statistics::new();
let mut req_per_second = stats.req_per_second.lock().await;
for i in 0..10 {
req_per_second.push(i * 10);
}
drop(req_per_second);
stats.calculate_stdev_per_second().await;
let stdev = *stats.stdev_per_second.lock().await;
assert!(stdev > 0.0);
}
}