xand_api_client/
reconnector.rs

1//! Provides functionality extending the api client so that its connection status can largely
2//! be ignored by the user.
3
4use crate::{
5    errors::XandApiClientError,
6    log_events::LoggingEvent::ReconnectingClientError,
7    models::{Paginated, Paging},
8    TransactionHistoryPage, TransactionId, TransactionStatusStream, XandApiClient,
9    XandApiClientTrait,
10};
11use futures::{future::BoxFuture, FutureExt};
12use std::fmt::Debug;
13use std::ops::Deref;
14use tokio::sync::{RwLock, RwLockReadGuard};
15use url::Url;
16use xand_address::Address;
17use xand_api_proto::proto_models::{
18    AdministrativeTransaction, Blockstamp, CidrBlock, HealthResponse, PendingCreateRequest,
19    PendingRedeemRequest, Proposal, TotalIssuance, Transaction, TransactionFilter,
20    ValidatorEmissionProgress, ValidatorEmissionRate, VoteProposal, XandTransaction,
21};
22
23/// A wrapper around something async that will re-attempt initialization automatically
24pub struct Resurrectable<T, Err> {
25    inner: RwLock<Option<T>>,
26    initializer: Box<dyn Fn() -> BoxFuture<'static, Result<T, Err>> + Send + Sync>,
27}
28
29// This is just a way to provide a view *inside* the option. The option *must* be populated
30// upon insertion
31pub struct LockHolder<'a, T> {
32    guard: RwLockReadGuard<'a, Option<T>>,
33}
34impl<'a, T> Deref for LockHolder<'a, T> {
35    type Target = T;
36
37    fn deref(&self) -> &Self::Target {
38        self.guard.as_ref().unwrap()
39    }
40}
41
42impl<T, Err> Resurrectable<T, Err>
43where
44    Err: Debug,
45{
46    /// Construct a new resurrectable by passing a function that can instantiate the inner type.
47    /// If instantiation fails, the resurrectable will still be created, but the first call to
48    /// `get` will re-run the initializer.
49    pub async fn new<Fun: Fn() -> BoxFuture<'static, Result<T, Err>> + Send + Sync + 'static>(
50        initializer: Fun,
51    ) -> Self {
52        Self {
53            inner: RwLock::new(initializer().await.ok()),
54            initializer: Box::new(initializer),
55        }
56    }
57
58    /// Construct a new resurrectable by passing a function that can instantiate the inner type.
59    /// The function will *not* be called immediately. It will be called on the first call to `get`
60    pub fn stub<Fun: Fn() -> BoxFuture<'static, Result<T, Err>> + Send + Sync + 'static>(
61        initializer: Fun,
62    ) -> Self {
63        Self {
64            inner: RwLock::new(None),
65            initializer: Box::new(initializer),
66        }
67    }
68
69    #[allow(clippy::missing_errors_doc)]
70    /// Attempt to fetch the inner type the resurrectable holds. If it has not succesfully been
71    /// initialized, this will attempt to initialize it first, returning an error if that fails.
72    pub async fn get(&self) -> Result<LockHolder<'_, T>, Err> {
73        let mut wl = self.inner.write().await;
74        if wl.is_none() {
75            *wl = Some((self.initializer)().await?);
76        }
77        drop(wl);
78        // Guaranteed to be initialized here
79        Ok(LockHolder {
80            guard: self.inner.read().await,
81        })
82    }
83
84    /// Returns true if the wrapped type has been initialized. IE: If this returns false, the next
85    /// call to `get` must run the initializer, otherwise not.
86    pub async fn is_initted(&self) -> bool {
87        self.inner.read().await.is_some()
88    }
89
90    /// Resets the resurrectable such that the next `get` call will have to run the initializer
91    pub async fn reset(&self) {
92        (*self.inner.write().await) = None;
93    }
94}
95
96pub struct ReconnectingXandApiClient {
97    inner: Resurrectable<XandApiClient, XandApiClientError>,
98}
99
100impl ReconnectingXandApiClient {
101    // TODO: Does not need to be async (nor should it) - fix with Mutex / other API changes.
102    /// Create a new reconnecting client that has not even tried to connect. Will try to connect
103    /// on the first call to `get`
104    pub async fn stub(url: Url) -> Self {
105        Self::stubber(url, None)
106    }
107
108    /// Like `stub`, but with a JWT to attach for authorization.
109    #[must_use]
110    pub fn stub_jwt(url: Url, jwt: String) -> Self {
111        Self::stubber(url, Some(jwt))
112    }
113
114    fn stubber(url: Url, jwt: Option<String>) -> Self {
115        Self {
116            inner: Resurrectable::stub(move || {
117                // These shouldn't *need* to be cloned... it is being passed in by ownership, but
118                // async closures are wizardry still.
119                let url = url.clone();
120                let jwt = jwt.clone();
121                async move {
122                    let res = if let Some(jwt) = jwt {
123                        XandApiClient::connect_with_jwt(&url, jwt).await
124                    } else {
125                        XandApiClient::connect(&url).await
126                    };
127                    if let Err(e) = &res {
128                        warn!(ReconnectingClientError(format!("{:?}", e)))
129                    };
130                    res
131                }
132                .boxed()
133            }),
134        }
135    }
136}
137
138#[async_trait::async_trait]
139impl XandApiClientTrait for ReconnectingXandApiClient {
140    async fn submit_transaction(
141        &self,
142        issuer: Address,
143        txn: XandTransaction,
144    ) -> Result<TransactionStatusStream, XandApiClientError> {
145        self.inner
146            .get()
147            .await?
148            .submit_transaction(issuer, txn)
149            .await
150    }
151
152    async fn get_transaction_details(
153        &self,
154        id: &TransactionId,
155    ) -> Result<Transaction, XandApiClientError> {
156        self.inner.get().await?.get_transaction_details(id).await
157    }
158
159    async fn get_transaction_history(
160        &self,
161        paging: Option<Paging>,
162        filter: &TransactionFilter,
163    ) -> Result<Paginated<Vec<Transaction>>, XandApiClientError> {
164        self.inner
165            .get()
166            .await?
167            .get_transaction_history(paging, filter)
168            .await
169    }
170
171    async fn get_balance(&self, address: &str) -> Result<Option<u128>, XandApiClientError> {
172        self.inner.get().await?.get_balance(address).await
173    }
174
175    async fn get_current_block(&self) -> Result<Blockstamp, XandApiClientError> {
176        self.inner.get().await?.get_current_block().await
177    }
178
179    async fn get_total_issuance(&self) -> Result<TotalIssuance, XandApiClientError> {
180        self.inner.get().await?.get_total_issuance().await
181    }
182
183    async fn get_address_transactions(
184        &self,
185        address: &str,
186        paging: Option<Paging>,
187    ) -> Result<TransactionHistoryPage, XandApiClientError> {
188        self.inner
189            .get()
190            .await?
191            .get_address_transactions(address, paging)
192            .await
193    }
194
195    async fn get_pending_create_requests(
196        &self,
197        paging: Option<Paging>,
198    ) -> Result<Paginated<Vec<PendingCreateRequest>>, XandApiClientError> {
199        self.inner
200            .get()
201            .await?
202            .get_pending_create_requests(paging)
203            .await
204    }
205
206    async fn get_pending_redeem_requests(
207        &self,
208        paging: Option<Paging>,
209    ) -> Result<Paginated<Vec<PendingRedeemRequest>>, XandApiClientError> {
210        self.inner
211            .get()
212            .await?
213            .get_pending_redeem_requests(paging)
214            .await
215    }
216
217    async fn propose_action(
218        &self,
219        issuer: Address,
220        admin_txn: AdministrativeTransaction,
221    ) -> Result<TransactionStatusStream, XandApiClientError> {
222        self.inner
223            .get()
224            .await?
225            .propose_action(issuer, admin_txn)
226            .await
227    }
228
229    async fn vote_on_proposal(
230        &self,
231        issuer: Address,
232        vote_proposal: VoteProposal,
233    ) -> Result<TransactionStatusStream, XandApiClientError> {
234        self.inner
235            .get()
236            .await?
237            .vote_on_proposal(issuer, vote_proposal)
238            .await
239    }
240
241    async fn get_proposal(&self, id: u32) -> Result<Proposal, XandApiClientError> {
242        self.inner.get().await?.get_proposal(id).await
243    }
244
245    async fn get_all_proposals(&self) -> Result<Vec<Proposal>, XandApiClientError> {
246        self.inner.get().await?.get_all_proposals().await
247    }
248
249    async fn get_members(&self) -> Result<Vec<Address>, XandApiClientError> {
250        self.inner.get().await?.get_members().await
251    }
252
253    async fn get_authority_keys(&self) -> Result<Vec<Address>, XandApiClientError> {
254        self.inner.get().await?.get_authority_keys().await
255    }
256
257    async fn get_trustee(&self) -> Result<Address, XandApiClientError> {
258        self.inner.get().await?.get_trustee().await
259    }
260
261    async fn get_allowlist(&self) -> Result<Vec<(Address, CidrBlock)>, XandApiClientError> {
262        self.inner.get().await?.get_allowlist().await
263    }
264
265    async fn get_limited_agent(&self) -> Result<Option<Address>, XandApiClientError> {
266        self.inner.get().await?.get_limited_agent().await
267    }
268
269    async fn get_validator_emission_rate(
270        &self,
271    ) -> Result<ValidatorEmissionRate, XandApiClientError> {
272        self.inner.get().await?.get_validator_emission_rate().await
273    }
274
275    async fn get_validator_emission_progress(
276        &self,
277        address: Address,
278    ) -> Result<ValidatorEmissionProgress, XandApiClientError> {
279        self.inner
280            .get()
281            .await?
282            .get_validator_emission_progress(address)
283            .await
284    }
285
286    async fn get_pending_create_request_expire_time(&self) -> Result<u64, XandApiClientError> {
287        self.inner
288            .get()
289            .await?
290            .get_pending_create_request_expire_time()
291            .await
292    }
293
294    async fn check_health(&self) -> Result<HealthResponse, XandApiClientError> {
295        self.inner.get().await?.check_health().await
296    }
297}
298
299#[cfg(test)]
300mod reconnector_tests {
301    use super::*;
302    use std::sync::Mutex;
303
304    #[tokio::test]
305    async fn resurrectable_works() {
306        let responses: Mutex<Vec<Result<i32, ()>>> =
307            Mutex::new(vec![Err(()), Ok(1), Err(()), Err(())]);
308        let res = Resurrectable::new(move || {
309            futures::future::ready(responses.lock().unwrap().pop().unwrap()).boxed()
310        })
311        .await;
312        let inner_res = res.get().await;
313        matches!(inner_res, Err(()));
314        assert!(!res.is_initted().await);
315        assert_eq!(*res.get().await.unwrap(), 1);
316        assert!(res.is_initted().await);
317        assert_eq!(*res.get().await.unwrap(), 1);
318    }
319}