1use hdrhistogram::Histogram;
2use karga::{Aggregate, Metric, Report};
3use reqwest::Client;
4use serde::{Deserialize, Serialize};
5use std::{collections::HashMap, time::Duration};
6use typed_builder::TypedBuilder;
7
/// Measurements taken from one HTTP request that produced a response.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub struct HttpResponseMetric {
    /// Time measured around the request execution.
    pub latency: Duration,
    /// Numeric HTTP status code of the response.
    pub status_code: u16,
    /// Bytes sent for the request.
    pub bytes_sent: u64,
    /// Bytes received for the response.
    pub bytes_received: u64,
}

/// Outcome of a single HTTP request.
///
/// The derived ordering follows declaration order, so every `Success` sorts
/// before `Failure`.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum HttpMetric {
    /// A response was received; carries its measurements.
    Success(HttpResponseMetric),
    /// No response was obtained.
    Failure,
}
22
23impl Metric for HttpMetric {}
/// Field-less metric type for a failed request.
///
/// NOTE(review): nothing in the visible part of this file references this
/// type (failures are reported through `HttpMetric::Failure`); presumably a
/// placeholder — confirm before relying on or removing it.
#[derive(Debug, Clone, Default)]
pub struct HttpFailedRequestMetric {}
25#[derive(Clone)]
26pub struct HttpAggregate {
27 pub latency_hist: Histogram<u64>,
28 pub status_count: HashMap<u16, u64>,
29 pub total_bytes_sent: u64,
30 pub total_bytes_received: u64,
31 pub count: u64,
32 pub failure_count: u64,
33}
34
35impl Aggregate for HttpAggregate {
36 type Metric = HttpMetric;
37
38 fn new() -> Self {
39 Self {
40 latency_hist: Histogram::new(3).expect("Create histogram"),
41 status_count: HashMap::new(),
42 total_bytes_sent: 0,
43 total_bytes_received: 0,
44 count: 0,
45 failure_count: 0,
46 }
47 }
48
49 fn consume(&mut self, metric: &Self::Metric) {
50 match metric {
51 HttpMetric::Success(metric) => {
52 let res = self.latency_hist.record(metric.latency.as_nanos() as u64);
53 if let Err(res) = res {
54 tracing::warn!("Ignoring metric reading due to error: {res}");
55 self.failure_count += 1;
56 return;
57 }
58 *self.status_count.entry(metric.status_code).or_default() += 1;
59 self.total_bytes_sent += metric.bytes_sent;
60 self.total_bytes_received += metric.bytes_received;
61 }
62 HttpMetric::Failure => self.failure_count += 1,
63 };
64 self.count += 1;
65 }
66
67 fn merge(&mut self, other: Self) {
68 self.latency_hist += other.latency_hist;
69
70 for (status_code, other_count) in other.status_count {
71 *self.status_count.entry(status_code).or_default() += other_count;
72 }
73 self.total_bytes_sent += other.total_bytes_sent;
74 self.total_bytes_received += other.total_bytes_received;
75 self.failure_count += other.failure_count;
76 self.count += other.count;
77 }
78}
79
80#[derive(Serialize, Deserialize, Debug)]
81pub struct HttpLatencyStats {
82 pub avg: Duration,
83 pub min: Duration,
84 pub med: Duration,
85 pub max: Duration,
86 pub p90: Duration,
87 pub p95: Duration,
88}
89
90#[derive(Serialize, Deserialize, Debug)]
91pub struct HttpReport {
92 pub req_duration: HttpLatencyStats,
93 pub reqs_total: u64,
94 pub req_failure_ratio: f64,
95 pub status_codes: HashMap<u16, u64>,
96 pub data_sent: u64,
97 pub data_received: u64,
98}
99
100impl From<HttpAggregate> for HttpReport {
101 fn from(value: HttpAggregate) -> Self {
102 let req_duration = HttpLatencyStats {
103 avg: Duration::from_nanos(value.latency_hist.mean() as u64),
104 min: Duration::from_nanos(value.latency_hist.min()),
105 med: Duration::from_nanos(value.latency_hist.value_at_quantile(0.5)),
106 max: Duration::from_nanos(value.latency_hist.max()),
107 p90: Duration::from_nanos(value.latency_hist.value_at_quantile(0.90)),
108 p95: Duration::from_nanos(value.latency_hist.value_at_quantile(0.95)),
109 };
110
111 Self {
112 req_duration,
113 reqs_total: value.count,
114 req_failure_ratio: (value.failure_count as f64 / value.count as f64) * 100.0,
115 status_codes: value.status_count,
116 data_sent: value.total_bytes_sent,
117 data_received: value.total_bytes_received,
118 }
119 }
120}
121
122impl Report<HttpAggregate> for HttpReport {}
123
124pub use reqwest::header::HeaderMap as Headers;
125pub use reqwest::Body;
126pub use reqwest::Method;
127pub use reqwest::Url;
128
129#[derive(TypedBuilder)]
130pub struct HttpActionConfig {
131 #[builder(default = Client::new())]
132 pub client: Client,
133
134 pub method: Method,
135
136 #[builder(setter(transform = |s: &str| Url::parse(s).expect("Invalid URL passed to HttpActionConfig")))]
137 pub url: Url,
138
139 #[builder(default = None)]
140 pub headers: Option<Headers>,
141
142 #[builder(default = None)]
143 pub body: Option<Body>,
144}
145
/// Builds an action closure from an `HttpActionConfig`-shaped value.
///
/// The request is built once from the config's client, method, URL, optional
/// headers and optional body. The macro evaluates to a `move` closure; each
/// call returns a future that clones the prepared request, executes it with
/// the config's client, and resolves to:
///
/// * `HttpMetric::Success` with the elapsed time around `execute`, the
///   response status code, the response's `content_length()` (0 when
///   unknown) and the size of the in-memory request body (0 when there is no
///   body or it is not available as bytes), or
/// * `HttpMetric::Failure` when `execute` returns an error.
///
/// # Panics
///
/// Panics while the action is being built if the request cannot be built
/// ("Unable to build request") or cannot be cloned ("request must be Clone").
#[macro_export]
macro_rules! make_http_action {
    ($config:expr) => {{
        let config = $config;

        let mut req_builder = config
            .client
            .request(config.method.clone(), config.url.clone());
        if let Some(headers) = config.headers {
            req_builder = req_builder.headers(headers);
        }
        if let Some(body) = config.body {
            req_builder = req_builder.body(body);
        }

        let req = req_builder.build().expect("Unable to build request");
        // Fail fast: every invocation clones the request, so reject a
        // non-cloneable one now rather than inside the first future.
        let _ = req.try_clone().expect("request must be Clone");
        // The body is fixed for the lifetime of the action, so measure it once.
        let bytes_sent = req
            .body()
            .and_then(|body| body.as_bytes())
            .map_or(0, |bytes| bytes.len() as u64);

        let req = ::std::sync::Arc::new(req);
        let client = config.client;
        move || {
            let client = client.clone();
            let req = ::std::sync::Arc::clone(&req);
            async move {
                let req = req
                    .try_clone()
                    .expect("request was verified to be cloneable when the action was built");
                let start = ::std::time::Instant::now();
                let res = client.execute(req).await;
                let elapsed = start.elapsed();
                match res {
                    // Both types are `$crate`-qualified so the macro expands
                    // in crates that have not imported them.
                    Ok(res) => $crate::HttpMetric::Success($crate::HttpResponseMetric {
                        latency: elapsed,
                        status_code: res.status().into(),
                        bytes_received: res.content_length().unwrap_or(0),
                        bytes_sent,
                    }),
                    Err(_) => $crate::HttpMetric::Failure,
                }
            }
        }
    }};
}
186
#[cfg(test)]
mod tests {
    use super::*;
    use karga::Scenario;

    /// Type-level check that the closure produced by `make_http_action!` is
    /// accepted as the action of a `Scenario` over `HttpAggregate`. The action
    /// is only constructed here, never invoked.
    #[test]
    fn action_compatibility() {
        let http_config = HttpActionConfig::builder()
            .url("http://localhost:3000")
            .method(Method::GET)
            .build();
        let action = make_http_action!(http_config);

        let _: Scenario<HttpAggregate, _, _> =
            Scenario::builder().name("random").action(action).build();
    }
}