iq_cometbft_rpc/client/transport/
http.rs

1//! HTTP-based transport for CometBFT RPC Client.
2
3use core::{
4    convert::{TryFrom, TryInto},
5    str::FromStr,
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use reqwest::{header, Proxy};
11
12use cometbft::{block::Height, evidence::Evidence, Hash};
13use cometbft_config::net;
14
15use crate::client::{Client, CompatMode};
16use crate::dialect::{v0_34, v0_37, v0_38, Dialect, LatestDialect};
17use crate::endpoint;
18use crate::prelude::*;
19use crate::query::Query;
20use crate::request::RequestMessage;
21use crate::response::Response;
22use crate::{Error, Order, Scheme, SimpleRequest, Url};
23
24use super::auth;
25
26const USER_AGENT: &str = concat!("cometbft.rs/", env!("CARGO_PKG_VERSION"));
27
28/// A JSON-RPC/HTTP CometBFT RPC client (implements [`crate::Client`]).
29///
30/// Supports both HTTP and HTTPS connections to CometBFT RPC endpoints, and
31/// allows for the use of HTTP proxies (see [`HttpClient::new_with_proxy`] for
32/// details).
33///
34/// Does not provide [`crate::event::Event`] subscription facilities (see
35/// [`crate::WebSocketClient`] for a client that does).
36///
37/// ## Examples
38///
39/// ```rust,ignore
40/// use cometbft_rpc::{HttpClient, Client};
41///
42/// #[tokio::main]
43/// async fn main() {
44///     let client = HttpClient::new("http://127.0.0.1:26657")
45///         .unwrap();
46///
47///     let abci_info = client.abci_info()
48///         .await
49///         .unwrap();
50///
51///     println!("Got ABCI info: {:?}", abci_info);
52/// }
53/// ```
54#[derive(Debug, Clone)]
55pub struct HttpClient {
56    inner: reqwest::Client,
57    url: reqwest::Url,
58    compat: CompatMode,
59}
60
61/// The builder pattern constructor for [`HttpClient`].
62pub struct Builder {
63    url: HttpClientUrl,
64    compat: CompatMode,
65    proxy_url: Option<HttpClientUrl>,
66    user_agent: Option<String>,
67    timeout: Duration,
68    client: Option<reqwest::Client>,
69}
70
71impl Builder {
72    /// Use the specified compatibility mode for the CometBFT RPC protocol.
73    ///
74    /// The default is the latest protocol version supported by this crate.
75    pub fn compat_mode(mut self, mode: CompatMode) -> Self {
76        self.compat = mode;
77        self
78    }
79
80    /// Specify the URL of a proxy server for the client to connect through.
81    ///
82    /// If the RPC endpoint is secured (HTTPS), the proxy will automatically
83    /// attempt to connect using the [HTTP CONNECT] method.
84    ///
85    /// [HTTP CONNECT]: https://en.wikipedia.org/wiki/HTTP_tunnel
86    pub fn proxy_url(mut self, url: HttpClientUrl) -> Self {
87        self.proxy_url = Some(url);
88        self
89    }
90
91    /// The timeout is applied from when the request starts connecting until
92    /// the response body has finished.
93    ///
94    /// The default is 30 seconds.
95    pub fn timeout(mut self, duration: Duration) -> Self {
96        self.timeout = duration;
97        self
98    }
99
100    /// Specify the custom User-Agent header used by the client.
101    pub fn user_agent(mut self, agent: String) -> Self {
102        self.user_agent = Some(agent);
103        self
104    }
105
106    /// Use the provided client instead of building one internally.
107    ///
108    /// ## Warning
109    /// This will override the following options set on the builder:
110    /// `timeout`, `user_agent`, and `proxy_url`.
111    pub fn client(mut self, client: reqwest::Client) -> Self {
112        self.client = Some(client);
113        self
114    }
115
116    /// Try to create a client with the options specified for this builder.
117    pub fn build(self) -> Result<HttpClient, Error> {
118        let inner = if let Some(inner) = self.client {
119            inner
120        } else {
121            let builder = reqwest::ClientBuilder::new()
122                .user_agent(self.user_agent.unwrap_or_else(|| USER_AGENT.to_string()))
123                .timeout(self.timeout);
124
125            match self.proxy_url {
126                None => builder.build().map_err(Error::http)?,
127                Some(proxy_url) => {
128                    let proxy = if self.url.0.is_secure() {
129                        Proxy::https(reqwest::Url::from(proxy_url.0))
130                            .map_err(Error::invalid_proxy)?
131                    } else {
132                        Proxy::http(reqwest::Url::from(proxy_url.0))
133                            .map_err(Error::invalid_proxy)?
134                    };
135                    builder.proxy(proxy).build().map_err(Error::http)?
136                },
137            }
138        };
139
140        Ok(HttpClient {
141            inner,
142            url: self.url.into(),
143            compat: self.compat,
144        })
145    }
146}
147
148impl HttpClient {
149    /// Construct a new Tendermint RPC HTTP/S client connecting to the given
150    /// URL. This avoids using the `Builder` and thus does not perform any
151    /// validation of the configuration.
152    pub fn new_from_parts(inner: reqwest::Client, url: reqwest::Url, compat: CompatMode) -> Self {
153        Self { inner, url, compat }
154    }
155
156    /// Construct a new CometBFT RPC HTTP/S client connecting to the given
157    /// URL.
158    pub fn new<U>(url: U) -> Result<Self, Error>
159    where
160        U: TryInto<HttpClientUrl, Error = Error>,
161    {
162        let url = url.try_into()?;
163        Self::builder(url).build()
164    }
165
166    /// Construct a new CometBFT RPC HTTP/S client connecting to the given
167    /// URL, but via the specified proxy's URL.
168    ///
169    /// If the RPC endpoint is secured (HTTPS), the proxy will automatically
170    /// attempt to connect using the [HTTP CONNECT] method.
171    ///
172    /// [HTTP CONNECT]: https://en.wikipedia.org/wiki/HTTP_tunnel
173    pub fn new_with_proxy<U, P>(url: U, proxy_url: P) -> Result<Self, Error>
174    where
175        U: TryInto<HttpClientUrl, Error = Error>,
176        P: TryInto<HttpClientUrl, Error = Error>,
177    {
178        let url = url.try_into()?;
179        Self::builder(url).proxy_url(proxy_url.try_into()?).build()
180    }
181
182    /// Initiate a builder for a CometBFT RPC HTTP/S client connecting
183    /// to the given URL, so that more configuration options can be specified
184    /// with the builder.
185    pub fn builder(url: HttpClientUrl) -> Builder {
186        Builder {
187            url,
188            compat: Default::default(),
189            proxy_url: None,
190            user_agent: None,
191            timeout: Duration::from_secs(30),
192            client: None,
193        }
194    }
195
196    /// Set compatibility mode on the instantiated client.
197    ///
198    /// As the HTTP client is stateless and does not support subscriptions,
199    /// the protocol version it uses can be changed at will, for example,
200    /// as a result of version discovery over the `/status` endpoint.
201    pub fn set_compat_mode(&mut self, compat: CompatMode) {
202        self.compat = compat;
203    }
204
205    fn build_request<R>(&self, request: R) -> Result<reqwest::Request, Error>
206    where
207        R: RequestMessage,
208    {
209        let request_body = request.into_json();
210
211        tracing::debug!(url = %self.url, body = %request_body, "outgoing request");
212
213        let mut builder = self
214            .inner
215            .post(auth::strip_authority(self.url.clone()))
216            .header(header::CONTENT_TYPE, "application/json")
217            .body(request_body.into_bytes());
218
219        if let Some(auth) = auth::authorize(&self.url) {
220            builder = builder.header(header::AUTHORIZATION, auth.to_string());
221        }
222
223        builder.build().map_err(Error::http)
224    }
225
226    async fn perform_with_dialect<R, S>(&self, request: R, _dialect: S) -> Result<R::Output, Error>
227    where
228        R: SimpleRequest<S>,
229        S: Dialect,
230    {
231        let request = self.build_request(request)?;
232        let response = self.inner.execute(request).await.map_err(Error::http)?;
233        let response_status = response.status();
234        let response_body = response.bytes().await.map_err(Error::http)?;
235
236        tracing::debug!(
237            status = %response_status,
238            body = %String::from_utf8_lossy(&response_body),
239            "incoming response"
240        );
241
242        // Successful JSON-RPC requests are expected to return a 200 OK HTTP status.
243        // Otherwise, this means that the HTTP request failed as a whole,
244        // as opposed to the JSON-RPC request returning an error,
245        // and we cannot expect the response body to be a valid JSON-RPC response.
246        if response_status != reqwest::StatusCode::OK {
247            return Err(Error::http_request_failed(response_status));
248        }
249
250        R::Response::from_string(&response_body).map(Into::into)
251    }
252}
253
254#[async_trait]
255impl Client for HttpClient {
256    async fn perform<R>(&self, request: R) -> Result<R::Output, Error>
257    where
258        R: SimpleRequest,
259    {
260        self.perform_with_dialect(request, LatestDialect).await
261    }
262
263    async fn block<H>(&self, height: H) -> Result<endpoint::block::Response, Error>
264    where
265        H: Into<Height> + Send,
266    {
267        perform_with_compat!(self, endpoint::block::Request::new(height.into()))
268    }
269
270    async fn block_by_hash(
271        &self,
272        hash: cometbft::Hash,
273    ) -> Result<endpoint::block_by_hash::Response, Error> {
274        perform_with_compat!(self, endpoint::block_by_hash::Request::new(hash))
275    }
276
277    async fn latest_block(&self) -> Result<endpoint::block::Response, Error> {
278        perform_with_compat!(self, endpoint::block::Request::default())
279    }
280
281    async fn block_results<H>(&self, height: H) -> Result<endpoint::block_results::Response, Error>
282    where
283        H: Into<Height> + Send,
284    {
285        perform_with_compat!(self, endpoint::block_results::Request::new(height.into()))
286    }
287
288    async fn latest_block_results(&self) -> Result<endpoint::block_results::Response, Error> {
289        perform_with_compat!(self, endpoint::block_results::Request::default())
290    }
291
292    async fn block_search(
293        &self,
294        query: Query,
295        page: u32,
296        per_page: u8,
297        order: Order,
298    ) -> Result<endpoint::block_search::Response, Error> {
299        perform_with_compat!(
300            self,
301            endpoint::block_search::Request::new(query, page, per_page, order)
302        )
303    }
304
305    async fn header<H>(&self, height: H) -> Result<endpoint::header::Response, Error>
306    where
307        H: Into<Height> + Send,
308    {
309        let height = height.into();
310        match self.compat {
311            CompatMode::V0_38 => {
312                self.perform_with_dialect(endpoint::header::Request::new(height), v0_38::Dialect)
313                    .await
314            },
315            CompatMode::V0_37 => {
316                self.perform_with_dialect(endpoint::header::Request::new(height), v0_37::Dialect)
317                    .await
318            },
319            CompatMode::V0_34 => {
320                // Back-fill with a request to /block endpoint and
321                // taking just the header from the response.
322                let resp = self
323                    .perform_with_dialect(endpoint::block::Request::new(height), v0_34::Dialect)
324                    .await?;
325                Ok(resp.into())
326            },
327        }
328    }
329
330    async fn header_by_hash(
331        &self,
332        hash: Hash,
333    ) -> Result<endpoint::header_by_hash::Response, Error> {
334        match self.compat {
335            CompatMode::V0_38 => {
336                self.perform_with_dialect(
337                    endpoint::header_by_hash::Request::new(hash),
338                    v0_38::Dialect,
339                )
340                .await
341            },
342            CompatMode::V0_37 => {
343                self.perform_with_dialect(
344                    endpoint::header_by_hash::Request::new(hash),
345                    v0_37::Dialect,
346                )
347                .await
348            },
349            CompatMode::V0_34 => {
350                // Back-fill with a request to /block_by_hash endpoint and
351                // taking just the header from the response.
352                let resp = self
353                    .perform_with_dialect(
354                        endpoint::block_by_hash::Request::new(hash),
355                        v0_34::Dialect,
356                    )
357                    .await?;
358                Ok(resp.into())
359            },
360        }
361    }
362
363    /// `/broadcast_evidence`: broadcast an evidence.
364    async fn broadcast_evidence(
365        &self,
366        evidence: Evidence,
367    ) -> Result<endpoint::evidence::Response, Error> {
368        match self.compat {
369            CompatMode::V0_38 => {
370                let request = endpoint::evidence::Request::new(evidence);
371                self.perform_with_dialect(request, crate::dialect::v0_38::Dialect)
372                    .await
373            },
374            CompatMode::V0_37 => {
375                let request = endpoint::evidence::Request::new(evidence);
376                self.perform_with_dialect(request, crate::dialect::v0_37::Dialect)
377                    .await
378            },
379            CompatMode::V0_34 => {
380                let request = endpoint::evidence::Request::new(evidence);
381                self.perform_with_dialect(request, crate::dialect::v0_34::Dialect)
382                    .await
383            },
384        }
385    }
386
387    async fn tx(&self, hash: Hash, prove: bool) -> Result<endpoint::tx::Response, Error> {
388        perform_with_compat!(self, endpoint::tx::Request::new(hash, prove))
389    }
390
391    async fn tx_search(
392        &self,
393        query: Query,
394        prove: bool,
395        page: u32,
396        per_page: u8,
397        order: Order,
398    ) -> Result<endpoint::tx_search::Response, Error> {
399        perform_with_compat!(
400            self,
401            endpoint::tx_search::Request::new(query, prove, page, per_page, order)
402        )
403    }
404
405    async fn broadcast_tx_commit<T>(
406        &self,
407        tx: T,
408    ) -> Result<endpoint::broadcast::tx_commit::Response, Error>
409    where
410        T: Into<Vec<u8>> + Send,
411    {
412        perform_with_compat!(self, endpoint::broadcast::tx_commit::Request::new(tx))
413    }
414}
415
416/// A URL limited to use with HTTP clients.
417///
418/// Facilitates useful type conversions and inferences.
419#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
420pub struct HttpClientUrl(Url);
421
422impl TryFrom<Url> for HttpClientUrl {
423    type Error = Error;
424
425    fn try_from(value: Url) -> Result<Self, Error> {
426        match value.scheme() {
427            Scheme::Http | Scheme::Https => Ok(Self(value)),
428            _ => Err(Error::invalid_url(value)),
429        }
430    }
431}
432
433impl FromStr for HttpClientUrl {
434    type Err = Error;
435
436    fn from_str(s: &str) -> Result<Self, Error> {
437        let url: Url = s.parse()?;
438        url.try_into()
439    }
440}
441
442impl TryFrom<&str> for HttpClientUrl {
443    type Error = Error;
444
445    fn try_from(value: &str) -> Result<Self, Error> {
446        value.parse()
447    }
448}
449
450impl TryFrom<net::Address> for HttpClientUrl {
451    type Error = Error;
452
453    fn try_from(value: net::Address) -> Result<Self, Error> {
454        match value {
455            net::Address::Tcp {
456                peer_id: _,
457                host,
458                port,
459            } => format!("http://{host}:{port}").parse(),
460            net::Address::Unix { .. } => Err(Error::invalid_network_address()),
461        }
462    }
463}
464
465impl From<HttpClientUrl> for Url {
466    fn from(url: HttpClientUrl) -> Self {
467        url.0
468    }
469}
470
471impl From<HttpClientUrl> for url::Url {
472    fn from(url: HttpClientUrl) -> Self {
473        url.0.into()
474    }
475}
476
477#[cfg(test)]
478mod tests {
479    use core::str::FromStr;
480
481    use reqwest::{header::AUTHORIZATION, Request};
482
483    use super::HttpClient;
484    use crate::endpoint::abci_info;
485    use crate::Url;
486
487    fn authorization(req: &Request) -> Option<&str> {
488        req.headers()
489            .get(AUTHORIZATION)
490            .map(|h| h.to_str().unwrap())
491    }
492
493    #[test]
494    fn without_basic_auth() {
495        let url = Url::from_str("http://example.com").unwrap();
496        let client = HttpClient::new(url).unwrap();
497        let req = HttpClient::build_request(&client, abci_info::Request).unwrap();
498
499        assert_eq!(authorization(&req), None);
500    }
501
502    #[test]
503    fn with_basic_auth() {
504        let url = Url::from_str("http://toto:tata@example.com").unwrap();
505        let client = HttpClient::new(url).unwrap();
506        let req = HttpClient::build_request(&client, abci_info::Request).unwrap();
507
508        assert_eq!(authorization(&req), Some("Basic dG90bzp0YXRh"));
509        let num_auth_headers = req
510            .headers()
511            .iter()
512            .filter(|h| h.0 == AUTHORIZATION)
513            .count();
514        assert_eq!(num_auth_headers, 1);
515    }
516}