cosm_utils/clients/
tendermint_rpc.rs

1use async_trait::async_trait;
2use cosmrs::{
3    rpc::Client,
4    tendermint::{
5        abci::{response::DeliverTx, Event},
6        Hash,
7    },
8};
9use lazy_static::lazy_static;
10use log::info;
11use std::time::Duration;
12use tendermint_rpc::{
13    client::CompatMode,
14    endpoint::{
15        abci_query::AbciQuery,
16        broadcast::{tx_async, tx_commit, tx_sync},
17        tx,
18    },
19    query::{EventType, Query},
20    HttpClient, Order,
21};
22use tokio::sync::RwLock;
23
24use crate::chain::error::ChainError;
25use crate::chain::tx::RawTx;
26
27use super::client::{
28    ClientAbciQuery, ClientTxAsync, ClientTxCommit, ClientTxSync, GetErr, GetEvents, GetValue,
29    HashSearch,
30};
31
32impl GetEvents for tx_commit::Response {
33    fn get_events(&self) -> &[Event] {
34        self.deliver_tx.events.as_slice()
35    }
36}
37
38impl GetEvents for DeliverTx {
39    fn get_events(&self) -> &[Event] {
40        self.events.as_slice()
41    }
42}
43
44impl GetErr for tx_commit::Response {
45    fn get_err(self) -> Result<Self, ChainError> {
46        if self.deliver_tx.code.is_err() {
47            return Err(ChainError::TxCommit {
48                res: format!("{:?}", self),
49            });
50        } else if self.check_tx.code.is_err() {
51            return Err(ChainError::TxCommit {
52                res: format!("{:?}", self),
53            });
54        }
55        Ok(self)
56    }
57}
58
59impl GetErr for tx_sync::Response {
60    fn get_err(self) -> Result<Self, ChainError> {
61        if self.code.is_err() {
62            return Err(ChainError::TxSync {
63                res: format!("{:?}", self),
64            });
65        }
66        Ok(self)
67    }
68}
69
70impl GetErr for tx_async::Response {
71    fn get_err(self) -> Result<Self, ChainError> {
72        if self.code.is_err() {
73            return Err(ChainError::TxAsync {
74                res: format!("{:?}", self),
75            });
76        }
77        Ok(self)
78    }
79}
80
81impl GetErr for AbciQuery {
82    fn get_err(self) -> Result<Self, ChainError> {
83        if self.code.is_err() {
84            return Err(ChainError::AbciQuery { res: self });
85        }
86        Ok(self)
87    }
88}
89
90impl GetValue for AbciQuery {
91    fn get_value(&self) -> &[u8] {
92        &self.value
93    }
94}
95
96#[async_trait]
97impl<T> ClientAbciQuery for T
98where
99    T: Client + Sync,
100{
101    type Response = AbciQuery;
102
103    async fn abci_query<V>(
104        &self,
105        path: Option<String>,
106        data: V,
107        height: Option<u32>,
108        prove: bool,
109    ) -> Result<Self::Response, ChainError>
110    where
111        V: Into<Vec<u8>> + Send,
112    {
113        let res = self
114            .abci_query(path, data, height.map(Into::into), prove)
115            .await?;
116        Ok(res.get_err()?)
117    }
118}
119
120#[async_trait]
121impl<T> HashSearch for T
122where
123    T: ClientAbciQuery + Client + Sync,
124{
125    async fn hash_search(&self, hash: &Hash) -> Result<tx::Response, ChainError> {
126        let query = Query::from(EventType::Tx).and_eq("tx.hash", hash.to_string());
127        let mut interval = tokio::time::interval(Duration::from_secs(1));
128        let start_time = tokio::time::Instant::now();
129        interval.tick().await;
130        loop {
131            interval.tick().await;
132
133            let search_res = self
134                .tx_search(query.clone(), false, 1, 255, Order::Ascending)
135                .await?;
136            if let Some(tx) = search_res.txs.first() {
137                return Ok(tx.clone());
138            }
139            if tokio::time::Instant::now() - start_time > Duration::from_secs(30) {
140                return Err(ChainError::TxSearchTimeout { tx_hash: *hash });
141            }
142        }
143    }
144}
145
146#[async_trait]
147impl<T> ClientTxCommit for T
148where
149    T: Client + Sync,
150{
151    type Response = tx_commit::Response;
152    async fn broadcast_tx_commit(&self, raw_tx: &RawTx) -> Result<Self::Response, ChainError> {
153        let res = self.broadcast_tx_commit(raw_tx.to_bytes()?).await?;
154        Ok(res.get_err()?)
155    }
156}
157
158#[async_trait]
159impl<T> ClientTxSync for T
160where
161    T: Client + Sync,
162{
163    type Response = tx_sync::Response;
164    async fn broadcast_tx_sync(&self, raw_tx: &RawTx) -> Result<Self::Response, ChainError> {
165        let res = self.broadcast_tx_sync(raw_tx.to_bytes()?).await?;
166        Ok(res.get_err()?)
167    }
168}
169
170#[async_trait]
171impl<T> ClientTxAsync for T
172where
173    T: Client + Sync,
174{
175    type Response = tx_async::Response;
176    async fn broadcast_tx_async(&self, raw_tx: &RawTx) -> Result<Self::Response, ChainError> {
177        let res = self.broadcast_tx_async(raw_tx.to_bytes()?).await?;
178        Ok(res.get_err()?)
179    }
180}
181
182#[cfg_attr(feature = "mockall", automock)]
183#[async_trait]
184pub trait ClientCompat: Client + Sized {
185    async fn query_compat_mode(&self) -> Result<CompatMode, ChainError> {
186        let version = self.status().await?.node_info.version;
187        info!("got tendermint version: {}", version);
188        Ok(CompatMode::from_version(version)?)
189    }
190
191    async fn get_compat(endpoint_url: &str) -> Result<Self, ChainError>;
192
193    /// WARNING: This function creates a global static to remember the compat mode
194    /// for convenience with repeated calls. Once the mode is set, it cannot not be changed
195    /// no matter how many times you call it.
196    async fn get_persistent_compat(endpoint_url: &str) -> Result<Self, ChainError>;
197}
198
199lazy_static! {
200    static ref COMPAT_MODE: RwLock<Option<CompatMode>> = RwLock::new(None);
201}
202
203#[async_trait]
204impl ClientCompat for HttpClient {
205    async fn get_compat(endpoint_url: &str) -> Result<Self, ChainError> {
206        let mut client = Self::new(endpoint_url)?;
207        let compat_mode = client.query_compat_mode().await?;
208        client.set_compat_mode(compat_mode);
209        Ok(client)
210    }
211
212    async fn get_persistent_compat(endpoint_url: &str) -> Result<Self, ChainError> {
213        let mut client = Self::new(endpoint_url)?;
214        let maybe_compat_mode = *COMPAT_MODE.read().await;
215        let compat_mode = match maybe_compat_mode {
216            Some(compat_mode) => compat_mode,
217            None => {
218                let compat_mode = client.query_compat_mode().await?;
219                *COMPAT_MODE.write().await = Some(compat_mode);
220                compat_mode
221            }
222        };
223        client.set_compat_mode(compat_mode);
224        Ok(client)
225    }
226}