xand_api_client/
lib.rs

1//! Provides a trait and implementation of that trait for interacting with the `xand-api` (as
2//! well as a fake version you can use in your tests in the `mock` module).
3//!
4//! If you are talking with `xand-api` over the wire, this is what you should use.
5
6#![forbid(unsafe_code)]
7
8#[macro_use]
9extern crate tpfs_logger_port;
10
11pub mod errors;
12mod log_events;
13pub mod mock;
14pub mod models;
15pub mod proto_help;
16mod reconnector;
17mod with_timeout;
18
19// We re-export many things from xand-models so that consumers of the client can depend only on
20// the client, and are in effect "told" what version of xand-models to use.
21pub use reconnector::{ReconnectingXandApiClient, Resurrectable};
22pub use tonic::{Code as TonicCode, Status as TonicStatus};
23pub use with_timeout::WithTimeout;
24pub use xand_address::{Address, AddressError};
25pub use xand_api_proto::proto_models::*;
26
27use crate::{
28    errors::XandApiClientError,
29    models::{Paginated, Paging, TransactionUpdate},
30};
31use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
32use std::{
33    convert::TryInto,
34    fmt::Debug,
35    pin::Pin,
36    task::{Context, Poll},
37    time::Duration,
38};
39use tonic::{codegen::http, transport::ClientTlsConfig, Streaming};
40use url::Url;
41use xand_api_proto::{
42    self as xap, xand_api_client::XandApiClient as GrpcClient, TransactionUpdate as ProtoTxnUpdate,
43};
44
45type Result<T, E = XandApiClientError> = std::result::Result<T, E>;
46pub type TransactionHistoryPage = Paginated<Vec<Transaction>>;
47
/// URL scheme names the client compares against when deciding how to connect.
mod scheme {
    /// Scheme string for HTTP over TLS.
    pub const HTTPS: &str = "https";
}
52
53/// Trait that exposes all of the functionality provided by the xand-api as a simple client
54/// interface
55#[async_trait::async_trait]
56pub trait XandApiClientTrait: std::marker::Send + Sync {
57    /// Submit a transaction to the API, where it will be signed and then sent to the blockchain
58    ///
59    /// Returns a stream of transaction status updates
60    async fn submit_transaction(
61        &self,
62        issuer: Address,
63        txn: XandTransaction,
64    ) -> Result<TransactionStatusStream>;
65
66    /// Like `submit_transaction`, but rather than returning a stream, returns a future that
67    /// completes once the the transaction has been committed or invalidated (not finalized).
68    async fn submit_transaction_wait(
69        &self,
70        issuer: Address,
71        txn: XandTransaction,
72    ) -> Result<TransactionUpdate> {
73        self.submit_transaction(issuer, txn)
74            .await?
75            .committed_result()
76            .await
77    }
78
79    /// Submits a transaction and returns its final status (which, in the case of a valid
80    /// transaction, should eventually be `Finalized`).
81    ///
82    /// You should generally prefer `submit_transaction_wait` over this, unless you must absolutely
83    /// be sure the transaction was finalized (EX: you are the trust).
84    async fn submit_transaction_wait_final(
85        &self,
86        issuer: Address,
87        txn: XandTransaction,
88    ) -> Result<TransactionUpdate> {
89        self.submit_transaction(issuer, txn)
90            .await?
91            .try_fold(None, |_, update| async { Ok(Some(update)) })
92            .await?
93            .ok_or(XandApiClientError::UnknownTransactionStatus)
94    }
95
96    /// Fetch the details of a particular transaction
97    async fn get_transaction_details(&self, id: &TransactionId) -> Result<Transaction>;
98
99    /// Query transaction history
100    async fn get_transaction_history(
101        &self,
102        paging: Option<Paging>,
103        filter: &TransactionFilter,
104    ) -> Result<Paginated<Vec<Transaction>>>;
105
106    /// Fetch the balance (in USD cents) held by an address
107    ///
108    /// If the balance is unavailable (e.g. the requester cannot read it), `Ok(None)` is returned.
109    ///
110    /// TODO: Should also be `&Address`
111    async fn get_balance(&self, address: &str) -> Result<Option<u128>>;
112
113    async fn get_total_issuance(&self) -> Result<TotalIssuance>;
114
115    /// Fetch all transactions which are relevant to the provided address
116    async fn get_address_transactions(
117        &self,
118        address: &str,
119        paging: Option<Paging>,
120    ) -> Result<TransactionHistoryPage>;
121
122    /// Request the current pending create requests
123    async fn get_pending_create_requests(
124        &self,
125        paging: Option<Paging>,
126    ) -> Result<Paginated<Vec<PendingCreateRequest>>>;
127
128    /// Request the current pending redeem requests
129    async fn get_pending_redeem_requests(
130        &self,
131        paging: Option<Paging>,
132    ) -> Result<Paginated<Vec<PendingRedeemRequest>>>;
133
134    /// Submit a proposal for an action to be voted on by network
135    async fn propose_action(
136        &self,
137        issuer: Address,
138        admin_txn: AdministrativeTransaction,
139    ) -> Result<TransactionStatusStream>;
140
141    /// Submit vote on specific proposal by id
142    async fn vote_on_proposal(
143        &self,
144        issuer: Address,
145        vote_proposal: VoteProposal,
146    ) -> Result<TransactionStatusStream>;
147
148    /// Gets the most current block and timestamp with a staleness indicator
149    async fn get_current_block(&self) -> Result<Blockstamp>;
150
151    /// Get specific proposal by id
152    async fn get_proposal(&self, id: u32) -> Result<Proposal>;
153
154    /// Get all non-expired proposals
155    async fn get_all_proposals(&self) -> Result<Vec<Proposal>>;
156
157    /// Get list of members
158    async fn get_members(&self) -> Result<Vec<Address>>;
159
160    /// Get list of validators
161    async fn get_authority_keys(&self) -> Result<Vec<Address>>;
162
163    /// Get trustee address
164    async fn get_trustee(&self) -> Result<Address>;
165
166    /// Get Limited Agent address if one exists
167    async fn get_limited_agent(&self) -> Result<Option<Address>>;
168
169    /// Get allowlisted CIDR blocks per address
170    async fn get_allowlist(&self) -> Result<Vec<(Address, CidrBlock)>>;
171
172    /// Get current Validator emission rate setting
173    async fn get_validator_emission_rate(&self) -> Result<ValidatorEmissionRate>;
174
175    /// Get the current progress of an individual validator toward an emission
176    async fn get_validator_emission_progress(
177        &self,
178        address: Address,
179    ) -> Result<ValidatorEmissionProgress>;
180
181    /// Get the current pending create request expire time in milliseconds
182    async fn get_pending_create_request_expire_time(&self) -> Result<u64>;
183
184    /// Health check
185    async fn check_health(&self) -> Result<HealthResponse>;
186
187    /// Returns a client that behaves like `self` and applies the given timeout to every
188    /// asynchronous operation.
189    fn with_timeout(self, timeout: Duration) -> WithTimeout<Self>
190    where
191        Self: Sized,
192    {
193        WithTimeout::new(self, timeout)
194    }
195}
196
197/// Wraps a stream of `TransactionStatus` updates to simplify common operations
198#[derive(derive_more::Deref)]
199pub struct TransactionStatusStream {
200    stream: BoxStream<'static, Result<TransactionUpdate>>,
201}
202
203impl TransactionStatusStream {
204    pub(crate) async fn committed_result(mut self) -> Result<TransactionUpdate> {
205        let mut final_status = TransactionStatus::Unknown;
206        let mut id = TransactionId::default();
207        while let Some(update) = self.stream.next().await {
208            let update = update?;
209            id = update.id;
210            match update.status {
211                TransactionStatus::Invalid(_) | TransactionStatus::Committed => {
212                    final_status = update.status;
213                    break;
214                }
215                _ => continue,
216            }
217        }
218        Ok(TransactionUpdate {
219            status: final_status,
220            id,
221        })
222    }
223}
224
225impl From<Vec<Result<TransactionUpdate>>> for TransactionStatusStream {
226    fn from(u: Vec<Result<TransactionUpdate>>) -> Self {
227        Self {
228            stream: futures::stream::iter(u).boxed(),
229        }
230    }
231}
232
233impl From<Streaming<ProtoTxnUpdate>> for TransactionStatusStream {
234    fn from(updates: Streaming<ProtoTxnUpdate>) -> Self {
235        TransactionStatusStream {
236            stream: updates
237                .map(|update| {
238                    let update = update?;
239                    let id = update.id.parse()?;
240                    let status = update.status.and_then(|s| s.status).ok_or_else(|| {
241                        XandApiClientError::BadReplyError {
242                            message: format!("Update for transaction {} had no status!", id),
243                        }
244                    })?;
245                    Ok(TransactionUpdate {
246                        id,
247                        status: status.into(),
248                    })
249                })
250                .boxed(),
251        }
252    }
253}
254
255impl Stream for TransactionStatusStream {
256    type Item = Result<TransactionUpdate>;
257
258    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
259        self.stream.poll_next_unpin(cx)
260    }
261}
262
263/// Default implementation of the `XandApiClientTrait`
264pub struct XandApiClient {
265    inner: GrpcClient<JwtIntercepted<tonic::transport::Channel>>,
266}
267
268impl XandApiClient {
269    /// Returns xand-api client for given.
270    pub async fn connect(url: &Url) -> Result<Self> {
271        XandApiClient::connect_internal(url, None).await
272    }
273
274    /// Set up the client with a JWT that will be attached to all outgoing `Authorization` headers.
275    pub async fn connect_with_jwt(url: &Url, jwt: String) -> Result<Self> {
276        XandApiClient::connect_internal(url, Some(jwt)).await
277    }
278
279    /// Set up a connection to the given URL. Will negotiate TLS if the url scheme is "https".
280    async fn connect_internal(url: &Url, jwt: Option<String>) -> Result<Self, XandApiClientError> {
281        let conn = tonic::transport::Endpoint::new(url.to_string())?;
282        let channel = if url.scheme().eq(scheme::HTTPS) {
283            conn.tls_config(ClientTlsConfig::new())?
284        } else {
285            conn
286        }
287        .connect()
288        .await?;
289        let intercepted = JwtIntercepted {
290            jwt,
291            inner: channel,
292        };
293        Ok(XandApiClient {
294            inner: GrpcClient::new(intercepted),
295        })
296    }
297}
298
299#[derive(Clone, Debug)]
300struct JwtIntercepted<S> {
301    jwt: Option<String>,
302    inner: S,
303}
304
305impl<ReqBody, S> tower_service::Service<http::Request<ReqBody>> for JwtIntercepted<S>
306where
307    S: tower_service::Service<http::Request<ReqBody>>,
308{
309    type Response = S::Response;
310    type Error = S::Error;
311    type Future = S::Future;
312
313    fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
314        self.inner.poll_ready(ctx)
315    }
316
317    fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
318        let req = match &self.jwt {
319            Some(jwt) => attach_jwt(req, jwt),
320            None => req,
321        };
322        self.inner.call(req)
323    }
324}
325
326fn attach_jwt<T>(mut req: http::Request<T>, jwt: &str) -> http::Request<T> {
327    let preamble = format!("Bearer {}", jwt);
328    req.headers_mut().insert(
329        http::header::AUTHORIZATION,
330        http::HeaderValue::from_str(&preamble).unwrap(),
331    );
332    req
333}
334
335#[async_trait::async_trait]
336impl XandApiClientTrait for XandApiClient {
337    async fn submit_transaction(
338        &self,
339        issuer: Address,
340        txn: XandTransaction,
341    ) -> Result<TransactionStatusStream> {
342        let resp = self
343            .inner
344            .clone()
345            .submit_transaction(xap::UserTransaction {
346                issuer: issuer.to_string(),
347                operation: Some(txn.try_into()?),
348            })
349            .await?;
350        Ok(resp.into_inner().into())
351    }
352
353    async fn get_transaction_details(&self, id: &TransactionId) -> Result<Transaction> {
354        let resp = self
355            .inner
356            .clone()
357            .get_transaction_details(xap::TransactionDetailsRequest { id: id.to_string() })
358            .await?
359            .into_inner();
360        Ok(resp.try_into()?)
361    }
362
363    async fn get_current_block(&self) -> Result<Blockstamp> {
364        let resp = self
365            .inner
366            .clone()
367            .get_current_block(xap::GetCurrentBlockReq {})
368            .await?
369            .into_inner();
370        Ok(resp.try_into()?)
371    }
372
373    async fn get_transaction_history(
374        &self,
375        paging: Option<Paging>,
376        filter: &TransactionFilter,
377    ) -> Result<Paginated<Vec<Transaction>>> {
378        let paging = paging.unwrap_or_default();
379        let resp = self
380            .inner
381            .clone()
382            .get_transaction_history(xap::TransactionHistoryRequest {
383                addresses: filter.addresses.iter().map(ToString::to_string).collect(),
384                page_size: paging.page_size,
385                page_number: paging.page_number,
386                transaction_types: filter
387                    .types
388                    .iter()
389                    .map(|&t| xand_api_proto::TransactionType::from(t) as i32)
390                    .collect(),
391                start_time: filter.start_time.map(|t| xap::Timestamp {
392                    unix_time_millis: t.timestamp_millis(),
393                }),
394                end_time: filter.end_time.map(|t| xap::Timestamp {
395                    unix_time_millis: t.timestamp_millis(),
396                }),
397            })
398            .await?
399            .into_inner();
400        Ok(resp.try_into()?)
401    }
402
403    async fn get_balance(&self, address: &str) -> Result<Option<u128>> {
404        let resp: xap::AddressBalance = self
405            .inner
406            .clone()
407            .get_address_balance(xap::AddressBalanceRequest {
408                address: address.to_string(),
409            })
410            .await?
411            .into_inner();
412        Ok(resp.balance.map(|b| b.amount.into()))
413    }
414
415    async fn get_total_issuance(&self) -> Result<TotalIssuance> {
416        let resp: xap::TotalIssuanceResponse = self
417            .inner
418            .clone()
419            .get_total_issuance(xap::TotalIssuanceRequest {})
420            .await?
421            .into_inner();
422        Ok(resp.try_into()?)
423    }
424
425    async fn get_address_transactions(
426        &self,
427        address: &str,
428        paging: Option<Paging>,
429    ) -> Result<Paginated<Vec<Transaction>>> {
430        let paging = paging.unwrap_or_default();
431        let resp = self
432            .inner
433            .clone()
434            .get_address_transactions(xap::AddressTransactionHistoryRequest {
435                address: address.to_string(),
436                page_size: paging.page_size,
437                page_number: paging.page_number,
438            })
439            .await?
440            .into_inner();
441        Ok(resp.try_into()?)
442    }
443
444    async fn get_pending_create_requests(
445        &self,
446        paging: Option<Paging>,
447    ) -> Result<Paginated<Vec<PendingCreateRequest>>> {
448        let paging = paging.unwrap_or_default();
449        let resp = self
450            .inner
451            .clone()
452            .get_pending_create_requests(xap::PendingCreateRequestsPagination {
453                page_size: paging.page_size,
454                page_number: paging.page_number,
455            })
456            .await?;
457        Ok(resp.into_inner().try_into()?)
458    }
459
460    async fn get_pending_redeem_requests(
461        &self,
462        paging: Option<Paging>,
463    ) -> Result<Paginated<Vec<PendingRedeemRequest>>> {
464        let paging = paging.unwrap_or_default();
465        let resp = self
466            .inner
467            .clone()
468            .get_pending_redeem_requests(xap::PendingRedeemRequestsPagination {
469                page_size: paging.page_size,
470                page_number: paging.page_number,
471            })
472            .await?;
473        Ok(resp.into_inner().try_into()?)
474    }
475
476    async fn propose_action(
477        &self,
478        issuer: Address,
479        proposed_action: AdministrativeTransaction,
480    ) -> Result<TransactionStatusStream, XandApiClientError> {
481        let resp = self
482            .inner
483            .clone()
484            .propose_action(xap::SubmitProposal {
485                issuer: issuer.to_string(),
486                proposed_action: Some(proposed_action.into()),
487            })
488            .await?;
489        Ok(resp.into_inner().into())
490    }
491
492    async fn vote_on_proposal(
493        &self,
494        issuer: Address,
495        vote_proposal: VoteProposal,
496    ) -> Result<TransactionStatusStream, XandApiClientError> {
497        let resp = self
498            .inner
499            .clone()
500            .vote_on_proposal(xap::VotingTransaction {
501                issuer: issuer.to_string(),
502                vote_proposal: Some(vote_proposal.into()),
503            })
504            .await?;
505        Ok(resp.into_inner().into())
506    }
507
508    async fn get_proposal(&self, id: u32) -> Result<Proposal, XandApiClientError> {
509        let resp = self
510            .inner
511            .clone()
512            .get_proposal(xap::GetProposalReq { id })
513            .await?
514            .into_inner()
515            .try_into()?;
516        Ok(resp)
517    }
518
519    async fn get_all_proposals(&self) -> Result<Vec<Proposal>, XandApiClientError> {
520        let resp = self
521            .inner
522            .clone()
523            .get_all_proposals(xap::GetAllProposalsReq {})
524            .await?
525            .into_inner()
526            .proposals
527            .into_iter()
528            .map(|p| p.try_into())
529            .collect::<Result<Vec<_>, _>>()?;
530        Ok(resp)
531    }
532
533    async fn get_members(&self) -> Result<Vec<Address>, XandApiClientError> {
534        let members = self
535            .inner
536            .clone()
537            .get_members(xap::MembersRequest {})
538            .await?
539            .into_inner()
540            .members;
541        to_addresses(members)
542    }
543
544    async fn get_authority_keys(&self) -> Result<Vec<Address>, XandApiClientError> {
545        let keys = self
546            .inner
547            .clone()
548            .get_authority_keys(xap::AuthorityKeysRequest {})
549            .await?
550            .into_inner()
551            .authority_keys;
552        to_addresses(keys)
553    }
554
555    async fn get_trustee(&self) -> Result<Address> {
556        let trustee = self
557            .inner
558            .clone()
559            .get_trustee(xap::GetTrusteeReq {})
560            .await?
561            .into_inner()
562            .address;
563        to_address(&trustee)
564    }
565
566    async fn get_allowlist(&self) -> Result<Vec<(Address, CidrBlock)>> {
567        self.inner
568            .clone()
569            .get_allowlist(xap::AllowlistRequest {})
570            .await?
571            .into_inner()
572            .entries
573            .into_iter()
574            .map(|entry| {
575                Ok((
576                    to_address(&entry.address)?,
577                    to_cidr_block(&entry.cidr_block)?,
578                ))
579            })
580            .collect()
581    }
582
583    async fn get_limited_agent(&self) -> Result<Option<Address>, XandApiClientError> {
584        let limited_agent_address_option = self
585            .inner
586            .clone()
587            .get_limited_agent(xap::GetLimitedAgentReq {})
588            .await?
589            .into_inner()
590            .address;
591        match limited_agent_address_option {
592            Some(la) => Ok(to_address(&la.address_str).map(Some)?),
593            None => Ok(None),
594        }
595    }
596
597    async fn get_validator_emission_rate(&self) -> Result<ValidatorEmissionRate> {
598        Ok(self
599            .inner
600            .clone()
601            .get_validator_emission_rate(xap::GetValidatorEmissionRateReq {})
602            .await?
603            .into_inner()
604            .into())
605    }
606
607    async fn get_validator_emission_progress(
608        &self,
609        address: Address,
610    ) -> Result<ValidatorEmissionProgress> {
611        Ok(self
612            .inner
613            .clone()
614            .get_validator_emission_progress(xap::GetValidatorEmissionProgressReq {
615                address: address.to_string(),
616            })
617            .await?
618            .into_inner()
619            .try_into()?)
620    }
621
622    async fn get_pending_create_request_expire_time(&self) -> Result<u64> {
623        let resp: xap::PendingCreateRequestExpireTime = self
624            .inner
625            .clone()
626            .get_pending_create_request_expire_time(xap::GetPendingCreateRequestExpireTimeReq {})
627            .await?
628            .into_inner();
629        Ok(resp.into())
630    }
631
632    async fn check_health(&self) -> Result<HealthResponse> {
633        let response: xap::HealthCheckResponse = self
634            .inner
635            .clone()
636            .check_health(xap::HealthCheckRequest {})
637            .await?
638            .into_inner();
639        Ok(response.try_into()?)
640    }
641}
642
643fn to_addresses(v: Vec<String>) -> Result<Vec<Address>, XandApiClientError> {
644    let addresses = v
645        .into_iter()
646        .map(|address| to_address(&address))
647        .collect::<Result<Vec<_>, _>>()?;
648    Ok(addresses)
649}
650
651fn to_address(s: &str) -> Result<Address, XandApiClientError> {
652    #[allow(clippy::map_err_ignore)]
653    s.parse()
654        .map_err(|_| XandApiClientError::InvalidAddress { address: s.into() })
655}
656
657fn to_cidr_block(s: &str) -> Result<CidrBlock, XandApiClientError> {
658    #[allow(clippy::map_err_ignore)]
659    s.parse().map_err(|_| XandApiClientError::InvalidCidrBlock {
660        cidr_block: s.into(),
661    })
662}
663
#[cfg(test)]
mod test {
    use super::*;
    use std::str::FromStr;

    /// Manual check against an API listening on 127.0.0.1:50051; ignored in normal test runs.
    #[ignore]
    #[tokio::test]
    async fn manual_test() {
        let connection =
            GrpcClient::<tonic::transport::Channel>::connect("http://127.0.0.1:50051").await;
        let mut client = connection.unwrap();
        let request = xap::PendingCreateRequestsPagination {
            page_size: 100,
            page_number: Default::default(),
        };
        let res = client.get_pending_create_requests(request).await.unwrap();
        dbg!(res);
    }

    // Is never executed. Ignore clippy.
    // Exists only so compilation fails if the future returned by `connect` stops being `Send`.
    #[allow(unconditional_recursion, dead_code)]
    fn enforce_send<T: std::marker::Send>(_: T) {
        let url = Url::from_str("hi").unwrap();
        enforce_send(XandApiClient::connect(&url))
    }
}