gemachain_banks_client/
lib.rs

//! A client for the ledger state, from the perspective of an arbitrary validator.
//!
//! Use `start_tcp_client()` to create a `BanksClient` and then call its methods
//! directly. Importing `BanksClientExt` is no longer needed to access them; that
//! trait is kept only for backward compatibility. Additional `*_with_context`
//! methods are also available, but they are only minimally documented, may change
//! over time, and are generally more cumbersome to use.
7
8pub use gemachain_banks_interface::{BanksClient as TarpcClient, TransactionStatus};
9use {
10    borsh::BorshDeserialize,
11    futures::{future::join_all, Future, FutureExt},
12    gemachain_banks_interface::{BanksRequest, BanksResponse},
13    gemachain_program::{
14        clock::Slot, fee_calculator::FeeCalculator, hash::Hash, program_pack::Pack, pubkey::Pubkey,
15        rent::Rent, sysvar::Sysvar,
16    },
17    gemachain_sdk::{
18        account::{from_account, Account},
19        commitment_config::CommitmentLevel,
20        signature::Signature,
21        transaction::{self, Transaction},
22        transport,
23    },
24    std::io::{self, Error, ErrorKind},
25    tarpc::{
26        client::{self, NewClient, RequestDispatch},
27        context::{self, Context},
28        serde_transport::tcp,
29        ClientMessage, Response, Transport,
30    },
31    tokio::{net::ToSocketAddrs, time::Duration},
32    tokio_serde::formats::Bincode,
33};
34
/// Empty trait that declares no items.
///
/// It exists only for backward compatibility, so that code which still imports
/// `BanksClientExt` keeps compiling; the client methods in this file are inherent
/// methods of `BanksClient`.
pub trait BanksClientExt {}
37
/// Cloneable handle around the tarpc-generated banks client.
///
/// Every method on this type delegates to `inner`.
#[derive(Clone)]
pub struct BanksClient {
    /// The wrapped tarpc client (`gemachain_banks_interface::BanksClient`).
    inner: TarpcClient,
}
42
43impl BanksClient {
    /// Build a tarpc client over `transport` using `config`.
    ///
    /// Note that this does not return a `BanksClient`: it returns whatever
    /// `TarpcClient::new` produces, i.e. a `NewClient` pairing the tarpc client
    /// with its request dispatcher. See `start_client` for a function that
    /// yields a ready-to-use `BanksClient`.
    // The lint is silenced because `new` deliberately returns the tarpc
    // `NewClient` instead of `Self`.
    #[allow(clippy::new_ret_no_self)]
    pub fn new<C>(
        config: client::Config,
        transport: C,
    ) -> NewClient<TarpcClient, RequestDispatch<BanksRequest, BanksResponse, C>>
    where
        C: Transport<ClientMessage<BanksRequest>, Response<BanksResponse>>,
    {
        TarpcClient::new(config, transport)
    }
54
55    pub fn send_transaction_with_context(
56        &mut self,
57        ctx: Context,
58        transaction: Transaction,
59    ) -> impl Future<Output = io::Result<()>> + '_ {
60        self.inner.send_transaction_with_context(ctx, transaction)
61    }
62
63    pub fn get_fees_with_commitment_and_context(
64        &mut self,
65        ctx: Context,
66        commitment: CommitmentLevel,
67    ) -> impl Future<Output = io::Result<(FeeCalculator, Hash, u64)>> + '_ {
68        self.inner
69            .get_fees_with_commitment_and_context(ctx, commitment)
70    }
71
72    pub fn get_transaction_status_with_context(
73        &mut self,
74        ctx: Context,
75        signature: Signature,
76    ) -> impl Future<Output = io::Result<Option<TransactionStatus>>> + '_ {
77        self.inner
78            .get_transaction_status_with_context(ctx, signature)
79    }
80
81    pub fn get_slot_with_context(
82        &mut self,
83        ctx: Context,
84        commitment: CommitmentLevel,
85    ) -> impl Future<Output = io::Result<Slot>> + '_ {
86        self.inner.get_slot_with_context(ctx, commitment)
87    }
88
89    pub fn get_block_height_with_context(
90        &mut self,
91        ctx: Context,
92        commitment: CommitmentLevel,
93    ) -> impl Future<Output = io::Result<Slot>> + '_ {
94        self.inner.get_block_height_with_context(ctx, commitment)
95    }
96
97    pub fn process_transaction_with_commitment_and_context(
98        &mut self,
99        ctx: Context,
100        transaction: Transaction,
101        commitment: CommitmentLevel,
102    ) -> impl Future<Output = io::Result<Option<transaction::Result<()>>>> + '_ {
103        self.inner
104            .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
105    }
106
107    pub fn get_account_with_commitment_and_context(
108        &mut self,
109        ctx: Context,
110        address: Pubkey,
111        commitment: CommitmentLevel,
112    ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
113        self.inner
114            .get_account_with_commitment_and_context(ctx, address, commitment)
115    }
116
117    /// Send a transaction and return immediately. The server will resend the
118    /// transaction until either it is accepted by the cluster or the transaction's
119    /// blockhash expires.
120    pub fn send_transaction(
121        &mut self,
122        transaction: Transaction,
123    ) -> impl Future<Output = io::Result<()>> + '_ {
124        self.send_transaction_with_context(context::current(), transaction)
125    }
126
127    /// Return the fee parameters associated with a recent, rooted blockhash. The cluster
128    /// will use the transaction's blockhash to look up these same fee parameters and
129    /// use them to calculate the transaction fee.
130    pub fn get_fees(
131        &mut self,
132    ) -> impl Future<Output = io::Result<(FeeCalculator, Hash, u64)>> + '_ {
133        self.get_fees_with_commitment_and_context(context::current(), CommitmentLevel::default())
134    }
135
136    /// Return the cluster Sysvar
137    pub fn get_sysvar<T: Sysvar>(&mut self) -> impl Future<Output = io::Result<T>> + '_ {
138        self.get_account(T::id()).map(|result| {
139            let sysvar = result?
140                .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Sysvar not present"))?;
141            from_account::<T, _>(&sysvar)
142                .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Failed to deserialize sysvar"))
143        })
144    }
145
146    /// Return the cluster rent
147    pub fn get_rent(&mut self) -> impl Future<Output = io::Result<Rent>> + '_ {
148        self.get_sysvar::<Rent>()
149    }
150
151    /// Return a recent, rooted blockhash from the server. The cluster will only accept
152    /// transactions with a blockhash that has not yet expired. Use the `get_fees`
153    /// method to get both a blockhash and the blockhash's last valid slot.
154    pub fn get_recent_blockhash(&mut self) -> impl Future<Output = io::Result<Hash>> + '_ {
155        self.get_fees().map(|result| Ok(result?.1))
156    }
157
158    /// Send a transaction and return after the transaction has been rejected or
159    /// reached the given level of commitment.
160    pub fn process_transaction_with_commitment(
161        &mut self,
162        transaction: Transaction,
163        commitment: CommitmentLevel,
164    ) -> impl Future<Output = transport::Result<()>> + '_ {
165        let mut ctx = context::current();
166        ctx.deadline += Duration::from_secs(50);
167        self.process_transaction_with_commitment_and_context(ctx, transaction, commitment)
168            .map(|result| match result? {
169                None => {
170                    Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into())
171                }
172                Some(transaction_result) => Ok(transaction_result?),
173            })
174    }
175
176    /// Send a transaction and return until the transaction has been finalized or rejected.
177    pub fn process_transaction(
178        &mut self,
179        transaction: Transaction,
180    ) -> impl Future<Output = transport::Result<()>> + '_ {
181        self.process_transaction_with_commitment(transaction, CommitmentLevel::default())
182    }
183
184    pub async fn process_transactions_with_commitment(
185        &mut self,
186        transactions: Vec<Transaction>,
187        commitment: CommitmentLevel,
188    ) -> transport::Result<()> {
189        let mut clients: Vec<_> = transactions.iter().map(|_| self.clone()).collect();
190        let futures = clients
191            .iter_mut()
192            .zip(transactions)
193            .map(|(client, transaction)| {
194                client.process_transaction_with_commitment(transaction, commitment)
195            });
196        let statuses = join_all(futures).await;
197        statuses.into_iter().collect() // Convert Vec<Result<_, _>> to Result<Vec<_>>
198    }
199
200    /// Send transactions and return until the transaction has been finalized or rejected.
201    pub fn process_transactions(
202        &mut self,
203        transactions: Vec<Transaction>,
204    ) -> impl Future<Output = transport::Result<()>> + '_ {
205        self.process_transactions_with_commitment(transactions, CommitmentLevel::default())
206    }
207
208    /// Return the most recent rooted slot. All transactions at or below this slot
209    /// are said to be finalized. The cluster will not fork to a higher slot.
210    pub fn get_root_slot(&mut self) -> impl Future<Output = io::Result<Slot>> + '_ {
211        self.get_slot_with_context(context::current(), CommitmentLevel::default())
212    }
213
214    /// Return the most recent rooted block height. All transactions at or below this height
215    /// are said to be finalized. The cluster will not fork to a higher block height.
216    pub fn get_root_block_height(&mut self) -> impl Future<Output = io::Result<Slot>> + '_ {
217        self.get_block_height_with_context(context::current(), CommitmentLevel::default())
218    }
219
220    /// Return the account at the given address at the slot corresponding to the given
221    /// commitment level. If the account is not found, None is returned.
222    pub fn get_account_with_commitment(
223        &mut self,
224        address: Pubkey,
225        commitment: CommitmentLevel,
226    ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
227        self.get_account_with_commitment_and_context(context::current(), address, commitment)
228    }
229
230    /// Return the account at the given address at the time of the most recent root slot.
231    /// If the account is not found, None is returned.
232    pub fn get_account(
233        &mut self,
234        address: Pubkey,
235    ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
236        self.get_account_with_commitment(address, CommitmentLevel::default())
237    }
238
239    /// Return the unpacked account data at the given address
240    /// If the account is not found, an error is returned
241    pub fn get_packed_account_data<T: Pack>(
242        &mut self,
243        address: Pubkey,
244    ) -> impl Future<Output = io::Result<T>> + '_ {
245        self.get_account(address).map(|result| {
246            let account =
247                result?.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Account not found"))?;
248            T::unpack_from_slice(&account.data)
249                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to deserialize account"))
250        })
251    }
252
253    /// Return the unpacked account data at the given address
254    /// If the account is not found, an error is returned
255    pub fn get_account_data_with_borsh<T: BorshDeserialize>(
256        &mut self,
257        address: Pubkey,
258    ) -> impl Future<Output = io::Result<T>> + '_ {
259        self.get_account(address).map(|result| {
260            let account =
261                result?.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "account not found"))?;
262            T::try_from_slice(&account.data)
263        })
264    }
265
266    /// Return the balance in carats of an account at the given address at the slot
267    /// corresponding to the given commitment level.
268    pub fn get_balance_with_commitment(
269        &mut self,
270        address: Pubkey,
271        commitment: CommitmentLevel,
272    ) -> impl Future<Output = io::Result<u64>> + '_ {
273        self.get_account_with_commitment_and_context(context::current(), address, commitment)
274            .map(|result| Ok(result?.map(|x| x.carats).unwrap_or(0)))
275    }
276
277    /// Return the balance in carats of an account at the given address at the time
278    /// of the most recent root slot.
279    pub fn get_balance(&mut self, address: Pubkey) -> impl Future<Output = io::Result<u64>> + '_ {
280        self.get_balance_with_commitment(address, CommitmentLevel::default())
281    }
282
283    /// Return the status of a transaction with a signature matching the transaction's first
284    /// signature. Return None if the transaction is not found, which may be because the
285    /// blockhash was expired or the fee-paying account had insufficient funds to pay the
286    /// transaction fee. Note that servers rarely store the full transaction history. This
287    /// method may return None if the transaction status has been discarded.
288    pub fn get_transaction_status(
289        &mut self,
290        signature: Signature,
291    ) -> impl Future<Output = io::Result<Option<TransactionStatus>>> + '_ {
292        self.get_transaction_status_with_context(context::current(), signature)
293    }
294
295    /// Same as get_transaction_status, but for multiple transactions.
296    pub async fn get_transaction_statuses(
297        &mut self,
298        signatures: Vec<Signature>,
299    ) -> io::Result<Vec<Option<TransactionStatus>>> {
300        // tarpc futures oddly hold a mutable reference back to the client so clone the client upfront
301        let mut clients_and_signatures: Vec<_> = signatures
302            .into_iter()
303            .map(|signature| (self.clone(), signature))
304            .collect();
305
306        let futs = clients_and_signatures
307            .iter_mut()
308            .map(|(client, signature)| client.get_transaction_status(*signature));
309
310        let statuses = join_all(futs).await;
311
312        // Convert Vec<Result<_, _>> to Result<Vec<_>>
313        statuses.into_iter().collect()
314    }
315}
316
317pub async fn start_client<C>(transport: C) -> io::Result<BanksClient>
318where
319    C: Transport<ClientMessage<BanksRequest>, Response<BanksResponse>> + Send + 'static,
320{
321    Ok(BanksClient {
322        inner: TarpcClient::new(client::Config::default(), transport).spawn(),
323    })
324}
325
326pub async fn start_tcp_client<T: ToSocketAddrs>(addr: T) -> io::Result<BanksClient> {
327    let transport = tcp::connect(addr, Bincode::default).await?;
328    Ok(BanksClient {
329        inner: TarpcClient::new(client::Config::default(), transport).spawn(),
330    })
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use gemachain_banks_server::banks_server::start_local_server;
    use gemachain_runtime::{
        bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache,
        genesis_utils::create_genesis_config,
    };
    use gemachain_sdk::{message::Message, signature::Signer, system_instruction};
    use std::sync::{Arc, RwLock};
    use tarpc::transport;
    use tokio::{runtime::Runtime, time::sleep};

    /// Building a client over an in-memory channel must not panic.
    #[test]
    fn test_banks_client_new() {
        let (client_side, _server_side) = transport::channel::unbounded();
        BanksClient::new(client::Config::default(), client_side);
    }

    /// Transfer one carat and let the server wait for the result.
    #[test]
    fn test_banks_server_transfer_via_server() -> io::Result<()> {
        // This demonstrates the preferred way to drive BanksServer: build the
        // runtime explicitly (no globals via tokio macros) and call
        // `block_on()` exactly once to run all of the async code.

        let genesis = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis.genesis_config);
        let root_slot = bank.slot();
        let commitment_cache =
            BlockCommitmentCache::new_for_tests_with_slots(root_slot, root_slot);
        let block_commitment_cache = Arc::new(RwLock::new(commitment_cache));
        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));

        let recipient = gemachain_sdk::pubkey::new_rand();
        let payer = genesis.mint_keypair.pubkey();
        let transfer = system_instruction::transfer(&payer, &recipient, 1);
        let message = Message::new(&[transfer], Some(&payer));

        let runtime = Runtime::new()?;
        runtime.block_on(async {
            let channel = start_local_server(bank_forks, block_commitment_cache).await;
            let mut client = start_client(channel).await?;

            let blockhash = client.get_recent_blockhash().await?;
            let transaction = Transaction::new(&[&genesis.mint_keypair], message, blockhash);
            client.process_transaction(transaction).await.unwrap();
            assert_eq!(client.get_balance(recipient).await?, 1);
            Ok(())
        })
    }

    /// Transfer one carat, but poll for the result from the client side.
    #[test]
    fn test_banks_server_transfer_via_client() -> io::Result<()> {
        // A caller may not want to keep the connection open until the
        // transaction is processed (or its blockhash expires). This test checks
        // that the same server-side functionality is reachable from the client.

        let genesis = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis.genesis_config);
        let root_slot = bank.slot();
        let commitment_cache =
            BlockCommitmentCache::new_for_tests_with_slots(root_slot, root_slot);
        let block_commitment_cache = Arc::new(RwLock::new(commitment_cache));
        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));

        let payer = genesis.mint_keypair.pubkey();
        let recipient = gemachain_sdk::pubkey::new_rand();
        let transfer = system_instruction::transfer(&payer, &recipient, 1);
        let message = Message::new(&[transfer], Some(&payer));

        let runtime = Runtime::new()?;
        runtime.block_on(async {
            let channel = start_local_server(bank_forks, block_commitment_cache).await;
            let mut client = start_client(channel).await?;
            let (_, blockhash, last_valid_block_height) = client.get_fees().await?;
            let transaction = Transaction::new(&[&genesis.mint_keypair], message, blockhash);
            let signature = transaction.signatures[0];
            client.send_transaction(transaction).await?;

            // Poll until a status shows up, giving up once the root block height
            // has moved past the blockhash's last valid block height.
            let status = loop {
                let status = client.get_transaction_status(signature).await?;
                if status.is_some() {
                    break status;
                }
                let root_block_height = client.get_root_block_height().await?;
                if root_block_height > last_valid_block_height {
                    break status;
                }
                sleep(Duration::from_millis(100)).await;
            };
            assert!(status.unwrap().err.is_none());
            assert_eq!(client.get_balance(recipient).await?, 1);
            Ok(())
        })
    }
}