rumeter_component/samplers/
http.rs

1use std::fmt;
2use std::{collections::HashMap, error::Error};
3
4use tracing::*;
5use async_trait::async_trait;
6
7use crate::{Sampler, record::{RecordData, ResponseResult}};
8
9pub type HeaderMap = reqwest::header::HeaderMap;
10pub type HeaderValue = reqwest::header::HeaderValue;
11pub type HeaderName = reqwest::header::HeaderName;
12
13#[derive(Clone)]
14pub struct HttpSampler {
15    label: String,
16    url: String,
17    method: Method,
18    headers: HeaderMap,
19    body: Option<String>,
20}
21
/// Minimal error type that carries a human-readable message.
///
/// Its `Display` output is the message prefixed with `"An Error Occurred, "`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RumeterErr {
    message: String,
}

impl RumeterErr {
    /// Creates an error from anything convertible into a `String`.
    ///
    /// Accepting `impl Into<String>` keeps `RumeterErr::new("literal")` working
    /// while letting callers hand over an already-owned `String` (for example
    /// the result of `format!`) without a second allocation.
    pub fn new(message: impl Into<String>) -> Self {
        Self { message: message.into() }
    }
}

impl fmt::Display for RumeterErr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "An Error Occurred, {}", self.message) // user-facing output
    }
}

// The default `Error` methods are sufficient: there is no underlying source error.
impl Error for RumeterErr {}
42
/// HTTP methods supported by the sampler.
///
/// The enum is field-less, so it is cheap to copy and compare; the derives
/// below make it usable in assertions, logs and `==` checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
    GET,
    POST,
    PUT,
}
49
50impl Method {
51    pub fn from(m: &str) -> Result<Self, Box<dyn Error>> {
52        if m.eq_ignore_ascii_case("get") {
53            Ok(Method::GET)
54        } else if m.eq_ignore_ascii_case("post") {
55            Ok(Method::POST)
56        } else if m.eq_ignore_ascii_case("PUT") {
57            Ok(Method::PUT)
58        } else {
59            Err(Box::new(RumeterErr::new("method not supported")))
60        }
61    }
62
63    pub fn len(&self) -> u32{
64        match self {
65            Method::GET => 3,
66            Method::POST => 4,
67            Method::PUT => 3,
68        }
69    }
70}
71
72impl HttpSampler {
73    pub fn new(label: &str, url: &str, method: Method, headers: HeaderMap, body: Option<String>) -> Self {
74        Self { label: label.to_string(), url: url.to_string(), method, headers, body }
75    }
76
77    fn request_size(&self) -> u32 {
78        self.request_line_size() + self.request_headers_size() + self.request_body_size()
79    }
80
81    fn request_headers_size(&self) -> u32 {
82        let mut size = 0u32;
83        for (key, value) in self.headers.clone() {
84            match key {
85                Some(header_name) => {
86                    size = size + (header_name.to_string().len() + value.len() + ":\r\n".len()) as u32;
87                },
88                None => {},
89            }
90        }
91        size
92    }
93
94    fn request_line_size(&self) -> u32 {
95        self.method.len() + (self.url.len() + "  HTTP/1.1\r\n".len()) as u32
96    }
97
98    fn request_body_size(&self) -> u32 {
99        (self.body.clone().unwrap_or("".to_string()).len() + "\r\n".len()) as u32 
100    }
101}
102
103#[async_trait]
104impl Sampler for HttpSampler {
105    async fn run(&self) -> RecordData{
106        let client = reqwest::Client::new();
107        let s = self.clone();
108        let start_send_timestamp = chrono::Local::now();
109        let resp = match self.method {
110            Method::GET => client.get(s.url.clone()).headers(s.headers.clone()).send().await,
111            Method::POST => client.post(s.url.clone()).headers(s.headers.clone()).body(s.body.clone().unwrap()).send().await,
112            Method::PUT => client.put(s.url.clone()).headers(s.headers.clone()).body(s.body.clone().unwrap()).send().await,
113        };
114        
115        let finish_send_timestamp = chrono::Local::now();
116        match resp {
117            Ok(r) => {
118                let data_type = String::from("text");
119                let code = r.status().as_u16();
120                let resp_msg = r.status().canonical_reason().unwrap_or("Unknown");
121                let success = code < 400u16;
122                let fail_msg = if success {
123                    None
124                } else {
125                    Some(resp_msg.to_string())
126                };
127                let mut resp_headers: HashMap<String, String> = HashMap::new();
128                for (h_key, h_val) in r.headers() {
129                    resp_headers.insert(h_key.to_string(), h_val.to_str().unwrap().to_string());
130                }
131
132                let resp_body = r.text().await.unwrap_or("".to_string());
133
134                RecordData::new(
135                    start_send_timestamp.timestamp_millis() as u128,
136                    (finish_send_timestamp - start_send_timestamp).num_milliseconds() as u64,
137                    self.label.clone(),
138                    code,
139                    resp_msg.into(),
140                    "".to_string(),
141                    data_type,
142                    success,
143                    fail_msg,
144                    resp_body.len() as u64,
145                    s.request_size() as u64,
146                    0,
147                    0,
148                    s.url,
149                    (finish_send_timestamp - start_send_timestamp).num_milliseconds() as u64,
150                    0,
151                    0,
152                    Some(ResponseResult::new(resp_headers, resp_body)),
153                )
154
155            },
156            Err(e) => {
157                error!("failed! --> {}", e.to_string());
158                RecordData::new(
159                    start_send_timestamp.timestamp_millis() as u128,
160                    (finish_send_timestamp - start_send_timestamp).num_milliseconds() as u64,
161                    self.label.clone(),
162                    0,
163                    "no data".to_string(),
164                    "".to_string(),
165                    "no data".to_string(),
166                    false,
167                    Some(e.to_string()),
168                    0u64,
169                    s.request_size() as u64,
170                    0,
171                    0,
172                    s.url,
173                    (finish_send_timestamp - start_send_timestamp).num_milliseconds() as u64,
174                    0,
175                    0,
176                    None,
177                )
178            },
179        }
180    }
181
182}