layer_climb_core/network/
rpc.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use tendermint_rpc::Response;
7
8cfg_if::cfg_if! {
9    if #[cfg(target_arch = "wasm32")] {
10        #[async_trait(?Send)]
11        pub trait RpcTransport: Send + Sync {
12            async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String>;
13        }
14
15        cfg_if::cfg_if! {
16            if #[cfg(target_os = "unknown")] {
17                #[async_trait(?Send)]
18                impl RpcTransport for reqwest::Client {
19                    async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String> {
20                        self.post(url)
21                            .header("Content-Type", "application/json")
22                            .body(body)
23                            .send()
24                            .await
25                            .map_err(|e| anyhow!("{}", e))?
26                            .text()
27                            .await
28                            .map_err(|e| anyhow!("{}", e))
29                    }
30                }
31            } else {
32                use wstd::{
33                    http::{Client, IntoBody, Request, StatusCode},
34                    io::AsyncRead,
35                };
36
37                pub struct WasiRpcTransport {}
38
39                // prior art, cloudflare does this trick too: https://github.com/cloudflare/workers-rs/blob/38af58acc4e54b29c73336c1720188f3c3e86cc4/worker/src/send.rs#L32
40                unsafe impl Sync for WasiRpcTransport {}
41                unsafe impl Send for WasiRpcTransport {}
42
43                #[async_trait(?Send)]
44                impl RpcTransport for WasiRpcTransport {
45                    async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String> {
46                        let request = Request::post(url).header("content-type", "application/json").body(body.into_body())?;
47                        let mut res = Client::new().send(request).await?;
48
49                        match res.status() {
50                            StatusCode::OK => {
51                                let body = res.body_mut();
52                                let mut body_buf = Vec::new();
53                                body.read_to_end(&mut body_buf).await?;
54                                String::from_utf8(body_buf).map_err(|err| anyhow::anyhow!(err))
55                            },
56                            status => Err(anyhow!("unexpected status code: {status}")),
57                        }
58                    }
59                }
60            }
61        }
62    } else {
63        #[async_trait]
64        pub trait RpcTransport: Send + Sync {
65            async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String>;
66        }
67
68        #[async_trait]
69        impl RpcTransport for reqwest::Client {
70            async fn post_json_bytes(&self, url: &str, body: Vec<u8>) -> anyhow::Result<String> {
71                self.post(url)
72                    .header("Content-Type", "application/json")
73                    .body(body)
74                    .send()
75                    .await
76                    .map_err(|e| anyhow!("{}", e))?
77                    .text()
78                    .await
79                    .map_err(|e| anyhow!("{}", e))
80            }
81        }
82    }
83}
84
85#[derive(Clone)]
86pub struct RpcClient {
87    http_client: Arc<dyn RpcTransport>,
88    url: String,
89}
90
91impl RpcClient {
92    pub fn new(url: String, http_client: Arc<dyn RpcTransport>) -> Self {
93        Self { url, http_client }
94    }
95
96    pub async fn commit(&self, height: u64) -> Result<tendermint_rpc::endpoint::commit::Response> {
97        let height = tendermint::block::Height::try_from(height)?;
98        self.send(tendermint_rpc::endpoint::commit::Request::new(height))
99            .await
100    }
101
102    pub async fn broadcast_tx(
103        &self,
104        tx: Vec<u8>,
105        mode: layer_climb_proto::tx::BroadcastMode,
106    ) -> Result<TxResponse> {
107        match mode {
108            layer_climb_proto::tx::BroadcastMode::Sync
109            | layer_climb_proto::tx::BroadcastMode::Block => self
110                .send(tendermint_rpc::endpoint::broadcast::tx_sync::Request::new(
111                    tx,
112                ))
113                .await
114                .map(|resp| resp.into()),
115            layer_climb_proto::tx::BroadcastMode::Async => self
116                .send(tendermint_rpc::endpoint::broadcast::tx_async::Request::new(
117                    tx,
118                ))
119                .await
120                .map(|resp| resp.into()),
121            layer_climb_proto::tx::BroadcastMode::Unspecified => {
122                Err(anyhow!("broadcast mode unspecified"))
123            }
124        }
125    }
126
127    pub async fn block_results(
128        &self,
129        height: u64,
130    ) -> Result<tendermint_rpc::endpoint::block_results::Response> {
131        let height = tendermint::block::Height::try_from(height)?;
132        self.send(tendermint_rpc::endpoint::block_results::Request::new(
133            height,
134        ))
135        .await
136    }
137
138    pub async fn block(
139        &self,
140        height: Option<u64>,
141    ) -> Result<tendermint_rpc::endpoint::block::v0_38::DialectResponse> {
142        self.send(tendermint_rpc::endpoint::block::Request {
143            height: height.map(|h| h.try_into()).transpose()?,
144        })
145        .await
146    }
147
148    pub async fn health(&self) -> Result<tendermint_rpc::endpoint::health::Response> {
149        self.send(tendermint_rpc::endpoint::health::Request).await
150    }
151
152    pub async fn abci_query(
153        &self,
154        path: String,
155        data: Vec<u8>,
156        height: Option<u64>,
157        prove: bool,
158    ) -> Result<tendermint_rpc::endpoint::abci_query::AbciQuery> {
159        let height = match height {
160            Some(height) => Some(tendermint::block::Height::try_from(height)?),
161            None => {
162                // according to the rpc docs, 0 is latest... not sure what native None means
163                Some(tendermint::block::Height::try_from(0u64)?)
164            }
165        };
166        let resp = self
167            .send(tendermint_rpc::endpoint::abci_query::Request {
168                path: Some(path),
169                data,
170                height,
171                prove,
172            })
173            .await?
174            .response;
175
176        if resp.code.is_err() {
177            bail!("abci query failed: {}", resp.log);
178        }
179
180        Ok(resp)
181    }
182
183    pub async fn abci_protobuf_query<REQ, RESP>(
184        &self,
185        path: impl ToString,
186        req: REQ,
187        height: Option<u64>,
188    ) -> Result<RESP>
189    where
190        REQ: layer_climb_proto::Name,
191        RESP: layer_climb_proto::Name + Default,
192    {
193        let resp = self
194            .abci_query(path.to_string(), req.encode_to_vec(), height, false)
195            .await?;
196
197        RESP::decode(resp.value.as_slice()).map_err(|err| anyhow::anyhow!(err))
198    }
199
200    async fn send<T: tendermint_rpc::Request>(&self, req: T) -> Result<T::Response> {
201        let res = self
202            .http_client
203            .post_json_bytes(&self.url, req.into_json().into_bytes())
204            .await?;
205
206        T::Response::from_string(res).map_err(|err| anyhow::anyhow!(err))
207    }
208}
209
210/// Response from any kind of transaction broadcast request.
211#[derive(Clone, Debug, Deserialize, Serialize)]
212pub struct TxResponse {
213    /// Code space
214    pub codespace: String,
215
216    /// Code
217    pub code: tendermint::abci::Code,
218
219    /// Data
220    pub data: Vec<u8>,
221
222    /// Log
223    pub log: String,
224
225    /// Transaction hash
226    pub hash: tendermint::Hash,
227}
228
229impl From<tendermint_rpc::endpoint::broadcast::tx_sync::Response> for TxResponse {
230    fn from(resp: tendermint_rpc::endpoint::broadcast::tx_sync::Response) -> Self {
231        Self {
232            codespace: resp.codespace,
233            code: resp.code,
234            data: resp.data.into(),
235            log: resp.log,
236            hash: resp.hash,
237        }
238    }
239}
240
241impl From<tendermint_rpc::endpoint::broadcast::tx_async::Response> for TxResponse {
242    fn from(resp: tendermint_rpc::endpoint::broadcast::tx_async::Response) -> Self {
243        Self {
244            codespace: resp.codespace,
245            code: resp.code,
246            data: resp.data.into(),
247            log: resp.log,
248            hash: resp.hash,
249        }
250    }
251}