1use core::{
4 convert::{TryFrom, TryInto},
5 str::FromStr,
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use reqwest::{header, Proxy};
11
12use cometbft::{block::Height, evidence::Evidence, Hash};
13use cometbft_config::net;
14
15use crate::client::{Client, CompatMode};
16use crate::dialect::{v0_34, v0_37, v0_38, Dialect, LatestDialect};
17use crate::endpoint;
18use crate::prelude::*;
19use crate::query::Query;
20use crate::request::RequestMessage;
21use crate::response::Response;
22use crate::{Error, Order, Scheme, SimpleRequest, Url};
23
24use super::auth;
25
26const USER_AGENT: &str = concat!("cometbft.rs/", env!("CARGO_PKG_VERSION"));
27
28#[derive(Debug, Clone)]
55pub struct HttpClient {
56 inner: reqwest::Client,
57 url: reqwest::Url,
58 compat: CompatMode,
59}
60
61pub struct Builder {
63 url: HttpClientUrl,
64 compat: CompatMode,
65 proxy_url: Option<HttpClientUrl>,
66 user_agent: Option<String>,
67 timeout: Duration,
68 client: Option<reqwest::Client>,
69}
70
71impl Builder {
72 pub fn compat_mode(mut self, mode: CompatMode) -> Self {
76 self.compat = mode;
77 self
78 }
79
80 pub fn proxy_url(mut self, url: HttpClientUrl) -> Self {
87 self.proxy_url = Some(url);
88 self
89 }
90
91 pub fn timeout(mut self, duration: Duration) -> Self {
96 self.timeout = duration;
97 self
98 }
99
100 pub fn user_agent(mut self, agent: String) -> Self {
102 self.user_agent = Some(agent);
103 self
104 }
105
106 pub fn client(mut self, client: reqwest::Client) -> Self {
112 self.client = Some(client);
113 self
114 }
115
116 pub fn build(self) -> Result<HttpClient, Error> {
118 let inner = if let Some(inner) = self.client {
119 inner
120 } else {
121 let builder = reqwest::ClientBuilder::new()
122 .user_agent(self.user_agent.unwrap_or_else(|| USER_AGENT.to_string()))
123 .timeout(self.timeout);
124
125 match self.proxy_url {
126 None => builder.build().map_err(Error::http)?,
127 Some(proxy_url) => {
128 let proxy = if self.url.0.is_secure() {
129 Proxy::https(reqwest::Url::from(proxy_url.0))
130 .map_err(Error::invalid_proxy)?
131 } else {
132 Proxy::http(reqwest::Url::from(proxy_url.0))
133 .map_err(Error::invalid_proxy)?
134 };
135 builder.proxy(proxy).build().map_err(Error::http)?
136 },
137 }
138 };
139
140 Ok(HttpClient {
141 inner,
142 url: self.url.into(),
143 compat: self.compat,
144 })
145 }
146}
147
148impl HttpClient {
149 pub fn new_from_parts(inner: reqwest::Client, url: reqwest::Url, compat: CompatMode) -> Self {
153 Self { inner, url, compat }
154 }
155
156 pub fn new<U>(url: U) -> Result<Self, Error>
159 where
160 U: TryInto<HttpClientUrl, Error = Error>,
161 {
162 let url = url.try_into()?;
163 Self::builder(url).build()
164 }
165
166 pub fn new_with_proxy<U, P>(url: U, proxy_url: P) -> Result<Self, Error>
174 where
175 U: TryInto<HttpClientUrl, Error = Error>,
176 P: TryInto<HttpClientUrl, Error = Error>,
177 {
178 let url = url.try_into()?;
179 Self::builder(url).proxy_url(proxy_url.try_into()?).build()
180 }
181
182 pub fn builder(url: HttpClientUrl) -> Builder {
186 Builder {
187 url,
188 compat: Default::default(),
189 proxy_url: None,
190 user_agent: None,
191 timeout: Duration::from_secs(30),
192 client: None,
193 }
194 }
195
196 pub fn set_compat_mode(&mut self, compat: CompatMode) {
202 self.compat = compat;
203 }
204
205 fn build_request<R>(&self, request: R) -> Result<reqwest::Request, Error>
206 where
207 R: RequestMessage,
208 {
209 let request_body = request.into_json();
210
211 tracing::debug!(url = %self.url, body = %request_body, "outgoing request");
212
213 let mut builder = self
214 .inner
215 .post(auth::strip_authority(self.url.clone()))
216 .header(header::CONTENT_TYPE, "application/json")
217 .body(request_body.into_bytes());
218
219 if let Some(auth) = auth::authorize(&self.url) {
220 builder = builder.header(header::AUTHORIZATION, auth.to_string());
221 }
222
223 builder.build().map_err(Error::http)
224 }
225
226 async fn perform_with_dialect<R, S>(&self, request: R, _dialect: S) -> Result<R::Output, Error>
227 where
228 R: SimpleRequest<S>,
229 S: Dialect,
230 {
231 let request = self.build_request(request)?;
232 let response = self.inner.execute(request).await.map_err(Error::http)?;
233 let response_status = response.status();
234 let response_body = response.bytes().await.map_err(Error::http)?;
235
236 tracing::debug!(
237 status = %response_status,
238 body = %String::from_utf8_lossy(&response_body),
239 "incoming response"
240 );
241
242 if response_status != reqwest::StatusCode::OK {
247 return Err(Error::http_request_failed(response_status));
248 }
249
250 R::Response::from_string(&response_body).map(Into::into)
251 }
252}
253
254#[async_trait]
255impl Client for HttpClient {
256 async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
257 where
258 R: SimpleRequest,
259 {
260 self.perform_with_dialect(request, LatestDialect).await
261 }
262
263 async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
264 where
265 H: Into<Height> + Send,
266 {
267 perform_with_compat!(self, endpoint::block::Request::new(height.into()))
268 }
269
270 async fn block_by_hash(
271 &self,
272 hash: cometbft::Hash,
273 ) -> Result<endpoint::block_by_hash::Response, Error> {
274 perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
275 }
276
277 async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
278 perform_with_compat!(self, endpoint::block::Request::default())
279 }
280
281 async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
282 where
283 H: Into<Height> + Send,
284 {
285 perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
286 }
287
288 async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
289 perform_with_compat!(self, endpoint::block_results::Request::default())
290 }
291
292 async fn block_search(
293 &self,
294 query: Query,
295 page: u32,
296 per_page: u8,
297 order: Order,
298 ) -> Result<endpoint::block_search::Response, Error> {
299 perform_with_compat!(
300 self,
301 endpoint::block_search::Request::new(query, page, per_page, order)
302 )
303 }
304
305 async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
306 where
307 H: Into<Height> + Send,
308 {
309 let height = height.into();
310 match self.compat {
311 CompatMode::V0_38 => {
312 self.perform_with_dialect(endpoint::header::Request::new(height), v0_38::Dialect)
313 .await
314 },
315 CompatMode::V0_37 => {
316 self.perform_with_dialect(endpoint::header::Request::new(height), v0_37::Dialect)
317 .await
318 },
319 CompatMode::V0_34 => {
320 let resp = self
323 .perform_with_dialect(endpoint::block::Request::new(height), v0_34::Dialect)
324 .await?;
325 Ok(resp.into())
326 },
327 }
328 }
329
330 async fn header_by_hash(
331 &self,
332 hash: Hash,
333 ) -> Result<endpoint::header_by_hash::Response, Error> {
334 match self.compat {
335 CompatMode::V0_38 => {
336 self.perform_with_dialect(
337 endpoint::header_by_hash::Request::new(hash),
338 v0_38::Dialect,
339 )
340 .await
341 },
342 CompatMode::V0_37 => {
343 self.perform_with_dialect(
344 endpoint::header_by_hash::Request::new(hash),
345 v0_37::Dialect,
346 )
347 .await
348 },
349 CompatMode::V0_34 => {
350 let resp = self
353 .perform_with_dialect(
354 endpoint::block_by_hash::Request::new(hash),
355 v0_34::Dialect,
356 )
357 .await?;
358 Ok(resp.into())
359 },
360 }
361 }
362
363 async fn broadcast_evidence(
365 &self,
366 evidence: Evidence,
367 ) -> Result<endpoint::evidence::Response, Error> {
368 match self.compat {
369 CompatMode::V0_38 => {
370 let request = endpoint::evidence::Request::new(evidence);
371 self.perform_with_dialect(request, crate::dialect::v0_38::Dialect)
372 .await
373 },
374 CompatMode::V0_37 => {
375 let request = endpoint::evidence::Request::new(evidence);
376 self.perform_with_dialect(request, crate::dialect::v0_37::Dialect)
377 .await
378 },
379 CompatMode::V0_34 => {
380 let request = endpoint::evidence::Request::new(evidence);
381 self.perform_with_dialect(request, crate::dialect::v0_34::Dialect)
382 .await
383 },
384 }
385 }
386
387 async fn tx(&self, hash: Hash, prove: bool) -> Result<endpoint::tx::Response, Error> {
388 perform_with_compat!(self, endpoint::tx::Request::new(hash, prove))
389 }
390
391 async fn tx_search(
392 &self,
393 query: Query,
394 prove: bool,
395 page: u32,
396 per_page: u8,
397 order: Order,
398 ) -> Result<endpoint::tx_search::Response, Error> {
399 perform_with_compat!(
400 self,
401 endpoint::tx_search::Request::new(query, prove, page, per_page, order)
402 )
403 }
404
405 async fn broadcast_tx_commit<T>(
406 &self,
407 tx: T,
408 ) -> Result<endpoint::broadcast::tx_commit::Response, Error>
409 where
410 T: Into<Vec<u8>> + Send,
411 {
412 perform_with_compat!(self, endpoint::broadcast::tx_commit::Request::new(tx))
413 }
414}
415
416#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
420pub struct HttpClientUrl(Url);
421
422impl TryFrom<Url> for HttpClientUrl {
423 type Error = Error;
424
425 fn try_from(value: Url) -> Result<Self, Error> {
426 match value.scheme() {
427 Scheme::Http | Scheme::Https => Ok(Self(value)),
428 _ => Err(Error::invalid_url(value)),
429 }
430 }
431}
432
433impl FromStr for HttpClientUrl {
434 type Err = Error;
435
436 fn from_str(s: &str) -> Result<Self, Error> {
437 let url: Url = s.parse()?;
438 url.try_into()
439 }
440}
441
442impl TryFrom<&str> for HttpClientUrl {
443 type Error = Error;
444
445 fn try_from(value: &str) -> Result<Self, Error> {
446 value.parse()
447 }
448}
449
450impl TryFrom<net::Address> for HttpClientUrl {
451 type Error = Error;
452
453 fn try_from(value: net::Address) -> Result<Self, Error> {
454 match value {
455 net::Address::Tcp {
456 peer_id: _,
457 host,
458 port,
459 } => format!("http://{host}:{port}").parse(),
460 net::Address::Unix { .. } => Err(Error::invalid_network_address()),
461 }
462 }
463}
464
465impl From<HttpClientUrl> for Url {
466 fn from(url: HttpClientUrl) -> Self {
467 url.0
468 }
469}
470
471impl From<HttpClientUrl> for url::Url {
472 fn from(url: HttpClientUrl) -> Self {
473 url.0.into()
474 }
475}
476
#[cfg(test)]
mod tests {
    use core::str::FromStr;

    use reqwest::{header::AUTHORIZATION, Request};

    use super::HttpClient;
    use crate::endpoint::abci_info;
    use crate::Url;

    /// Builds the `abci_info` request a client for `raw_url` would send.
    fn request_for(raw_url: &str) -> Request {
        let url = Url::from_str(raw_url).unwrap();
        let client = HttpClient::new(url).unwrap();
        client.build_request(abci_info::Request).unwrap()
    }

    /// Returns the value of the `Authorization` header, if one is present.
    fn authorization(req: &Request) -> Option<&str> {
        let value = req.headers().get(AUTHORIZATION)?;
        Some(value.to_str().unwrap())
    }

    #[test]
    fn without_basic_auth() {
        let req = request_for("http://example.com");

        assert_eq!(authorization(&req), None);
    }

    #[test]
    fn with_basic_auth() {
        let req = request_for("http://toto:tata@example.com");

        assert_eq!(authorization(&req), Some("Basic dG90bzp0YXRh"));

        // The header must be set exactly once.
        let num_auth_headers = req
            .headers()
            .iter()
            .filter(|(name, _)| *name == AUTHORIZATION)
            .count();
        assert_eq!(num_auth_headers, 1);
    }
}