xand_api_client/
with_timeout.rs

1use crate::{
2    Paginated, Paging, TransactionHistoryPage, TransactionStatusStream, TransactionUpdate,
3    VoteProposal, XandApiClientError, XandApiClientTrait,
4};
5use async_trait::async_trait;
6use futures::TryStreamExt;
7use std::{future::Future, time::Duration};
8use xand_address::Address;
9use xand_api_proto::proto_models::{
10    AdministrativeTransaction, Blockstamp, CidrBlock, HealthResponse, PendingCreateRequest,
11    PendingRedeemRequest, Proposal, TotalIssuance, Transaction, TransactionFilter, TransactionId,
12    ValidatorEmissionProgress, ValidatorEmissionRate, XandTransaction,
13};
14
/// Wrapper type for any type implementing `XandApiClientTrait` that adds a timeout to all
/// requests.
///
/// The wrapper is `Clone` whenever the wrapped client is, so a configured client can be handed
/// to several owners without being rebuilt.
#[derive(Debug, Clone)]
pub struct WithTimeout<T> {
    /// The wrapped client that actually services each request.
    inner: T,
    /// Upper bound on how long a single request may take before it is abandoned.
    timeout: Duration,
}
21
22impl<T> WithTimeout<T> {
23    /// Wraps a `XandAPIClientTrait` client and returns a client that applies the given timeout to
24    /// all requests.
25    pub fn new(client: T, timeout: Duration) -> Self {
26        WithTimeout {
27            inner: client,
28            timeout,
29        }
30    }
31
32    async fn timeout<F, O>(&self, task: F) -> F::Output
33    where
34        F: Future<Output = Result<O, XandApiClientError>>,
35    {
36        tokio::time::timeout(self.timeout, task)
37            .await
38            .unwrap_or(Err(XandApiClientError::Timeout))
39    }
40}
41
42#[async_trait]
43impl<T: XandApiClientTrait> XandApiClientTrait for WithTimeout<T> {
44    async fn submit_transaction(
45        &self,
46        issuer: Address,
47        txn: XandTransaction,
48    ) -> Result<TransactionStatusStream, XandApiClientError> {
49        self.timeout(self.inner.submit_transaction(issuer, txn))
50            .await
51    }
52
53    async fn submit_transaction_wait(
54        &self,
55        issuer: Address,
56        txn: XandTransaction,
57    ) -> Result<TransactionUpdate, XandApiClientError> {
58        let task = async move {
59            self.inner
60                .submit_transaction(issuer, txn)
61                .await?
62                .committed_result()
63                .await
64        };
65        self.timeout(task).await
66    }
67
68    async fn submit_transaction_wait_final(
69        &self,
70        issuer: Address,
71        txn: XandTransaction,
72    ) -> Result<TransactionUpdate, XandApiClientError> {
73        let task = async move {
74            self.inner
75                .submit_transaction(issuer, txn)
76                .await?
77                .try_fold(None, |_, update| async { Ok(Some(update)) })
78                .await?
79                .ok_or(XandApiClientError::UnknownTransactionStatus)
80        };
81        self.timeout(task).await
82    }
83
84    async fn get_transaction_details(
85        &self,
86        id: &TransactionId,
87    ) -> Result<Transaction, XandApiClientError> {
88        self.timeout(self.inner.get_transaction_details(id)).await
89    }
90
91    async fn get_transaction_history(
92        &self,
93        paging: Option<Paging>,
94        filter: &TransactionFilter,
95    ) -> Result<Paginated<Vec<Transaction>>, XandApiClientError> {
96        self.timeout(self.inner.get_transaction_history(paging, filter))
97            .await
98    }
99
100    async fn get_balance(&self, address: &str) -> Result<Option<u128>, XandApiClientError> {
101        self.timeout(self.inner.get_balance(address)).await
102    }
103
104    async fn get_total_issuance(&self) -> Result<TotalIssuance, XandApiClientError> {
105        self.timeout(self.inner.get_total_issuance()).await
106    }
107
108    async fn get_current_block(&self) -> Result<Blockstamp, XandApiClientError> {
109        self.timeout(self.inner.get_current_block()).await
110    }
111
112    async fn get_address_transactions(
113        &self,
114        address: &str,
115        paging: Option<Paging>,
116    ) -> Result<TransactionHistoryPage, XandApiClientError> {
117        self.timeout(self.inner.get_address_transactions(address, paging))
118            .await
119    }
120
121    async fn get_pending_create_requests(
122        &self,
123        paging: Option<Paging>,
124    ) -> Result<Paginated<Vec<PendingCreateRequest>>, XandApiClientError> {
125        self.timeout(self.inner.get_pending_create_requests(paging))
126            .await
127    }
128
129    async fn get_pending_redeem_requests(
130        &self,
131        paging: Option<Paging>,
132    ) -> Result<Paginated<Vec<PendingRedeemRequest>>, XandApiClientError> {
133        self.timeout(self.inner.get_pending_redeem_requests(paging))
134            .await
135    }
136
137    async fn propose_action(
138        &self,
139        issuer: Address,
140        admin_txn: AdministrativeTransaction,
141    ) -> Result<TransactionStatusStream, XandApiClientError> {
142        self.timeout(self.inner.propose_action(issuer, admin_txn))
143            .await
144    }
145
146    async fn vote_on_proposal(
147        &self,
148        issuer: Address,
149        vote_proposal: VoteProposal,
150    ) -> Result<TransactionStatusStream, XandApiClientError> {
151        self.timeout(self.inner.vote_on_proposal(issuer, vote_proposal))
152            .await
153    }
154
155    async fn get_proposal(&self, id: u32) -> Result<Proposal, XandApiClientError> {
156        self.timeout(self.inner.get_proposal(id)).await
157    }
158
159    async fn get_all_proposals(&self) -> Result<Vec<Proposal>, XandApiClientError> {
160        self.timeout(self.inner.get_all_proposals()).await
161    }
162
163    async fn get_members(&self) -> Result<Vec<Address>, XandApiClientError> {
164        self.timeout(self.inner.get_members()).await
165    }
166
167    async fn get_authority_keys(&self) -> Result<Vec<Address>, XandApiClientError> {
168        self.timeout(self.inner.get_authority_keys()).await
169    }
170
171    async fn get_trustee(&self) -> Result<Address, XandApiClientError> {
172        self.timeout(self.inner.get_trustee()).await
173    }
174
175    async fn get_allowlist(&self) -> Result<Vec<(Address, CidrBlock)>, XandApiClientError> {
176        self.timeout(self.inner.get_allowlist()).await
177    }
178
179    async fn get_limited_agent(&self) -> Result<Option<Address>, XandApiClientError> {
180        self.timeout(self.inner.get_limited_agent()).await
181    }
182
183    async fn get_validator_emission_rate(
184        &self,
185    ) -> Result<ValidatorEmissionRate, XandApiClientError> {
186        self.timeout(self.inner.get_validator_emission_rate()).await
187    }
188
189    async fn get_validator_emission_progress(
190        &self,
191        address: Address,
192    ) -> Result<ValidatorEmissionProgress, XandApiClientError> {
193        self.timeout(self.inner.get_validator_emission_progress(address))
194            .await
195    }
196
197    async fn get_pending_create_request_expire_time(&self) -> Result<u64, XandApiClientError> {
198        self.timeout(self.inner.get_pending_create_request_expire_time())
199            .await
200    }
201
202    async fn check_health(&self) -> Result<HealthResponse, XandApiClientError> {
203        self.timeout(self.inner.check_health()).await
204    }
205}
206
#[cfg(test)]
mod tests {
    use crate::{mock::MockXandApiClient, XandApiClientError, XandApiClientTrait};
    use std::time::Duration;

    // A client delayed by 5 s behind a 100 ms timeout must report `Timeout`.
    #[tokio::test]
    async fn timeout_is_applied() {
        let slow_client = MockXandApiClient::default()
            .with_delay(Duration::from_secs(5))
            .with_timeout(Duration::from_millis(100));

        let outcome = slow_client.get_pending_redeem_requests(None).await;

        assert!(matches!(outcome, Err(XandApiClientError::Timeout)));
    }
}
222}