karga_http/
lib.rs

1use hdrhistogram::Histogram;
2use karga::{Aggregate, Metric, Report};
3use reqwest::Client;
4use serde::{Deserialize, Serialize};
5use std::{collections::HashMap, time::Duration};
6use typed_builder::TypedBuilder;
7
8#[derive(Clone, PartialEq, PartialOrd)]
9pub struct HttpResponseMetric {
10    pub latency: Duration,
11    pub status_code: u16,
12    pub bytes_sent: u64,
13    pub bytes_received: u64,
14}
15
16// Sometime a request can fail so the metrics shall be ignored
17#[derive(Clone, PartialEq, PartialOrd)]
18pub enum HttpMetric {
19    Success(HttpResponseMetric),
20    Failure,
21}
22
23impl Metric for HttpMetric {}
24pub struct HttpFailedRequestMetric {}
25#[derive(Clone)]
26pub struct HttpAggregate {
27    pub latency_hist: Histogram<u64>,
28    pub status_count: HashMap<u16, u64>,
29    pub total_bytes_sent: u64,
30    pub total_bytes_received: u64,
31    pub count: u64,
32    pub failure_count: u64,
33}
34
35impl Aggregate for HttpAggregate {
36    type Metric = HttpMetric;
37
38    fn new() -> Self {
39        Self {
40            latency_hist: Histogram::new(3).expect("Create histogram"),
41            status_count: HashMap::new(),
42            total_bytes_sent: 0,
43            total_bytes_received: 0,
44            count: 0,
45            failure_count: 0,
46        }
47    }
48
49    fn consume(&mut self, metric: &Self::Metric) {
50        match metric {
51            HttpMetric::Success(metric) => {
52                let res = self.latency_hist.record(metric.latency.as_nanos() as u64);
53                if let Err(res) = res {
54                    tracing::warn!("Ignoring metric reading due to error: {res}");
55                    self.failure_count += 1;
56                    return;
57                }
58                *self.status_count.entry(metric.status_code).or_default() += 1;
59                self.total_bytes_sent += metric.bytes_sent;
60                self.total_bytes_received += metric.bytes_received;
61            }
62            HttpMetric::Failure => self.failure_count += 1,
63        };
64        self.count += 1;
65    }
66
67    fn merge(&mut self, other: Self) {
68        self.latency_hist += other.latency_hist;
69
70        for (status_code, other_count) in other.status_count {
71            *self.status_count.entry(status_code).or_default() += other_count;
72        }
73        self.total_bytes_sent += other.total_bytes_sent;
74        self.total_bytes_received += other.total_bytes_received;
75        self.failure_count += other.failure_count;
76        self.count += other.count;
77    }
78}
79
80#[derive(Serialize, Deserialize, Debug)]
81pub struct HttpLatencyStats {
82    pub avg: Duration,
83    pub min: Duration,
84    pub med: Duration,
85    pub max: Duration,
86    pub p90: Duration,
87    pub p95: Duration,
88}
89
90#[derive(Serialize, Deserialize, Debug)]
91pub struct HttpReport {
92    pub req_duration: HttpLatencyStats,
93    pub reqs_total: u64,
94    pub req_failure_ratio: f64,
95    pub status_codes: HashMap<u16, u64>,
96    pub data_sent: u64,
97    pub data_received: u64,
98}
99
100impl From<HttpAggregate> for HttpReport {
101    fn from(value: HttpAggregate) -> Self {
102        let req_duration = HttpLatencyStats {
103            avg: Duration::from_nanos(value.latency_hist.mean() as u64),
104            min: Duration::from_nanos(value.latency_hist.min()),
105            med: Duration::from_nanos(value.latency_hist.value_at_quantile(0.5)),
106            max: Duration::from_nanos(value.latency_hist.max()),
107            p90: Duration::from_nanos(value.latency_hist.value_at_quantile(0.90)),
108            p95: Duration::from_nanos(value.latency_hist.value_at_quantile(0.95)),
109        };
110
111        Self {
112            req_duration,
113            reqs_total: value.count,
114            req_failure_ratio: (value.failure_count as f64 / value.count as f64) * 100.0,
115            status_codes: value.status_count,
116            data_sent: value.total_bytes_sent,
117            data_received: value.total_bytes_received,
118        }
119    }
120}
121
122impl Report<HttpAggregate> for HttpReport {}
123
124pub use reqwest::header::HeaderMap as Headers;
125pub use reqwest::Body;
126pub use reqwest::Method;
127pub use reqwest::Url;
128
129#[derive(TypedBuilder)]
130pub struct HttpActionConfig {
131    #[builder(default = Client::new())]
132    pub client: Client,
133
134    pub method: Method,
135
136    #[builder(setter(transform = |s: &str| Url::parse(s).expect("Invalid URL passed to HttpActionConfig")))]
137    pub url: Url,
138
139    #[builder(default = None)]
140    pub headers: Option<Headers>,
141
142    #[builder(default = None)]
143    pub body: Option<Body>,
144}
145
146#[macro_export]
147macro_rules! make_http_action {
148    ($config:expr) => {{
149        let config = $config;
150
151        let mut req_builder = config
152            .client
153            .request(config.method.clone(), config.url.clone());
154        if let Some(h) = config.headers {
155            req_builder = req_builder.headers(h)
156        }
157
158        if let Some(b) = config.body {
159            req_builder = req_builder.body(b)
160        }
161        let req = req_builder.build().expect("Unable to build request");
162        req.try_clone().expect("request must be Clone");
163        let req = std::sync::Arc::new(req);
164        move || {
165            let client = config.client.clone();
166            let req = req.clone();
167            async move {
168                let req = req.try_clone().unwrap();
169                let start = std::time::Instant::now();
170                let client = client.clone();
171                let res = client.execute(req).await;
172                let elapsed = start.elapsed();
173                match res {
174                    Ok(res) => $crate::HttpMetric::Success(HttpResponseMetric {
175                        latency: elapsed,
176                        status_code: res.status().into(),
177                        bytes_received: res.content_length().unwrap_or(0),
178                        bytes_sent: 0,
179                    }),
180                    Err(_) => $crate::HttpMetric::Failure,
181                }
182            }
183        }
184    }};
185}
186
187#[cfg(test)]
188mod tests {
189    use karga::Scenario;
190
191    use super::*;
192
193    #[test]
194    fn action_compatibility() {
195        let config = HttpActionConfig::builder()
196            .method(Method::GET)
197            .url("http://localhost:3000")
198            .build();
199
200        let _: Scenario<HttpAggregate, _, _> = Scenario::builder()
201            .name("random")
202            .action(make_http_action!(config))
203            .build();
204    }
205}