barter_integration/protocol/http/rest/
client.rs1use crate::{
2 error::SocketError,
3 metric::{Field, Metric, Tag},
4 protocol::http::{BuildStrategy, HttpParser, rest::RestRequest},
5};
6use bytes::Bytes;
7use chrono::Utc;
8
9#[derive(Debug)]
15pub struct RestClient<Strategy, Parser> {
16 pub http_client: reqwest::Client,
18
19 pub base_url: Box<str>,
21
22 pub strategy: Strategy,
30
31 pub parser: Parser,
34}
35
36impl<Strategy, Parser> RestClient<Strategy, Parser>
37where
38 Strategy: BuildStrategy,
39 Parser: HttpParser,
40{
41 pub async fn execute<Request>(
43 &self,
44 request: Request,
45 ) -> Result<(Request::Response, Metric), Parser::OutputError>
46 where
47 Request: RestRequest,
48 {
49 let request = self.build(request)?;
51
52 let (status, payload, latency) = self.measured_execution::<Request>(request).await?;
54
55 self.parser
57 .parse::<Request::Response>(status, &payload)
58 .map(|response| (response, latency))
59 }
60
61 pub fn build<Request>(&self, request: Request) -> Result<reqwest::Request, SocketError>
63 where
64 Request: RestRequest,
65 {
66 let url = format!("{}{}", self.base_url, request.path());
68
69 let mut builder = self
71 .http_client
72 .request(Request::method(), url)
73 .timeout(Request::timeout());
74
75 if let Some(query_params) = request.query_params() {
77 builder = builder.query(query_params);
78 }
79
80 if let Some(body) = request.body() {
82 builder = builder.json(body);
83 }
84
85 self.strategy.build(request, builder)
87 }
88
89 pub async fn measured_execution<Request>(
93 &self,
94 request: reqwest::Request,
95 ) -> Result<(reqwest::StatusCode, Bytes, Metric), SocketError>
96 where
97 Request: RestRequest,
98 {
99 let mut latency = Metric {
101 name: "http_request_duration",
102 time: Utc::now().timestamp_millis() as u64,
103 tags: vec![
104 Tag::new("http_method", Request::method().as_str()),
105 Tag::new("base_url", self.base_url.as_ref()),
106 Tag::new("path", request.url().path()),
107 ],
108 fields: Vec::with_capacity(1),
109 };
110
111 let start = std::time::Instant::now();
113 let response = self.http_client.execute(request).await?;
114 let duration = start.elapsed().as_millis() as u64;
115
116 latency
118 .tags
119 .push(Tag::new("status_code", response.status().as_str()));
120 latency.fields.push(Field::new("duration", duration));
121
122 let status_code = response.status();
124 let payload = response.bytes().await?;
125
126 Ok((status_code, payload, latency))
127 }
128}
129
130impl<Strategy, Parser> RestClient<Strategy, Parser> {
131 pub fn new(base_url: impl Into<Box<str>>, strategy: Strategy, parser: Parser) -> Self {
133 Self {
134 http_client: reqwest::Client::new(),
135 base_url: base_url.into(),
136 strategy,
137 parser,
138 }
139 }
140}