layer_climb_core/querier/
tx.rs

1// these do not go through request middleware since they are transaction-related
2// but they are part of QueryClient, not SigningClient
3
4use std::time::Duration;
5
6use crate::prelude::*;
7
8impl QueryClient {
9    pub async fn simulate_tx(
10        &self,
11        tx_bytes: Vec<u8>,
12    ) -> Result<layer_climb_proto::tx::SimulateResponse> {
13        #[allow(deprecated)]
14        let req = layer_climb_proto::tx::SimulateRequest { tx: None, tx_bytes };
15
16        match self.get_connection_mode() {
17            ConnectionMode::Grpc => {
18                let mut query_client = layer_climb_proto::tx::service_client::ServiceClient::new(
19                    self.clone_grpc_channel()?,
20                );
21
22                query_client
23                    .simulate(req)
24                    .await
25                    .map(|res| res.into_inner())
26                    .map_err(|e| anyhow!("couldn't simulate tx: {e:?}"))
27            }
28            ConnectionMode::Rpc => self
29                .rpc_client()?
30                .abci_protobuf_query("/cosmos.tx.v1beta1.Service/Simulate", req, None)
31                .await
32                .map_err(|e| anyhow!("couldn't simulate tx: {e:?}")),
33        }
34    }
35
36    pub async fn broadcast_tx_bytes(
37        &self,
38        tx_bytes: Vec<u8>,
39        mode: layer_climb_proto::tx::BroadcastMode,
40    ) -> Result<AnyTxResponse> {
41        match self.get_connection_mode() {
42            ConnectionMode::Grpc => {
43                let req = layer_climb_proto::tx::BroadcastTxRequest {
44                    tx_bytes,
45                    mode: mode.into(),
46                };
47
48                let mut query_client = layer_climb_proto::tx::service_client::ServiceClient::new(
49                    self.clone_grpc_channel()?,
50                );
51
52                query_client
53                    .broadcast_tx(req)
54                    .await
55                    .map(|res| res.into_inner().tx_response)?
56                    .context("couldn't broadcast tx")
57                    .map(AnyTxResponse::Abci)
58            }
59            ConnectionMode::Rpc => self
60                .rpc_client()?
61                .broadcast_tx(tx_bytes, mode)
62                .await
63                .context("couldn't broadcast tx")
64                .map(AnyTxResponse::Rpc),
65        }
66    }
67
68    #[tracing::instrument]
69    pub async fn poll_until_tx_ready(
70        &self,
71        tx_hash: String,
72        sleep_duration: Duration,
73        timeout_duration: Duration,
74    ) -> Result<PollTxResponse> {
75        let mut total_duration = Duration::default();
76
77        let mut grpc_query_client = match self.get_connection_mode() {
78            ConnectionMode::Grpc => {
79                Some(layer_climb_proto::tx::service_client::ServiceClient::new(
80                    self.clone_grpc_channel()?,
81                ))
82            }
83            ConnectionMode::Rpc => None,
84        };
85
86        loop {
87            let req = layer_climb_proto::tx::GetTxRequest {
88                hash: tx_hash.clone(),
89            };
90
91            let response = match self.get_connection_mode() {
92                ConnectionMode::Grpc => {
93                    let res = grpc_query_client
94                        .as_mut()
95                        .unwrap()
96                        .get_tx(req)
97                        .await
98                        .map(|res| {
99                            let inner = res.into_inner();
100                            (inner.tx, inner.tx_response)
101                        });
102
103                    match res {
104                        Ok(res) => Ok(Some(res)),
105                        Err(e) => {
106                            if e.code() == tonic::Code::Ok || e.code() == tonic::Code::NotFound {
107                                Ok(None)
108                            } else {
109                                tracing::debug!(
110                                    "failed grpc GetTxRequest [code: {}]. Full error: {:?}",
111                                    e.code(),
112                                    e
113                                );
114                                Err(e.into())
115                            }
116                        }
117                    }
118                }
119                ConnectionMode::Rpc => {
120                    let res = self
121                        .rpc_client()?
122                        .abci_protobuf_query::<_, layer_climb_proto::tx::GetTxResponse>(
123                            "/cosmos.tx.v1beta1.Service/GetTx",
124                            req,
125                            None,
126                        )
127                        .await
128                        .map(|res| (res.tx, res.tx_response));
129
130                    match res {
131                        Ok(res) => Ok(Some(res)),
132                        Err(e) => {
133                            // eww :/
134                            if e.to_string().to_lowercase().contains("notfound") {
135                                Ok(None)
136                            } else {
137                                tracing::debug!("failed rpc GetTxRequest. Full error: {:?}", e);
138                                Err(e)
139                            }
140                        }
141                    }
142                }
143            };
144
145            match response {
146                Ok(Some((tx, Some(tx_response)))) => {
147                    return Ok(PollTxResponse { tx, tx_response });
148                }
149                Err(e) => {
150                    return Err(e);
151                }
152                _ => {}
153            }
154
155            futures_timer::Delay::new(sleep_duration).await;
156            total_duration += sleep_duration;
157            if total_duration >= timeout_duration {
158                return Err(anyhow!("timeout"));
159            }
160        }
161    }
162}
163
164pub struct PollTxResponse {
165    pub tx: Option<layer_climb_proto::tx::Tx>,
166    pub tx_response: layer_climb_proto::abci::TxResponse,
167}
168
/// A broadcast result from either transport, as returned by
/// `QueryClient::broadcast_tx_bytes`.
#[derive(Debug)]
pub enum AnyTxResponse {
    /// Response produced when broadcasting through the gRPC tx service.
    Abci(layer_climb_proto::abci::TxResponse),
    /// Response produced when broadcasting through the RPC client.
    Rpc(crate::network::rpc::TxResponse),
}
174
175impl AnyTxResponse {
176    pub fn code(&self) -> u32 {
177        match self {
178            AnyTxResponse::Abci(res) => res.code,
179            AnyTxResponse::Rpc(res) => match res.code {
180                tendermint::abci::Code::Ok => 0,
181                tendermint::abci::Code::Err(non_zero) => non_zero.into(),
182            },
183        }
184    }
185
186    pub fn codespace(&self) -> &str {
187        match self {
188            AnyTxResponse::Abci(res) => &res.codespace,
189            AnyTxResponse::Rpc(res) => &res.codespace,
190        }
191    }
192
193    pub fn raw_log(&self) -> &str {
194        match self {
195            AnyTxResponse::Abci(res) => &res.raw_log,
196            AnyTxResponse::Rpc(res) => &res.log,
197        }
198    }
199
200    pub fn tx_hash(&self) -> String {
201        match self {
202            AnyTxResponse::Abci(res) => res.txhash.clone(),
203            AnyTxResponse::Rpc(res) => res.hash.to_string(),
204        }
205    }
206}