Skip to main content

layer_climb_core/network/
rpc.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use tendermint_rpc::Response;
7
8cfg_if::cfg_if! {
9    if #[cfg(target_arch = "wasm32")] {
10        #[async_trait(?Send)]
11        pub trait RpcTransport: Send + Sync {
12            async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String>;
13        }
14
15        cfg_if::cfg_if! {
16            if #[cfg(target_os = "unknown")] {
17                #[async_trait(?Send)]
18                impl RpcTransport for reqwest::Client {
19                    async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String> {
20                        self.post(url)
21                            .header("Content-Type", "application/json")
22                            .body(body)
23                            .send()
24                            .await
25                            .map_err(|e| anyhow!("{}", e))?
26                            .text()
27                            .await
28                            .map_err(|e| anyhow!("{}", e))
29                    }
30                }
31            } else {
32                use wstd::{
33                    http::{Body, Client, Request, StatusCode},
34                };
35
36                pub struct WasiRpcTransport {}
37
38                // prior art, cloudflare does this trick too: https://github.com/cloudflare/workers-rs/blob/38af58acc4e54b29c73336c1720188f3c3e86cc4/worker/src/send.rs#L32
39                unsafe impl Sync for WasiRpcTransport {}
40                unsafe impl Send for WasiRpcTransport {}
41
42                #[async_trait(?Send)]
43                impl RpcTransport for WasiRpcTransport {
44                    async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String> {
45                        let request = Request::post(url)
46                            .header("content-type", "application/json")
47                            .body(Body::from(body))?;
48                        let mut res = Client::new().send(request).await?;
49
50                        match res.status() {
51                            StatusCode::OK => {
52                                let body_bytes = res.body_mut().contents().await?;
53                                String::from_utf8(body_bytes.to_vec()).map_err(|err| anyhow::anyhow!(err))
54                            },
55                            status => Err(anyhow!("unexpected status code: {status}")),
56                        }
57                    }
58                }
59            }
60        }
61    } else {
62        #[async_trait]
63        pub trait RpcTransport: Send + Sync {
64            async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String>;
65        }
66
67        #[async_trait]
68        impl RpcTransport for reqwest::Client {
69            async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String> {
70                self.post(url)
71                    .header("Content-Type", "application/json")
72                    .body(body)
73                    .send()
74                    .await
75                    .map_err(|e| anyhow!("{}", e))?
76                    .text()
77                    .await
78                    .map_err(|e| anyhow!("{}", e))
79            }
80        }
81    }
82}
83
84#[derive(Clone)]
85pub struct RpcClient {
86    http_client: Arc<dyn RpcTransport>,
87    url: String,
88}
89
90impl RpcClient {
91    pub fn new(url: String, http_client: Arc<dyn RpcTransport>) -> Self {
92        Self { url, http_client }
93    }
94
95    pub async fn commit(&self, height: u64) -> Result<tendermint_rpc::endpoint::commit::Response> {
96        let height = tendermint::block::Height::try_from(height)?;
97        self.send(tendermint_rpc::endpoint::commit::Request::new(height))
98            .await
99    }
100
101    pub async fn broadcast_tx(
102        &self,
103        tx: Vec<u8>,
104        mode: layer_climb_proto::tx::BroadcastMode,
105    ) -> Result<TxResponse> {
106        match mode {
107            layer_climb_proto::tx::BroadcastMode::Sync
108            | layer_climb_proto::tx::BroadcastMode::Block => self
109                .send(tendermint_rpc::endpoint::broadcast::tx_sync::Request::new(
110                    tx,
111                ))
112                .await
113                .map(|resp| resp.into()),
114            layer_climb_proto::tx::BroadcastMode::Async => self
115                .send(tendermint_rpc::endpoint::broadcast::tx_async::Request::new(
116                    tx,
117                ))
118                .await
119                .map(|resp| resp.into()),
120            layer_climb_proto::tx::BroadcastMode::Unspecified => {
121                Err(anyhow!("broadcast mode unspecified"))
122            }
123        }
124    }
125
126    pub async fn block_results(
127        &self,
128        height: u64,
129    ) -> Result<tendermint_rpc::endpoint::block_results::Response> {
130        let height = tendermint::block::Height::try_from(height)?;
131        self.send(tendermint_rpc::endpoint::block_results::Request::new(
132            height,
133        ))
134        .await
135    }
136
137    pub async fn block(
138        &self,
139        height: Option<u64>,
140    ) -> Result<tendermint_rpc::endpoint::block::v0_38::DialectResponse> {
141        self.send(tendermint_rpc::endpoint::block::Request {
142            height: height.map(|h| h.try_into()).transpose()?,
143        })
144        .await
145    }
146
147    pub async fn health(&self) -> Result<tendermint_rpc::endpoint::health::Response> {
148        self.send(tendermint_rpc::endpoint::health::Request).await
149    }
150
151    pub async fn abci_query(
152        &self,
153        path: String,
154        data: Vec<u8>,
155        height: Option<u64>,
156        prove: bool,
157    ) -> Result<tendermint_rpc::endpoint::abci_query::AbciQuery> {
158        let height = match height {
159            Some(height) => Some(tendermint::block::Height::try_from(height)?),
160            None => {
161                // according to the rpc docs, 0 is latest... not sure what native None means
162                Some(tendermint::block::Height::try_from(0u64)?)
163            }
164        };
165        let resp = self
166            .send(tendermint_rpc::endpoint::abci_query::Request {
167                path: Some(path),
168                data,
169                height,
170                prove,
171            })
172            .await?
173            .response;
174
175        if resp.code.is_err() {
176            bail!("abci query failed: {}", resp.log);
177        }
178
179        Ok(resp)
180    }
181
182    pub async fn abci_protobuf_query<REQ, RESP>(
183        &self,
184        path: impl ToString,
185        req: REQ,
186        height: Option<u64>,
187    ) -> Result<RESP>
188    where
189        REQ: layer_climb_proto::Name,
190        RESP: layer_climb_proto::Name + Default,
191    {
192        let resp = self
193            .abci_query(path.to_string(), req.encode_to_vec(), height, false)
194            .await?;
195
196        RESP::decode(resp.value.as_slice()).map_err(|err| anyhow::anyhow!(err))
197    }
198
199    async fn send<T: tendermint_rpc::Request>(&self, req: T) -> Result<T::Response> {
200        let res = self
201            .http_client
202            .post_json_bytes(&self.url, req.into_json().into_bytes())
203            .await?;
204
205        T::Response::from_string(res).map_err(|err| anyhow::anyhow!(err))
206    }
207}
208
209/// Response from any kind of transaction broadcast request.
210#[derive(Clone, Debug, Deserialize, Serialize)]
211pub struct TxResponse {
212    /// Code space
213    pub codespace: String,
214
215    /// Code
216    pub code: tendermint::abci::Code,
217
218    /// Data
219    pub data: Vec<u8>,
220
221    /// Log
222    pub log: String,
223
224    /// Transaction hash
225    pub hash: tendermint::Hash,
226}
227
228impl From<tendermint_rpc::endpoint::broadcast::tx_sync::Response> for TxResponse {
229    fn from(resp: tendermint_rpc::endpoint::broadcast::tx_sync::Response) -> Self {
230        Self {
231            codespace: resp.codespace,
232            code: resp.code,
233            data: resp.data.into(),
234            log: resp.log,
235            hash: resp.hash,
236        }
237    }
238}
239
240impl From<tendermint_rpc::endpoint::broadcast::tx_async::Response> for TxResponse {
241    fn from(resp: tendermint_rpc::endpoint::broadcast::tx_async::Response) -> Self {
242        Self {
243            codespace: resp.codespace,
244            code: resp.code,
245            data: resp.data.into(),
246            log: resp.log,
247            hash: resp.hash,
248        }
249    }
250}