1use eyre::{OptionExt, WrapErr};
4use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
5use std::{
6 ops::Deref,
7 sync::{Arc, Mutex},
8 time::{Duration, Instant},
9};
10use tokio::sync::broadcast;
11use tracing::*;
12
tokio::task_local! {
    /// Task-local handle to the broadcast sender on which HTTP exchange logs are published.
    ///
    /// The sender is wrapped in `Arc<Mutex<Option<..>>>`; code in this file treats a `None`
    /// value as "the channel has been closed".
    pub static CHANNEL: Arc<Mutex<Option<broadcast::Sender<Log>>>>;
}
16
17pub fn subscribe() -> eyre::Result<broadcast::Receiver<Log>> {
19 let ch = CHANNEL.get();
20 let Ok(guard) = ch.lock() else {
21 eyre::bail!("failed to acquire http channel lock");
22 };
23 let Some(tx) = guard.deref() else {
24 eyre::bail!("http channel has been already closed");
25 };
26
27 Ok(tx.subscribe())
28}
29
/// Errors returned by the HTTP wrapper types in this file.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An error reported by `reqwest` (converted automatically via `From`).
    #[error("HttpError: {0}")]
    Http(#[from] reqwest::Error),
    /// The response body could not be deserialized by `serde_json` into the requested type.
    #[error("failed to deserialize http response into the specified type: {0}")]
    Deserialize(#[from] serde_json::Error),
    /// Any other failure, carried as an `eyre::Error` and displayed with the alternate
    /// (`{:#}`) format specifier.
    #[error("{0:#}")]
    Unexpected(#[from] eyre::Error),
}
39
/// Request half of a [`Log`] entry: the URL, method and headers of a built request.
#[derive(Debug, Clone)]
pub struct LogRequest {
    /// Target URL of the request.
    pub url: reqwest::Url,
    /// HTTP method of the request.
    pub method: reqwest::Method,
    /// Headers attached to the request.
    pub headers: reqwest::header::HeaderMap,
}
46
/// Response half of a [`Log`] entry.
///
/// The `Default` value is what gets logged when the request itself fails.
#[derive(Debug, Clone, Default)]
pub struct LogResponse {
    /// Headers received with the response.
    pub headers: reqwest::header::HeaderMap,
    /// Response body as text.
    pub body: String,
    /// HTTP status code of the response.
    pub status: reqwest::StatusCode,
    /// Time elapsed between starting the request and finishing reading the body.
    pub duration_req: Duration,
}
54
/// One HTTP exchange (request plus response) as published on [`CHANNEL`].
#[derive(Debug, Clone)]
pub struct Log {
    /// What was sent.
    pub request: LogRequest,
    /// What came back (or a default value when the request failed).
    pub response: LogResponse,
}
60
/// A fully buffered HTTP response: headers, status and the body already read as text.
#[derive(Debug, Clone)]
pub struct Response {
    /// Response headers.
    pub headers: reqwest::header::HeaderMap,
    /// Response status code.
    pub status: reqwest::StatusCode,
    /// Response body as text.
    pub text: String,
}
67
68impl Response {
69 pub fn status(&self) -> reqwest::StatusCode {
70 self.status
71 }
72
73 pub fn headers(&self) -> &reqwest::header::HeaderMap {
74 &self.headers
75 }
76
77 pub async fn text(self) -> Result<String, Error> {
78 Ok(self.text)
79 }
80
81 pub async fn json<T: serde::de::DeserializeOwned>(self) -> Result<T, Error> {
82 Ok(serde_json::from_str(&self.text)?)
83 }
84
85 async fn from(res: reqwest::Response) -> Self {
86 Response {
87 headers: res.headers().clone(),
88 status: res.status(),
89 text: res.text().await.unwrap_or_default(),
90 }
91 }
92}
93
/// HTTP client wrapping a `reqwest::Client`; it hands out [`RequestBuilder`]s for requests.
#[derive(Clone, Default)]
pub struct Client {
    /// The wrapped `reqwest` client used to execute requests.
    pub(crate) inner: reqwest::Client,
}
99
100impl Client {
101 pub fn new() -> Client {
103 Client::default()
104 }
105
106 pub fn get(&self, url: impl reqwest::IntoUrl) -> RequestBuilder {
107 let url = url.into_url().unwrap();
108 debug!("Requesting {url}");
109 RequestBuilder {
110 inner: Some(self.inner.get(url)),
111 client: self.inner.clone(),
112 }
113 }
114
115 pub fn post(&self, url: impl reqwest::IntoUrl) -> RequestBuilder {
116 let url = url.into_url().unwrap();
117 debug!("Requesting {url}");
118 RequestBuilder {
119 inner: Some(self.inner.post(url)),
120 client: self.inner.clone(),
121 }
122 }
123}
124
/// Builder for a single request; wraps a `reqwest::RequestBuilder` plus the client that
/// will execute it.
pub struct RequestBuilder {
    /// The wrapped `reqwest` builder. It is an `Option` so methods can move it out with
    /// `take()`; `send` reports an "inner missing" error when it is `None`.
    pub(crate) inner: Option<reqwest::RequestBuilder>,
    /// Client used by `send` to execute the built request.
    pub(crate) client: reqwest::Client,
}
129
130impl RequestBuilder {
131 pub fn header<K, V>(mut self, key: K, value: V) -> RequestBuilder
132 where
133 HeaderName: TryFrom<K>,
134 <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
135 HeaderValue: TryFrom<V>,
136 <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
137 {
138 let inner = self.inner.take().expect("inner missing");
139 self.inner = Some(inner.header(key, value));
140 self
141 }
142
143 pub fn headers(mut self, headers: HeaderMap) -> RequestBuilder {
144 let inner = self.inner.take().expect("inner missing");
145 self.inner = Some(inner.headers(headers));
146 self
147 }
148
149 pub fn basic_auth<U, P>(mut self, username: U, password: Option<P>) -> RequestBuilder
150 where
151 U: std::fmt::Display,
152 P: std::fmt::Display,
153 {
154 let inner = self.inner.take().expect("inner missing");
155 self.inner = Some(inner.basic_auth(username, password));
156 self
157 }
158
159 pub fn bearer_auth<T>(mut self, token: T) -> RequestBuilder
160 where
161 T: std::fmt::Display,
162 {
163 let inner = self.inner.take().expect("inner missing");
164 self.inner = Some(inner.bearer_auth(token));
165 self
166 }
167
168 pub fn query<T: serde::Serialize + ?Sized>(mut self, query: &T) -> RequestBuilder {
169 let inner = self.inner.take().expect("inner missing");
170 self.inner = Some(inner.query(query));
171 self
172 }
173
174 pub fn form<T: serde::Serialize + ?Sized>(mut self, form: &T) -> RequestBuilder {
175 let inner = self.inner.take().expect("inner missing");
176 self.inner = Some(inner.form(form));
177 self
178 }
179
180 #[cfg(feature = "json")]
181 pub fn json<T: serde::Serialize + ?Sized>(mut self, json: &T) -> RequestBuilder {
182 self.inner = self.inner.take().map(|inner| inner.json(json));
183 self
184 }
185
186 #[cfg(feature = "multipart")]
187 pub fn multipart(mut self, multipart: reqwest::multipart::Form) -> RequestBuilder {
188 let inner = self.inner.take().expect("inner missing");
189 self.inner = Some(inner.multipart(multipart));
190 self
191 }
192
193 pub async fn send(mut self) -> Result<Response, Error> {
194 let req = self.inner.take().ok_or_eyre("inner missing")?.build()?;
195
196 let log_request = LogRequest {
197 url: req.url().clone(),
198 method: req.method().clone(),
199 headers: req.headers().clone(),
200 };
201
202 let time_req = Instant::now();
203 let res = self.client.execute(req).await;
204
205 match res {
206 Ok(res) => {
207 let res = Response::from(res).await;
208 let duration_req = time_req.elapsed();
209
210 let log_response = LogResponse {
211 headers: res.headers.clone(),
212 body: res.text.clone(),
213 status: res.status(),
214 duration_req,
215 };
216
217 let ch = CHANNEL.get();
218 let Ok(guard) = ch.lock() else {
219 return Err(eyre::eyre!("failed to acquire http channel lock").into());
220 };
221 if let Some(tx) = guard.deref() {
222 tx.send(Log {
223 request: log_request,
224 response: log_response,
225 })
226 .wrap_err("failed to send a message to http channel")?;
227 }
228 Ok(res)
229 }
230 Err(e) => {
231 let ch = CHANNEL.get();
232 let Ok(guard) = ch.lock() else {
233 return Err(eyre::eyre!("failed to acquire http channel lock").into());
234 };
235 if let Some(tx) = guard.deref() {
236 tx.send(Log {
237 request: log_request,
238 response: LogResponse::default(),
239 })
240 .wrap_err("failed to send a message to http channel")?;
241 }
242 Err(e.into())
243 }
244 }
245 }
246
247 pub fn timeout(mut self, timeout: std::time::Duration) -> RequestBuilder {
248 let inner = self.inner.take().expect("inner missing");
249 self.inner = Some(inner.timeout(timeout));
250 self
251 }
252
253 pub fn try_clone(&self) -> Option<RequestBuilder> {
254 let inner = self.inner.as_ref()?;
255 Some(RequestBuilder {
256 inner: Some(inner.try_clone()?),
257 client: self.client.clone(),
258 })
259 }
260
261 pub fn version(mut self, version: reqwest::Version) -> RequestBuilder {
262 let inner = self.inner.take().expect("inner missing");
263 self.inner = Some(inner.version(version));
264 self
265 }
266}