1use hdrhistogram::Histogram;
2use karga::{Aggregate, Metric, Report};
3use reqwest::Client;
4use reqwest::Request;
5use serde::{Deserialize, Serialize};
6use std::time::Instant;
7use std::{collections::HashMap, sync::Arc, time::Duration};
8use typed_builder::TypedBuilder;
9
10pub use reqwest;
12
/// Measurements taken for a single HTTP request that produced a response.
///
/// The derived `PartialOrd` compares fields in declaration order, so two
/// values are ordered by `latency` first, then `status_code`, and so on.
#[derive(Clone, PartialEq, PartialOrd)]
pub struct HttpResponseMetric {
    /// Time that elapsed while waiting for the response.
    pub latency: Duration,
    /// Numeric HTTP status code carried by the response.
    pub status_code: u16,
    /// Number of bytes attributed to the outgoing request.
    pub bytes_sent: u64,
    /// Number of bytes attributed to the response; `http_action` fills this
    /// from the response's content length and falls back to 0 when unknown.
    pub bytes_received: u64,
}
20
21#[derive(Clone, PartialEq, PartialOrd)]
23pub enum HttpMetric {
24 Success(HttpResponseMetric),
25 Failure,
26}
27
28impl Metric for HttpMetric {}
/// Empty placeholder type for a failed request.
///
/// NOTE(review): nothing in the visible code references this type; failures
/// are currently represented by `HttpMetric::Failure`.
pub struct HttpFailedRequestMetric {}
30#[derive(Clone)]
31pub struct HttpAggregate {
32 pub latency_hist: Histogram<u64>,
33 pub status_count: HashMap<u16, u64>,
34 pub total_bytes_sent: u64,
35 pub total_bytes_received: u64,
36 pub count: u64,
37 pub failure_count: u64,
38}
39
40impl Aggregate for HttpAggregate {
41 type Metric = HttpMetric;
42
43 fn new() -> Self {
44 Self {
45 latency_hist: Histogram::new(3).expect("Create histogram"),
46 status_count: HashMap::new(),
47 total_bytes_sent: 0,
48 total_bytes_received: 0,
49 count: 0,
50 failure_count: 0,
51 }
52 }
53
54 fn consume(&mut self, metric: &Self::Metric) {
55 match metric {
56 HttpMetric::Success(metric) => {
57 let res = self.latency_hist.record(metric.latency.as_nanos() as u64);
58 if let Err(res) = res {
59 tracing::warn!("Ignoring metric reading due to error: {res}");
60 self.failure_count += 1;
61 return;
62 }
63 *self.status_count.entry(metric.status_code).or_default() += 1;
64 self.total_bytes_sent += metric.bytes_sent;
65 self.total_bytes_received += metric.bytes_received;
66 }
67 HttpMetric::Failure => self.failure_count += 1,
68 };
69 self.count += 1;
70 }
71
72 fn merge(&mut self, other: Self) {
73 self.latency_hist += other.latency_hist;
74
75 for (status_code, other_count) in other.status_count {
76 *self.status_count.entry(status_code).or_default() += other_count;
77 }
78 self.total_bytes_sent += other.total_bytes_sent;
79 self.total_bytes_received += other.total_bytes_received;
80 self.failure_count += other.failure_count;
81 self.count += other.count;
82 }
83}
84
85#[derive(Serialize, Deserialize, Debug)]
86pub struct HttpLatencyStats {
87 pub avg: Duration,
88 pub min: Duration,
89 pub med: Duration,
90 pub max: Duration,
91 pub p90: Duration,
92 pub p95: Duration,
93}
94
95#[derive(Serialize, Deserialize, Debug)]
96pub struct HttpReport {
97 pub req_duration: HttpLatencyStats,
98 pub reqs_total: u64,
99 pub req_failure_ratio: f64,
100 pub status_codes: HashMap<u16, u64>,
101 pub data_sent: u64,
102 pub data_received: u64,
103}
104
105impl From<HttpAggregate> for HttpReport {
106 fn from(value: HttpAggregate) -> Self {
107 let req_duration = HttpLatencyStats {
108 avg: Duration::from_nanos(value.latency_hist.mean() as u64),
109 min: Duration::from_nanos(value.latency_hist.min()),
110 med: Duration::from_nanos(value.latency_hist.value_at_quantile(0.5)),
111 max: Duration::from_nanos(value.latency_hist.max()),
112 p90: Duration::from_nanos(value.latency_hist.value_at_quantile(0.90)),
113 p95: Duration::from_nanos(value.latency_hist.value_at_quantile(0.95)),
114 };
115
116 Self {
117 req_duration,
118 reqs_total: value.count,
119 req_failure_ratio: (value.failure_count as f64 / value.count as f64) * 100.0,
120 status_codes: value.status_count,
121 data_sent: value.total_bytes_sent,
122 data_received: value.total_bytes_received,
123 }
124 }
125}
126
127impl Report<HttpAggregate> for HttpReport {}
128
129#[derive(TypedBuilder)]
130pub struct HttpAction {
131 pub client: Client,
132 pub request: Arc<Request>,
133}
134
135pub async fn http_action(client: Client, request: Arc<Request>) -> HttpMetric {
136 let request = request.try_clone().expect("Body of request must be Clone");
137 let start = Instant::now();
138 let client = client.clone();
139 let res = client.execute(request).await;
140 let elapsed = start.elapsed();
141 match res {
142 Ok(res) => HttpMetric::Success(HttpResponseMetric {
143 latency: elapsed,
144 status_code: res.status().into(),
145 bytes_received: res.content_length().unwrap_or(0),
146 bytes_sent: 0,
147 }),
148 Err(_) => HttpMetric::Failure,
149 }
150}