karga_http/
lib.rs

1use hdrhistogram::Histogram;
2use karga::{Aggregate, Metric, Report};
3use reqwest::Client;
4use reqwest::Request;
5use serde::{Deserialize, Serialize};
6use std::time::Instant;
7use std::{collections::HashMap, sync::Arc, time::Duration};
8use typed_builder::TypedBuilder;
9
/// Re-exported so you don't have to add `reqwest` as a dependency yourself.
11pub use reqwest;
12
/// Measurements taken from a single HTTP request that produced a response.
///
/// Field order matters: the derived `PartialOrd` compares fields in
/// declaration order, starting with `latency`.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub struct HttpResponseMetric {
    /// Time the request took.
    pub latency: Duration,
    /// HTTP status code of the response.
    pub status_code: u16,
    /// Number of bytes sent with the request.
    pub bytes_sent: u64,
    /// Number of bytes received with the response.
    pub bytes_received: u64,
}

/// Outcome of a single HTTP request.
///
/// A request can fail before any response is obtained; in that case there is
/// nothing to measure, so the failure carries no data.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum HttpMetric {
    /// A response was received; carries its measurements.
    Success(HttpResponseMetric),
    /// No response was received.
    Failure,
}
27
// Marker impl: registers `HttpMetric` as a karga `Metric`; no items are overridden.
impl Metric for HttpMetric {}
/// Empty, field-less type describing a failed request.
///
/// NOTE(review): nothing in the visible part of this file uses this type;
/// confirm whether it is still needed.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HttpFailedRequestMetric {}
30#[derive(Clone)]
31pub struct HttpAggregate {
32    pub latency_hist: Histogram<u64>,
33    pub status_count: HashMap<u16, u64>,
34    pub total_bytes_sent: u64,
35    pub total_bytes_received: u64,
36    pub count: u64,
37    pub failure_count: u64,
38}
39
40impl Aggregate for HttpAggregate {
41    type Metric = HttpMetric;
42
43    fn new() -> Self {
44        Self {
45            latency_hist: Histogram::new(3).expect("Create histogram"),
46            status_count: HashMap::new(),
47            total_bytes_sent: 0,
48            total_bytes_received: 0,
49            count: 0,
50            failure_count: 0,
51        }
52    }
53
54    fn consume(&mut self, metric: &Self::Metric) {
55        match metric {
56            HttpMetric::Success(metric) => {
57                let res = self.latency_hist.record(metric.latency.as_nanos() as u64);
58                if let Err(res) = res {
59                    tracing::warn!("Ignoring metric reading due to error: {res}");
60                    self.failure_count += 1;
61                    return;
62                }
63                *self.status_count.entry(metric.status_code).or_default() += 1;
64                self.total_bytes_sent += metric.bytes_sent;
65                self.total_bytes_received += metric.bytes_received;
66            }
67            HttpMetric::Failure => self.failure_count += 1,
68        };
69        self.count += 1;
70    }
71
72    fn merge(&mut self, other: Self) {
73        self.latency_hist += other.latency_hist;
74
75        for (status_code, other_count) in other.status_count {
76            *self.status_count.entry(status_code).or_default() += other_count;
77        }
78        self.total_bytes_sent += other.total_bytes_sent;
79        self.total_bytes_received += other.total_bytes_received;
80        self.failure_count += other.failure_count;
81        self.count += other.count;
82    }
83}
84
/// Summary statistics of request latency, as reported in [`HttpReport`].
#[derive(Serialize, Deserialize, Debug)]
pub struct HttpLatencyStats {
    /// Mean latency.
    pub avg: Duration,
    /// Smallest recorded latency.
    pub min: Duration,
    /// Median latency (0.5 quantile).
    pub med: Duration,
    /// Largest recorded latency.
    pub max: Duration,
    /// Latency at the 0.90 quantile.
    pub p90: Duration,
    /// Latency at the 0.95 quantile.
    pub p95: Duration,
}
94
/// Serializable summary built from an [`HttpAggregate`].
#[derive(Serialize, Deserialize, Debug)]
pub struct HttpReport {
    /// Latency statistics over the recorded responses.
    pub req_duration: HttpLatencyStats,
    /// Total number of requests, taken from the aggregate's `count`.
    pub reqs_total: u64,
    /// Share of failed requests. Despite the name, this is a percentage:
    /// it is computed as `failure_count / count * 100`.
    pub req_failure_ratio: f64,
    /// Number of responses per HTTP status code.
    pub status_codes: HashMap<u16, u64>,
    /// Total bytes sent, taken from the aggregate's `total_bytes_sent`.
    pub data_sent: u64,
    /// Total bytes received, taken from the aggregate's `total_bytes_received`.
    pub data_received: u64,
}
104
105impl From<HttpAggregate> for HttpReport {
106    fn from(value: HttpAggregate) -> Self {
107        let req_duration = HttpLatencyStats {
108            avg: Duration::from_nanos(value.latency_hist.mean() as u64),
109            min: Duration::from_nanos(value.latency_hist.min()),
110            med: Duration::from_nanos(value.latency_hist.value_at_quantile(0.5)),
111            max: Duration::from_nanos(value.latency_hist.max()),
112            p90: Duration::from_nanos(value.latency_hist.value_at_quantile(0.90)),
113            p95: Duration::from_nanos(value.latency_hist.value_at_quantile(0.95)),
114        };
115
116        Self {
117            req_duration,
118            reqs_total: value.count,
119            req_failure_ratio: (value.failure_count as f64 / value.count as f64) * 100.0,
120            status_codes: value.status_count,
121            data_sent: value.total_bytes_sent,
122            data_received: value.total_bytes_received,
123        }
124    }
125}
126
// Marker impl: registers `HttpReport` as the karga `Report` for `HttpAggregate`;
// no items are overridden.
impl Report<HttpAggregate> for HttpReport {}
128
/// A client paired with a shared request, constructible through the builder
/// generated by `TypedBuilder`.
///
/// NOTE(review): presumably these are the inputs for `http_action`; confirm
/// against callers.
#[derive(TypedBuilder)]
pub struct HttpAction {
    /// Client used to execute the request.
    pub client: Client,
    /// Request template, shared behind an `Arc`.
    pub request: Arc<Request>,
}
134
135pub async fn http_action(client: Client, request: Arc<Request>) -> HttpMetric {
136    let request = request.try_clone().expect("Body of request must be Clone");
137    let start = Instant::now();
138    let client = client.clone();
139    let res = client.execute(request).await;
140    let elapsed = start.elapsed();
141    match res {
142        Ok(res) => HttpMetric::Success(HttpResponseMetric {
143            latency: elapsed,
144            status_code: res.status().into(),
145            bytes_received: res.content_length().unwrap_or(0),
146            bytes_sent: 0,
147        }),
148        Err(_) => HttpMetric::Failure,
149    }
150}