1#![forbid(unsafe_code)]
7
8#[macro_use]
9extern crate tpfs_logger_port;
10
11pub mod errors;
12mod log_events;
13pub mod mock;
14pub mod models;
15pub mod proto_help;
16mod reconnector;
17mod with_timeout;
18
19pub use reconnector::{ReconnectingXandApiClient, Resurrectable};
22pub use tonic::{Code as TonicCode, Status as TonicStatus};
23pub use with_timeout::WithTimeout;
24pub use xand_address::{Address, AddressError};
25pub use xand_api_proto::proto_models::*;
26
27use crate::{
28 errors::XandApiClientError,
29 models::{Paginated, Paging, TransactionUpdate},
30};
31use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
32use std::{
33 convert::TryInto,
34 fmt::Debug,
35 pin::Pin,
36 task::{Context, Poll},
37 time::Duration,
38};
39use tonic::{codegen::http, transport::ClientTlsConfig, Streaming};
40use url::Url;
41use xand_api_proto::{
42 self as xap, xand_api_client::XandApiClient as GrpcClient, TransactionUpdate as ProtoTxnUpdate,
43};
44
/// Crate-local `Result` alias whose error type defaults to [`XandApiClientError`].
type Result<T, E = XandApiClientError> = std::result::Result<T, E>;
/// One page of transactions, as returned by the paginated transaction queries.
pub type TransactionHistoryPage = Paginated<Vec<Transaction>>;
47
/// URL scheme names the client compares against when opening a connection.
mod scheme {
    /// Scheme string that makes the client enable TLS on the endpoint.
    pub const HTTPS: &str = "https";
}
52
53#[async_trait::async_trait]
56pub trait XandApiClientTrait: std::marker::Send + Sync {
57 async fn submit_transaction(
61 &self,
62 issuer: Address,
63 txn: XandTransaction,
64 ) -> Result<TransactionStatusStream>;
65
66 async fn submit_transaction_wait(
69 &self,
70 issuer: Address,
71 txn: XandTransaction,
72 ) -> Result<TransactionUpdate> {
73 self.submit_transaction(issuer, txn)
74 .await?
75 .committed_result()
76 .await
77 }
78
79 async fn submit_transaction_wait_final(
85 &self,
86 issuer: Address,
87 txn: XandTransaction,
88 ) -> Result<TransactionUpdate> {
89 self.submit_transaction(issuer, txn)
90 .await?
91 .try_fold(None, |_, update| async { Ok(Some(update)) })
92 .await?
93 .ok_or(XandApiClientError::UnknownTransactionStatus)
94 }
95
96 async fn get_transaction_details(&self, id: &TransactionId) -> Result<Transaction>;
98
99 async fn get_transaction_history(
101 &self,
102 paging: Option<Paging>,
103 filter: &TransactionFilter,
104 ) -> Result<Paginated<Vec<Transaction>>>;
105
106 async fn get_balance(&self, address: &str) -> Result<Option<u128>>;
112
113 async fn get_total_issuance(&self) -> Result<TotalIssuance>;
114
115 async fn get_address_transactions(
117 &self,
118 address: &str,
119 paging: Option<Paging>,
120 ) -> Result<TransactionHistoryPage>;
121
122 async fn get_pending_create_requests(
124 &self,
125 paging: Option<Paging>,
126 ) -> Result<Paginated<Vec<PendingCreateRequest>>>;
127
128 async fn get_pending_redeem_requests(
130 &self,
131 paging: Option<Paging>,
132 ) -> Result<Paginated<Vec<PendingRedeemRequest>>>;
133
134 async fn propose_action(
136 &self,
137 issuer: Address,
138 admin_txn: AdministrativeTransaction,
139 ) -> Result<TransactionStatusStream>;
140
141 async fn vote_on_proposal(
143 &self,
144 issuer: Address,
145 vote_proposal: VoteProposal,
146 ) -> Result<TransactionStatusStream>;
147
148 async fn get_current_block(&self) -> Result<Blockstamp>;
150
151 async fn get_proposal(&self, id: u32) -> Result<Proposal>;
153
154 async fn get_all_proposals(&self) -> Result<Vec<Proposal>>;
156
157 async fn get_members(&self) -> Result<Vec<Address>>;
159
160 async fn get_authority_keys(&self) -> Result<Vec<Address>>;
162
163 async fn get_trustee(&self) -> Result<Address>;
165
166 async fn get_limited_agent(&self) -> Result<Option<Address>>;
168
169 async fn get_allowlist(&self) -> Result<Vec<(Address, CidrBlock)>>;
171
172 async fn get_validator_emission_rate(&self) -> Result<ValidatorEmissionRate>;
174
175 async fn get_validator_emission_progress(
177 &self,
178 address: Address,
179 ) -> Result<ValidatorEmissionProgress>;
180
181 async fn get_pending_create_request_expire_time(&self) -> Result<u64>;
183
184 async fn check_health(&self) -> Result<HealthResponse>;
186
187 fn with_timeout(self, timeout: Duration) -> WithTimeout<Self>
190 where
191 Self: Sized,
192 {
193 WithTimeout::new(self, timeout)
194 }
195}
196
197#[derive(derive_more::Deref)]
199pub struct TransactionStatusStream {
200 stream: BoxStream<'static, Result<TransactionUpdate>>,
201}
202
203impl TransactionStatusStream {
204 pub(crate) async fn committed_result(mut self) -> Result<TransactionUpdate> {
205 let mut final_status = TransactionStatus::Unknown;
206 let mut id = TransactionId::default();
207 while let Some(update) = self.stream.next().await {
208 let update = update?;
209 id = update.id;
210 match update.status {
211 TransactionStatus::Invalid(_) | TransactionStatus::Committed => {
212 final_status = update.status;
213 break;
214 }
215 _ => continue,
216 }
217 }
218 Ok(TransactionUpdate {
219 status: final_status,
220 id,
221 })
222 }
223}
224
225impl From<Vec<Result<TransactionUpdate>>> for TransactionStatusStream {
226 fn from(u: Vec<Result<TransactionUpdate>>) -> Self {
227 Self {
228 stream: futures::stream::iter(u).boxed(),
229 }
230 }
231}
232
233impl From<Streaming<ProtoTxnUpdate>> for TransactionStatusStream {
234 fn from(updates: Streaming<ProtoTxnUpdate>) -> Self {
235 TransactionStatusStream {
236 stream: updates
237 .map(|update| {
238 let update = update?;
239 let id = update.id.parse()?;
240 let status = update.status.and_then(|s| s.status).ok_or_else(|| {
241 XandApiClientError::BadReplyError {
242 message: format!("Update for transaction {} had no status!", id),
243 }
244 })?;
245 Ok(TransactionUpdate {
246 id,
247 status: status.into(),
248 })
249 })
250 .boxed(),
251 }
252 }
253}
254
255impl Stream for TransactionStatusStream {
256 type Item = Result<TransactionUpdate>;
257
258 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
259 self.stream.poll_next_unpin(cx)
260 }
261}
262
263pub struct XandApiClient {
265 inner: GrpcClient<JwtIntercepted<tonic::transport::Channel>>,
266}
267
268impl XandApiClient {
269 pub async fn connect(url: &Url) -> Result<Self> {
271 XandApiClient::connect_internal(url, None).await
272 }
273
274 pub async fn connect_with_jwt(url: &Url, jwt: String) -> Result<Self> {
276 XandApiClient::connect_internal(url, Some(jwt)).await
277 }
278
279 async fn connect_internal(url: &Url, jwt: Option<String>) -> Result<Self, XandApiClientError> {
281 let conn = tonic::transport::Endpoint::new(url.to_string())?;
282 let channel = if url.scheme().eq(scheme::HTTPS) {
283 conn.tls_config(ClientTlsConfig::new())?
284 } else {
285 conn
286 }
287 .connect()
288 .await?;
289 let intercepted = JwtIntercepted {
290 jwt,
291 inner: channel,
292 };
293 Ok(XandApiClient {
294 inner: GrpcClient::new(intercepted),
295 })
296 }
297}
298
/// Transport wrapper that carries an optional JWT alongside the wrapped service `inner`.
///
/// `Debug` is implemented by hand rather than derived so that the token itself never ends up
/// in logs or panic messages; only its presence is shown.
#[derive(Clone)]
struct JwtIntercepted<S> {
    /// Bearer token to attach to requests, if any.
    jwt: Option<String>,
    /// The wrapped service that actually sends requests.
    inner: S,
}

impl<S: std::fmt::Debug> std::fmt::Debug for JwtIntercepted<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Show whether a token is configured without revealing the secret.
        let jwt = self.jwt.as_ref().map(|_| "<redacted>");
        f.debug_struct("JwtIntercepted")
            .field("jwt", &jwt)
            .field("inner", &self.inner)
            .finish()
    }
}
304
305impl<ReqBody, S> tower_service::Service<http::Request<ReqBody>> for JwtIntercepted<S>
306where
307 S: tower_service::Service<http::Request<ReqBody>>,
308{
309 type Response = S::Response;
310 type Error = S::Error;
311 type Future = S::Future;
312
313 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
314 self.inner.poll_ready(ctx)
315 }
316
317 fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
318 let req = match &self.jwt {
319 Some(jwt) => attach_jwt(req, jwt),
320 None => req,
321 };
322 self.inner.call(req)
323 }
324}
325
326fn attach_jwt<T>(mut req: http::Request<T>, jwt: &str) -> http::Request<T> {
327 let preamble = format!("Bearer {}", jwt);
328 req.headers_mut().insert(
329 http::header::AUTHORIZATION,
330 http::HeaderValue::from_str(&preamble).unwrap(),
331 );
332 req
333}
334
335#[async_trait::async_trait]
336impl XandApiClientTrait for XandApiClient {
337 async fn submit_transaction(
338 &self,
339 issuer: Address,
340 txn: XandTransaction,
341 ) -> Result<TransactionStatusStream> {
342 let resp = self
343 .inner
344 .clone()
345 .submit_transaction(xap::UserTransaction {
346 issuer: issuer.to_string(),
347 operation: Some(txn.try_into()?),
348 })
349 .await?;
350 Ok(resp.into_inner().into())
351 }
352
353 async fn get_transaction_details(&self, id: &TransactionId) -> Result<Transaction> {
354 let resp = self
355 .inner
356 .clone()
357 .get_transaction_details(xap::TransactionDetailsRequest { id: id.to_string() })
358 .await?
359 .into_inner();
360 Ok(resp.try_into()?)
361 }
362
363 async fn get_current_block(&self) -> Result<Blockstamp> {
364 let resp = self
365 .inner
366 .clone()
367 .get_current_block(xap::GetCurrentBlockReq {})
368 .await?
369 .into_inner();
370 Ok(resp.try_into()?)
371 }
372
373 async fn get_transaction_history(
374 &self,
375 paging: Option<Paging>,
376 filter: &TransactionFilter,
377 ) -> Result<Paginated<Vec<Transaction>>> {
378 let paging = paging.unwrap_or_default();
379 let resp = self
380 .inner
381 .clone()
382 .get_transaction_history(xap::TransactionHistoryRequest {
383 addresses: filter.addresses.iter().map(ToString::to_string).collect(),
384 page_size: paging.page_size,
385 page_number: paging.page_number,
386 transaction_types: filter
387 .types
388 .iter()
389 .map(|&t| xand_api_proto::TransactionType::from(t) as i32)
390 .collect(),
391 start_time: filter.start_time.map(|t| xap::Timestamp {
392 unix_time_millis: t.timestamp_millis(),
393 }),
394 end_time: filter.end_time.map(|t| xap::Timestamp {
395 unix_time_millis: t.timestamp_millis(),
396 }),
397 })
398 .await?
399 .into_inner();
400 Ok(resp.try_into()?)
401 }
402
403 async fn get_balance(&self, address: &str) -> Result<Option<u128>> {
404 let resp: xap::AddressBalance = self
405 .inner
406 .clone()
407 .get_address_balance(xap::AddressBalanceRequest {
408 address: address.to_string(),
409 })
410 .await?
411 .into_inner();
412 Ok(resp.balance.map(|b| b.amount.into()))
413 }
414
415 async fn get_total_issuance(&self) -> Result<TotalIssuance> {
416 let resp: xap::TotalIssuanceResponse = self
417 .inner
418 .clone()
419 .get_total_issuance(xap::TotalIssuanceRequest {})
420 .await?
421 .into_inner();
422 Ok(resp.try_into()?)
423 }
424
425 async fn get_address_transactions(
426 &self,
427 address: &str,
428 paging: Option<Paging>,
429 ) -> Result<Paginated<Vec<Transaction>>> {
430 let paging = paging.unwrap_or_default();
431 let resp = self
432 .inner
433 .clone()
434 .get_address_transactions(xap::AddressTransactionHistoryRequest {
435 address: address.to_string(),
436 page_size: paging.page_size,
437 page_number: paging.page_number,
438 })
439 .await?
440 .into_inner();
441 Ok(resp.try_into()?)
442 }
443
444 async fn get_pending_create_requests(
445 &self,
446 paging: Option<Paging>,
447 ) -> Result<Paginated<Vec<PendingCreateRequest>>> {
448 let paging = paging.unwrap_or_default();
449 let resp = self
450 .inner
451 .clone()
452 .get_pending_create_requests(xap::PendingCreateRequestsPagination {
453 page_size: paging.page_size,
454 page_number: paging.page_number,
455 })
456 .await?;
457 Ok(resp.into_inner().try_into()?)
458 }
459
460 async fn get_pending_redeem_requests(
461 &self,
462 paging: Option<Paging>,
463 ) -> Result<Paginated<Vec<PendingRedeemRequest>>> {
464 let paging = paging.unwrap_or_default();
465 let resp = self
466 .inner
467 .clone()
468 .get_pending_redeem_requests(xap::PendingRedeemRequestsPagination {
469 page_size: paging.page_size,
470 page_number: paging.page_number,
471 })
472 .await?;
473 Ok(resp.into_inner().try_into()?)
474 }
475
476 async fn propose_action(
477 &self,
478 issuer: Address,
479 proposed_action: AdministrativeTransaction,
480 ) -> Result<TransactionStatusStream, XandApiClientError> {
481 let resp = self
482 .inner
483 .clone()
484 .propose_action(xap::SubmitProposal {
485 issuer: issuer.to_string(),
486 proposed_action: Some(proposed_action.into()),
487 })
488 .await?;
489 Ok(resp.into_inner().into())
490 }
491
492 async fn vote_on_proposal(
493 &self,
494 issuer: Address,
495 vote_proposal: VoteProposal,
496 ) -> Result<TransactionStatusStream, XandApiClientError> {
497 let resp = self
498 .inner
499 .clone()
500 .vote_on_proposal(xap::VotingTransaction {
501 issuer: issuer.to_string(),
502 vote_proposal: Some(vote_proposal.into()),
503 })
504 .await?;
505 Ok(resp.into_inner().into())
506 }
507
508 async fn get_proposal(&self, id: u32) -> Result<Proposal, XandApiClientError> {
509 let resp = self
510 .inner
511 .clone()
512 .get_proposal(xap::GetProposalReq { id })
513 .await?
514 .into_inner()
515 .try_into()?;
516 Ok(resp)
517 }
518
519 async fn get_all_proposals(&self) -> Result<Vec<Proposal>, XandApiClientError> {
520 let resp = self
521 .inner
522 .clone()
523 .get_all_proposals(xap::GetAllProposalsReq {})
524 .await?
525 .into_inner()
526 .proposals
527 .into_iter()
528 .map(|p| p.try_into())
529 .collect::<Result<Vec<_>, _>>()?;
530 Ok(resp)
531 }
532
533 async fn get_members(&self) -> Result<Vec<Address>, XandApiClientError> {
534 let members = self
535 .inner
536 .clone()
537 .get_members(xap::MembersRequest {})
538 .await?
539 .into_inner()
540 .members;
541 to_addresses(members)
542 }
543
544 async fn get_authority_keys(&self) -> Result<Vec<Address>, XandApiClientError> {
545 let keys = self
546 .inner
547 .clone()
548 .get_authority_keys(xap::AuthorityKeysRequest {})
549 .await?
550 .into_inner()
551 .authority_keys;
552 to_addresses(keys)
553 }
554
555 async fn get_trustee(&self) -> Result<Address> {
556 let trustee = self
557 .inner
558 .clone()
559 .get_trustee(xap::GetTrusteeReq {})
560 .await?
561 .into_inner()
562 .address;
563 to_address(&trustee)
564 }
565
566 async fn get_allowlist(&self) -> Result<Vec<(Address, CidrBlock)>> {
567 self.inner
568 .clone()
569 .get_allowlist(xap::AllowlistRequest {})
570 .await?
571 .into_inner()
572 .entries
573 .into_iter()
574 .map(|entry| {
575 Ok((
576 to_address(&entry.address)?,
577 to_cidr_block(&entry.cidr_block)?,
578 ))
579 })
580 .collect()
581 }
582
583 async fn get_limited_agent(&self) -> Result<Option<Address>, XandApiClientError> {
584 let limited_agent_address_option = self
585 .inner
586 .clone()
587 .get_limited_agent(xap::GetLimitedAgentReq {})
588 .await?
589 .into_inner()
590 .address;
591 match limited_agent_address_option {
592 Some(la) => Ok(to_address(&la.address_str).map(Some)?),
593 None => Ok(None),
594 }
595 }
596
597 async fn get_validator_emission_rate(&self) -> Result<ValidatorEmissionRate> {
598 Ok(self
599 .inner
600 .clone()
601 .get_validator_emission_rate(xap::GetValidatorEmissionRateReq {})
602 .await?
603 .into_inner()
604 .into())
605 }
606
607 async fn get_validator_emission_progress(
608 &self,
609 address: Address,
610 ) -> Result<ValidatorEmissionProgress> {
611 Ok(self
612 .inner
613 .clone()
614 .get_validator_emission_progress(xap::GetValidatorEmissionProgressReq {
615 address: address.to_string(),
616 })
617 .await?
618 .into_inner()
619 .try_into()?)
620 }
621
622 async fn get_pending_create_request_expire_time(&self) -> Result<u64> {
623 let resp: xap::PendingCreateRequestExpireTime = self
624 .inner
625 .clone()
626 .get_pending_create_request_expire_time(xap::GetPendingCreateRequestExpireTimeReq {})
627 .await?
628 .into_inner();
629 Ok(resp.into())
630 }
631
632 async fn check_health(&self) -> Result<HealthResponse> {
633 let response: xap::HealthCheckResponse = self
634 .inner
635 .clone()
636 .check_health(xap::HealthCheckRequest {})
637 .await?
638 .into_inner();
639 Ok(response.try_into()?)
640 }
641}
642
643fn to_addresses(v: Vec<String>) -> Result<Vec<Address>, XandApiClientError> {
644 let addresses = v
645 .into_iter()
646 .map(|address| to_address(&address))
647 .collect::<Result<Vec<_>, _>>()?;
648 Ok(addresses)
649}
650
651fn to_address(s: &str) -> Result<Address, XandApiClientError> {
652 #[allow(clippy::map_err_ignore)]
653 s.parse()
654 .map_err(|_| XandApiClientError::InvalidAddress { address: s.into() })
655}
656
657fn to_cidr_block(s: &str) -> Result<CidrBlock, XandApiClientError> {
658 #[allow(clippy::map_err_ignore)]
659 s.parse().map_err(|_| XandApiClientError::InvalidCidrBlock {
660 cidr_block: s.into(),
661 })
662}
663
#[cfg(test)]
mod test {
    use super::*;

    /// Manual smoke test against a locally running server; ignored by default because it
    /// needs something listening on `127.0.0.1:50051`.
    #[ignore]
    #[tokio::test]
    async fn manual_test() {
        let request = xap::PendingCreateRequestsPagination {
            page_size: 100,
            page_number: Default::default(),
        };
        let mut grpc = GrpcClient::<tonic::transport::Channel>::connect("http://127.0.0.1:50051")
            .await
            .unwrap();
        let res = grpc.get_pending_create_requests(request).await.unwrap();
        dbg!(res);
    }

    /// Compile-time check only: this function is never called, it merely has to type-check,
    /// which proves the future returned by `XandApiClient::connect` is `Send`.
    #[allow(unconditional_recursion, dead_code)]
    fn enforce_send<T: std::marker::Send>(_: T) {
        let url: Url = "hi".parse().unwrap();
        enforce_send(XandApiClient::connect(&url))
    }
}