1use crate::{
5 errors::XandApiClientError,
6 log_events::LoggingEvent::ReconnectingClientError,
7 models::{Paginated, Paging},
8 TransactionHistoryPage, TransactionId, TransactionStatusStream, XandApiClient,
9 XandApiClientTrait,
10};
11use futures::{future::BoxFuture, FutureExt};
12use std::fmt::Debug;
13use std::ops::Deref;
14use tokio::sync::{RwLock, RwLockReadGuard};
15use url::Url;
16use xand_address::Address;
17use xand_api_proto::proto_models::{
18 AdministrativeTransaction, Blockstamp, CidrBlock, HealthResponse, PendingCreateRequest,
19 PendingRedeemRequest, Proposal, TotalIssuance, Transaction, TransactionFilter,
20 ValidatorEmissionProgress, ValidatorEmissionRate, VoteProposal, XandTransaction,
21};
22
23pub struct Resurrectable<T, Err> {
25 inner: RwLock<Option<T>>,
26 initializer: Box<dyn Fn() -> BoxFuture<'static, Result<T, Err>> + Send + Sync>,
27}
28
29pub struct LockHolder<'a, T> {
32 guard: RwLockReadGuard<'a, Option<T>>,
33}
34impl<'a, T> Deref for LockHolder<'a, T> {
35 type Target = T;
36
37 fn deref(&self) -> &Self::Target {
38 self.guard.as_ref().unwrap()
39 }
40}
41
42impl<T, Err> Resurrectable<T, Err>
43where
44 Err: Debug,
45{
46 pub async fn new<Fun: Fn() -> BoxFuture<'static, Result<T, Err>> + Send + Sync + 'static>(
50 initializer: Fun,
51 ) -> Self {
52 Self {
53 inner: RwLock::new(initializer().await.ok()),
54 initializer: Box::new(initializer),
55 }
56 }
57
58 pub fn stub<Fun: Fn() -> BoxFuture<'static, Result<T, Err>> + Send + Sync + 'static>(
61 initializer: Fun,
62 ) -> Self {
63 Self {
64 inner: RwLock::new(None),
65 initializer: Box::new(initializer),
66 }
67 }
68
69 #[allow(clippy::missing_errors_doc)]
70 pub async fn get(&self) -> Result<LockHolder<'_, T>, Err> {
73 let mut wl = self.inner.write().await;
74 if wl.is_none() {
75 *wl = Some((self.initializer)().await?);
76 }
77 drop(wl);
78 Ok(LockHolder {
80 guard: self.inner.read().await,
81 })
82 }
83
84 pub async fn is_initted(&self) -> bool {
87 self.inner.read().await.is_some()
88 }
89
90 pub async fn reset(&self) {
92 (*self.inner.write().await) = None;
93 }
94}
95
96pub struct ReconnectingXandApiClient {
97 inner: Resurrectable<XandApiClient, XandApiClientError>,
98}
99
100impl ReconnectingXandApiClient {
101 pub async fn stub(url: Url) -> Self {
105 Self::stubber(url, None)
106 }
107
108 #[must_use]
110 pub fn stub_jwt(url: Url, jwt: String) -> Self {
111 Self::stubber(url, Some(jwt))
112 }
113
114 fn stubber(url: Url, jwt: Option<String>) -> Self {
115 Self {
116 inner: Resurrectable::stub(move || {
117 let url = url.clone();
120 let jwt = jwt.clone();
121 async move {
122 let res = if let Some(jwt) = jwt {
123 XandApiClient::connect_with_jwt(&url, jwt).await
124 } else {
125 XandApiClient::connect(&url).await
126 };
127 if let Err(e) = &res {
128 warn!(ReconnectingClientError(format!("{:?}", e)))
129 };
130 res
131 }
132 .boxed()
133 }),
134 }
135 }
136}
137
138#[async_trait::async_trait]
139impl XandApiClientTrait for ReconnectingXandApiClient {
140 async fn submit_transaction(
141 &self,
142 issuer: Address,
143 txn: XandTransaction,
144 ) -> Result<TransactionStatusStream, XandApiClientError> {
145 self.inner
146 .get()
147 .await?
148 .submit_transaction(issuer, txn)
149 .await
150 }
151
152 async fn get_transaction_details(
153 &self,
154 id: &TransactionId,
155 ) -> Result<Transaction, XandApiClientError> {
156 self.inner.get().await?.get_transaction_details(id).await
157 }
158
159 async fn get_transaction_history(
160 &self,
161 paging: Option<Paging>,
162 filter: &TransactionFilter,
163 ) -> Result<Paginated<Vec<Transaction>>, XandApiClientError> {
164 self.inner
165 .get()
166 .await?
167 .get_transaction_history(paging, filter)
168 .await
169 }
170
171 async fn get_balance(&self, address: &str) -> Result<Option<u128>, XandApiClientError> {
172 self.inner.get().await?.get_balance(address).await
173 }
174
175 async fn get_current_block(&self) -> Result<Blockstamp, XandApiClientError> {
176 self.inner.get().await?.get_current_block().await
177 }
178
179 async fn get_total_issuance(&self) -> Result<TotalIssuance, XandApiClientError> {
180 self.inner.get().await?.get_total_issuance().await
181 }
182
183 async fn get_address_transactions(
184 &self,
185 address: &str,
186 paging: Option<Paging>,
187 ) -> Result<TransactionHistoryPage, XandApiClientError> {
188 self.inner
189 .get()
190 .await?
191 .get_address_transactions(address, paging)
192 .await
193 }
194
195 async fn get_pending_create_requests(
196 &self,
197 paging: Option<Paging>,
198 ) -> Result<Paginated<Vec<PendingCreateRequest>>, XandApiClientError> {
199 self.inner
200 .get()
201 .await?
202 .get_pending_create_requests(paging)
203 .await
204 }
205
206 async fn get_pending_redeem_requests(
207 &self,
208 paging: Option<Paging>,
209 ) -> Result<Paginated<Vec<PendingRedeemRequest>>, XandApiClientError> {
210 self.inner
211 .get()
212 .await?
213 .get_pending_redeem_requests(paging)
214 .await
215 }
216
217 async fn propose_action(
218 &self,
219 issuer: Address,
220 admin_txn: AdministrativeTransaction,
221 ) -> Result<TransactionStatusStream, XandApiClientError> {
222 self.inner
223 .get()
224 .await?
225 .propose_action(issuer, admin_txn)
226 .await
227 }
228
229 async fn vote_on_proposal(
230 &self,
231 issuer: Address,
232 vote_proposal: VoteProposal,
233 ) -> Result<TransactionStatusStream, XandApiClientError> {
234 self.inner
235 .get()
236 .await?
237 .vote_on_proposal(issuer, vote_proposal)
238 .await
239 }
240
241 async fn get_proposal(&self, id: u32) -> Result<Proposal, XandApiClientError> {
242 self.inner.get().await?.get_proposal(id).await
243 }
244
245 async fn get_all_proposals(&self) -> Result<Vec<Proposal>, XandApiClientError> {
246 self.inner.get().await?.get_all_proposals().await
247 }
248
249 async fn get_members(&self) -> Result<Vec<Address>, XandApiClientError> {
250 self.inner.get().await?.get_members().await
251 }
252
253 async fn get_authority_keys(&self) -> Result<Vec<Address>, XandApiClientError> {
254 self.inner.get().await?.get_authority_keys().await
255 }
256
257 async fn get_trustee(&self) -> Result<Address, XandApiClientError> {
258 self.inner.get().await?.get_trustee().await
259 }
260
261 async fn get_allowlist(&self) -> Result<Vec<(Address, CidrBlock)>, XandApiClientError> {
262 self.inner.get().await?.get_allowlist().await
263 }
264
265 async fn get_limited_agent(&self) -> Result<Option<Address>, XandApiClientError> {
266 self.inner.get().await?.get_limited_agent().await
267 }
268
269 async fn get_validator_emission_rate(
270 &self,
271 ) -> Result<ValidatorEmissionRate, XandApiClientError> {
272 self.inner.get().await?.get_validator_emission_rate().await
273 }
274
275 async fn get_validator_emission_progress(
276 &self,
277 address: Address,
278 ) -> Result<ValidatorEmissionProgress, XandApiClientError> {
279 self.inner
280 .get()
281 .await?
282 .get_validator_emission_progress(address)
283 .await
284 }
285
286 async fn get_pending_create_request_expire_time(&self) -> Result<u64, XandApiClientError> {
287 self.inner
288 .get()
289 .await?
290 .get_pending_create_request_expire_time()
291 .await
292 }
293
294 async fn check_health(&self) -> Result<HealthResponse, XandApiClientError> {
295 self.inner.get().await?.check_health().await
296 }
297}
298
#[cfg(test)]
mod reconnector_tests {
    use super::*;
    use std::sync::Mutex;

    /// Walks a `Resurrectable` through failed and successful initialization attempts.
    #[tokio::test]
    async fn resurrectable_works() {
        // Responses are popped from the back, so the initializer yields, in order:
        // Err (consumed by `new`), Err (first `get`), Ok(1) (second `get`).
        let responses: Mutex<Vec<Result<i32, ()>>> =
            Mutex::new(vec![Err(()), Ok(1), Err(()), Err(())]);
        let res = Resurrectable::new(move || {
            futures::future::ready(responses.lock().unwrap().pop().unwrap()).boxed()
        })
        .await;

        // The first `get` retries the initializer, which fails again.
        let inner_res = res.get().await;
        // `matches!` only yields a bool; it has to be asserted to verify anything.
        assert!(matches!(inner_res, Err(())));
        assert!(!res.is_initted().await);

        // The next attempt succeeds and the value is kept from then on.
        assert_eq!(*res.get().await.unwrap(), 1);
        assert!(res.is_initted().await);
        // A further `get` must reuse the stored value instead of popping another response.
        assert_eq!(*res.get().await.unwrap(), 1);
    }
}