cosm_utils/clients/
tendermint_rpc.rs1use async_trait::async_trait;
2use cosmrs::{
3 rpc::Client,
4 tendermint::{
5 abci::{response::DeliverTx, Event},
6 Hash,
7 },
8};
9use lazy_static::lazy_static;
10use log::info;
11use std::time::Duration;
12use tendermint_rpc::{
13 client::CompatMode,
14 endpoint::{
15 abci_query::AbciQuery,
16 broadcast::{tx_async, tx_commit, tx_sync},
17 tx,
18 },
19 query::{EventType, Query},
20 HttpClient, Order,
21};
22use tokio::sync::RwLock;
23
24use crate::chain::error::ChainError;
25use crate::chain::tx::RawTx;
26
27use super::client::{
28 ClientAbciQuery, ClientTxAsync, ClientTxCommit, ClientTxSync, GetErr, GetEvents, GetValue,
29 HashSearch,
30};
31
32impl GetEvents for tx_commit::Response {
33 fn get_events(&self) -> &[Event] {
34 self.deliver_tx.events.as_slice()
35 }
36}
37
38impl GetEvents for DeliverTx {
39 fn get_events(&self) -> &[Event] {
40 self.events.as_slice()
41 }
42}
43
44impl GetErr for tx_commit::Response {
45 fn get_err(self) -> Result<Self, ChainError> {
46 if self.deliver_tx.code.is_err() {
47 return Err(ChainError::TxCommit {
48 res: format!("{:?}", self),
49 });
50 } else if self.check_tx.code.is_err() {
51 return Err(ChainError::TxCommit {
52 res: format!("{:?}", self),
53 });
54 }
55 Ok(self)
56 }
57}
58
59impl GetErr for tx_sync::Response {
60 fn get_err(self) -> Result<Self, ChainError> {
61 if self.code.is_err() {
62 return Err(ChainError::TxSync {
63 res: format!("{:?}", self),
64 });
65 }
66 Ok(self)
67 }
68}
69
70impl GetErr for tx_async::Response {
71 fn get_err(self) -> Result<Self, ChainError> {
72 if self.code.is_err() {
73 return Err(ChainError::TxAsync {
74 res: format!("{:?}", self),
75 });
76 }
77 Ok(self)
78 }
79}
80
81impl GetErr for AbciQuery {
82 fn get_err(self) -> Result<Self, ChainError> {
83 if self.code.is_err() {
84 return Err(ChainError::AbciQuery { res: self });
85 }
86 Ok(self)
87 }
88}
89
90impl GetValue for AbciQuery {
91 fn get_value(&self) -> &[u8] {
92 &self.value
93 }
94}
95
96#[async_trait]
97impl<T> ClientAbciQuery for T
98where
99 T: Client + Sync,
100{
101 type Response = AbciQuery;
102
103 async fn abci_query<V>(
104 &self,
105 path: Option<String>,
106 data: V,
107 height: Option<u32>,
108 prove: bool,
109 ) -> Result<Self::Response, ChainError>
110 where
111 V: Into<Vec<u8>> + Send,
112 {
113 let res = self
114 .abci_query(path, data, height.map(Into::into), prove)
115 .await?;
116 Ok(res.get_err()?)
117 }
118}
119
120#[async_trait]
121impl<T> HashSearch for T
122where
123 T: ClientAbciQuery + Client + Sync,
124{
125 async fn hash_search(&self, hash: &Hash) -> Result<tx::Response, ChainError> {
126 let query = Query::from(EventType::Tx).and_eq("tx.hash", hash.to_string());
127 let mut interval = tokio::time::interval(Duration::from_secs(1));
128 let start_time = tokio::time::Instant::now();
129 interval.tick().await;
130 loop {
131 interval.tick().await;
132
133 let search_res = self
134 .tx_search(query.clone(), false, 1, 255, Order::Ascending)
135 .await?;
136 if let Some(tx) = search_res.txs.first() {
137 return Ok(tx.clone());
138 }
139 if tokio::time::Instant::now() - start_time > Duration::from_secs(30) {
140 return Err(ChainError::TxSearchTimeout { tx_hash: *hash });
141 }
142 }
143 }
144}
145
146#[async_trait]
147impl<T> ClientTxCommit for T
148where
149 T: Client + Sync,
150{
151 type Response = tx_commit::Response;
152 async fn broadcast_tx_commit(&self, raw_tx: &RawTx) -> Result<Self::Response, ChainError> {
153 let res = self.broadcast_tx_commit(raw_tx.to_bytes()?).await?;
154 Ok(res.get_err()?)
155 }
156}
157
158#[async_trait]
159impl<T> ClientTxSync for T
160where
161 T: Client + Sync,
162{
163 type Response = tx_sync::Response;
164 async fn broadcast_tx_sync(&self, raw_tx: &RawTx) -> Result<Self::Response, ChainError> {
165 let res = self.broadcast_tx_sync(raw_tx.to_bytes()?).await?;
166 Ok(res.get_err()?)
167 }
168}
169
170#[async_trait]
171impl<T> ClientTxAsync for T
172where
173 T: Client + Sync,
174{
175 type Response = tx_async::Response;
176 async fn broadcast_tx_async(&self, raw_tx: &RawTx) -> Result<Self::Response, ChainError> {
177 let res = self.broadcast_tx_async(raw_tx.to_bytes()?).await?;
178 Ok(res.get_err()?)
179 }
180}
181
182#[cfg_attr(feature = "mockall", automock)]
183#[async_trait]
184pub trait ClientCompat: Client + Sized {
185 async fn query_compat_mode(&self) -> Result<CompatMode, ChainError> {
186 let version = self.status().await?.node_info.version;
187 info!("got tendermint version: {}", version);
188 Ok(CompatMode::from_version(version)?)
189 }
190
191 async fn get_compat(endpoint_url: &str) -> Result<Self, ChainError>;
192
193 async fn get_persistent_compat(endpoint_url: &str) -> Result<Self, ChainError>;
197}
198
199lazy_static! {
200 static ref COMPAT_MODE: RwLock<Option<CompatMode>> = RwLock::new(None);
201}
202
203#[async_trait]
204impl ClientCompat for HttpClient {
205 async fn get_compat(endpoint_url: &str) -> Result<Self, ChainError> {
206 let mut client = Self::new(endpoint_url)?;
207 let compat_mode = client.query_compat_mode().await?;
208 client.set_compat_mode(compat_mode);
209 Ok(client)
210 }
211
212 async fn get_persistent_compat(endpoint_url: &str) -> Result<Self, ChainError> {
213 let mut client = Self::new(endpoint_url)?;
214 let maybe_compat_mode = *COMPAT_MODE.read().await;
215 let compat_mode = match maybe_compat_mode {
216 Some(compat_mode) => compat_mode,
217 None => {
218 let compat_mode = client.query_compat_mode().await?;
219 *COMPAT_MODE.write().await = Some(compat_mode);
220 compat_mode
221 }
222 };
223 client.set_compat_mode(compat_mode);
224 Ok(client)
225 }
226}