layer_climb_core/querier/
tx.rs1use std::time::Duration;
5
6use crate::prelude::*;
7
8impl QueryClient {
9 pub async fn simulate_tx(
10 &self,
11 tx_bytes: Vec<u8>,
12 ) -> Result<layer_climb_proto::tx::SimulateResponse> {
13 #[allow(deprecated)]
14 let req = layer_climb_proto::tx::SimulateRequest { tx: None, tx_bytes };
15
16 match self.get_connection_mode() {
17 ConnectionMode::Grpc => {
18 let mut query_client = layer_climb_proto::tx::service_client::ServiceClient::new(
19 self.clone_grpc_channel()?,
20 );
21
22 query_client
23 .simulate(req)
24 .await
25 .map(|res| res.into_inner())
26 .map_err(|e| anyhow!("couldn't simulate tx: {e:?}"))
27 }
28 ConnectionMode::Rpc => self
29 .rpc_client()?
30 .abci_protobuf_query("/cosmos.tx.v1beta1.Service/Simulate", req, None)
31 .await
32 .map_err(|e| anyhow!("couldn't simulate tx: {e:?}")),
33 }
34 }
35
36 pub async fn broadcast_tx_bytes(
37 &self,
38 tx_bytes: Vec<u8>,
39 mode: layer_climb_proto::tx::BroadcastMode,
40 ) -> Result<AnyTxResponse> {
41 match self.get_connection_mode() {
42 ConnectionMode::Grpc => {
43 let req = layer_climb_proto::tx::BroadcastTxRequest {
44 tx_bytes,
45 mode: mode.into(),
46 };
47
48 let mut query_client = layer_climb_proto::tx::service_client::ServiceClient::new(
49 self.clone_grpc_channel()?,
50 );
51
52 query_client
53 .broadcast_tx(req)
54 .await
55 .map(|res| res.into_inner().tx_response)?
56 .context("couldn't broadcast tx")
57 .map(AnyTxResponse::Abci)
58 }
59 ConnectionMode::Rpc => self
60 .rpc_client()?
61 .broadcast_tx(tx_bytes, mode)
62 .await
63 .context("couldn't broadcast tx")
64 .map(AnyTxResponse::Rpc),
65 }
66 }
67
68 #[tracing::instrument]
69 pub async fn poll_until_tx_ready(
70 &self,
71 tx_hash: String,
72 sleep_duration: Duration,
73 timeout_duration: Duration,
74 ) -> Result<PollTxResponse> {
75 let mut total_duration = Duration::default();
76
77 let mut grpc_query_client = match self.get_connection_mode() {
78 ConnectionMode::Grpc => {
79 Some(layer_climb_proto::tx::service_client::ServiceClient::new(
80 self.clone_grpc_channel()?,
81 ))
82 }
83 ConnectionMode::Rpc => None,
84 };
85
86 loop {
87 let req = layer_climb_proto::tx::GetTxRequest {
88 hash: tx_hash.clone(),
89 };
90
91 let response = match self.get_connection_mode() {
92 ConnectionMode::Grpc => {
93 let res = grpc_query_client
94 .as_mut()
95 .unwrap()
96 .get_tx(req)
97 .await
98 .map(|res| {
99 let inner = res.into_inner();
100 (inner.tx, inner.tx_response)
101 });
102
103 match res {
104 Ok(res) => Ok(Some(res)),
105 Err(e) => {
106 if e.code() == tonic::Code::Ok || e.code() == tonic::Code::NotFound {
107 Ok(None)
108 } else {
109 tracing::debug!(
110 "failed grpc GetTxRequest [code: {}]. Full error: {:?}",
111 e.code(),
112 e
113 );
114 Err(e.into())
115 }
116 }
117 }
118 }
119 ConnectionMode::Rpc => {
120 let res = self
121 .rpc_client()?
122 .abci_protobuf_query::<_, layer_climb_proto::tx::GetTxResponse>(
123 "/cosmos.tx.v1beta1.Service/GetTx",
124 req,
125 None,
126 )
127 .await
128 .map(|res| (res.tx, res.tx_response));
129
130 match res {
131 Ok(res) => Ok(Some(res)),
132 Err(e) => {
133 if e.to_string().to_lowercase().contains("notfound") {
135 Ok(None)
136 } else {
137 tracing::debug!("failed rpc GetTxRequest. Full error: {:?}", e);
138 Err(e)
139 }
140 }
141 }
142 }
143 };
144
145 match response {
146 Ok(Some((tx, Some(tx_response)))) => {
147 return Ok(PollTxResponse { tx, tx_response });
148 }
149 Err(e) => {
150 return Err(e);
151 }
152 _ => {}
153 }
154
155 futures_timer::Delay::new(sleep_duration).await;
156 total_duration += sleep_duration;
157 if total_duration >= timeout_duration {
158 return Err(anyhow!("timeout"));
159 }
160 }
161 }
162}
163
164pub struct PollTxResponse {
165 pub tx: Option<layer_climb_proto::tx::Tx>,
166 pub tx_response: layer_climb_proto::abci::TxResponse,
167}
168
169#[derive(Debug)]
170pub enum AnyTxResponse {
171 Abci(layer_climb_proto::abci::TxResponse),
172 Rpc(crate::network::rpc::TxResponse),
173}
174
175impl AnyTxResponse {
176 pub fn code(&self) -> u32 {
177 match self {
178 AnyTxResponse::Abci(res) => res.code,
179 AnyTxResponse::Rpc(res) => match res.code {
180 tendermint::abci::Code::Ok => 0,
181 tendermint::abci::Code::Err(non_zero) => non_zero.into(),
182 },
183 }
184 }
185
186 pub fn codespace(&self) -> &str {
187 match self {
188 AnyTxResponse::Abci(res) => &res.codespace,
189 AnyTxResponse::Rpc(res) => &res.codespace,
190 }
191 }
192
193 pub fn raw_log(&self) -> &str {
194 match self {
195 AnyTxResponse::Abci(res) => &res.raw_log,
196 AnyTxResponse::Rpc(res) => &res.log,
197 }
198 }
199
200 pub fn tx_hash(&self) -> String {
201 match self {
202 AnyTxResponse::Abci(res) => res.txhash.clone(),
203 AnyTxResponse::Rpc(res) => res.hash.to_string(),
204 }
205 }
206}