1pub use gemachain_banks_interface::{BanksClient as TarpcClient, TransactionStatus};
9use {
10 borsh::BorshDeserialize,
11 futures::{future::join_all, Future, FutureExt},
12 gemachain_banks_interface::{BanksRequest, BanksResponse},
13 gemachain_program::{
14 clock::Slot, fee_calculator::FeeCalculator, hash::Hash, program_pack::Pack, pubkey::Pubkey,
15 rent::Rent, sysvar::Sysvar,
16 },
17 gemachain_sdk::{
18 account::{from_account, Account},
19 commitment_config::CommitmentLevel,
20 signature::Signature,
21 transaction::{self, Transaction},
22 transport,
23 },
24 std::io::{self, Error, ErrorKind},
25 tarpc::{
26 client::{self, NewClient, RequestDispatch},
27 context::{self, Context},
28 serde_transport::tcp,
29 ClientMessage, Response, Transport,
30 },
31 tokio::{net::ToSocketAddrs, time::Duration},
32 tokio_serde::formats::Bincode,
33};
34
/// Empty extension trait: it declares no methods, and nothing in this file
/// implements or references it.
///
/// NOTE(review): presumably retained so that existing `use` statements in
/// downstream code keep compiling — confirm before removing.
pub trait BanksClientExt {}
37
/// Cloneable handle to a banks service, wrapping the tarpc-generated client.
///
/// Every method on this type delegates, directly or indirectly, to `inner`.
#[derive(Clone)]
pub struct BanksClient {
    // The tarpc-generated client that actually issues the requests.
    inner: TarpcClient,
}
42
43impl BanksClient {
44 #[allow(clippy::new_ret_no_self)]
45 pub fn new<C>(
46 config: client::Config,
47 transport: C,
48 ) -> NewClient<TarpcClient, RequestDispatch<BanksRequest, BanksResponse, C>>
49 where
50 C: Transport<ClientMessage<BanksRequest>, Response<BanksResponse>>,
51 {
52 TarpcClient::new(config, transport)
53 }
54
55 pub fn send_transaction_with_context(
56 &mut self,
57 ctx: Context,
58 transaction: Transaction,
59 ) -> impl Future<Output = io::Result<()>> + '_ {
60 self.inner.send_transaction_with_context(ctx, transaction)
61 }
62
63 pub fn get_fees_with_commitment_and_context(
64 &mut self,
65 ctx: Context,
66 commitment: CommitmentLevel,
67 ) -> impl Future<Output = io::Result<(FeeCalculator, Hash, u64)>> + '_ {
68 self.inner
69 .get_fees_with_commitment_and_context(ctx, commitment)
70 }
71
72 pub fn get_transaction_status_with_context(
73 &mut self,
74 ctx: Context,
75 signature: Signature,
76 ) -> impl Future<Output = io::Result<Option<TransactionStatus>>> + '_ {
77 self.inner
78 .get_transaction_status_with_context(ctx, signature)
79 }
80
81 pub fn get_slot_with_context(
82 &mut self,
83 ctx: Context,
84 commitment: CommitmentLevel,
85 ) -> impl Future<Output = io::Result<Slot>> + '_ {
86 self.inner.get_slot_with_context(ctx, commitment)
87 }
88
89 pub fn get_block_height_with_context(
90 &mut self,
91 ctx: Context,
92 commitment: CommitmentLevel,
93 ) -> impl Future<Output = io::Result<Slot>> + '_ {
94 self.inner.get_block_height_with_context(ctx, commitment)
95 }
96
97 pub fn process_transaction_with_commitment_and_context(
98 &mut self,
99 ctx: Context,
100 transaction: Transaction,
101 commitment: CommitmentLevel,
102 ) -> impl Future<Output = io::Result<Option<transaction::Result<()>>>> + '_ {
103 self.inner
104 .process_transaction_with_commitment_and_context(ctx, transaction, commitment)
105 }
106
107 pub fn get_account_with_commitment_and_context(
108 &mut self,
109 ctx: Context,
110 address: Pubkey,
111 commitment: CommitmentLevel,
112 ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
113 self.inner
114 .get_account_with_commitment_and_context(ctx, address, commitment)
115 }
116
117 pub fn send_transaction(
121 &mut self,
122 transaction: Transaction,
123 ) -> impl Future<Output = io::Result<()>> + '_ {
124 self.send_transaction_with_context(context::current(), transaction)
125 }
126
127 pub fn get_fees(
131 &mut self,
132 ) -> impl Future<Output = io::Result<(FeeCalculator, Hash, u64)>> + '_ {
133 self.get_fees_with_commitment_and_context(context::current(), CommitmentLevel::default())
134 }
135
136 pub fn get_sysvar<T: Sysvar>(&mut self) -> impl Future<Output = io::Result<T>> + '_ {
138 self.get_account(T::id()).map(|result| {
139 let sysvar = result?
140 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Sysvar not present"))?;
141 from_account::<T, _>(&sysvar)
142 .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Failed to deserialize sysvar"))
143 })
144 }
145
146 pub fn get_rent(&mut self) -> impl Future<Output = io::Result<Rent>> + '_ {
148 self.get_sysvar::<Rent>()
149 }
150
151 pub fn get_recent_blockhash(&mut self) -> impl Future<Output = io::Result<Hash>> + '_ {
155 self.get_fees().map(|result| Ok(result?.1))
156 }
157
158 pub fn process_transaction_with_commitment(
161 &mut self,
162 transaction: Transaction,
163 commitment: CommitmentLevel,
164 ) -> impl Future<Output = transport::Result<()>> + '_ {
165 let mut ctx = context::current();
166 ctx.deadline += Duration::from_secs(50);
167 self.process_transaction_with_commitment_and_context(ctx, transaction, commitment)
168 .map(|result| match result? {
169 None => {
170 Err(Error::new(ErrorKind::TimedOut, "invalid blockhash or fee-payer").into())
171 }
172 Some(transaction_result) => Ok(transaction_result?),
173 })
174 }
175
176 pub fn process_transaction(
178 &mut self,
179 transaction: Transaction,
180 ) -> impl Future<Output = transport::Result<()>> + '_ {
181 self.process_transaction_with_commitment(transaction, CommitmentLevel::default())
182 }
183
184 pub async fn process_transactions_with_commitment(
185 &mut self,
186 transactions: Vec<Transaction>,
187 commitment: CommitmentLevel,
188 ) -> transport::Result<()> {
189 let mut clients: Vec<_> = transactions.iter().map(|_| self.clone()).collect();
190 let futures = clients
191 .iter_mut()
192 .zip(transactions)
193 .map(|(client, transaction)| {
194 client.process_transaction_with_commitment(transaction, commitment)
195 });
196 let statuses = join_all(futures).await;
197 statuses.into_iter().collect() }
199
200 pub fn process_transactions(
202 &mut self,
203 transactions: Vec<Transaction>,
204 ) -> impl Future<Output = transport::Result<()>> + '_ {
205 self.process_transactions_with_commitment(transactions, CommitmentLevel::default())
206 }
207
208 pub fn get_root_slot(&mut self) -> impl Future<Output = io::Result<Slot>> + '_ {
211 self.get_slot_with_context(context::current(), CommitmentLevel::default())
212 }
213
214 pub fn get_root_block_height(&mut self) -> impl Future<Output = io::Result<Slot>> + '_ {
217 self.get_block_height_with_context(context::current(), CommitmentLevel::default())
218 }
219
220 pub fn get_account_with_commitment(
223 &mut self,
224 address: Pubkey,
225 commitment: CommitmentLevel,
226 ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
227 self.get_account_with_commitment_and_context(context::current(), address, commitment)
228 }
229
230 pub fn get_account(
233 &mut self,
234 address: Pubkey,
235 ) -> impl Future<Output = io::Result<Option<Account>>> + '_ {
236 self.get_account_with_commitment(address, CommitmentLevel::default())
237 }
238
239 pub fn get_packed_account_data<T: Pack>(
242 &mut self,
243 address: Pubkey,
244 ) -> impl Future<Output = io::Result<T>> + '_ {
245 self.get_account(address).map(|result| {
246 let account =
247 result?.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Account not found"))?;
248 T::unpack_from_slice(&account.data)
249 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to deserialize account"))
250 })
251 }
252
253 pub fn get_account_data_with_borsh<T: BorshDeserialize>(
256 &mut self,
257 address: Pubkey,
258 ) -> impl Future<Output = io::Result<T>> + '_ {
259 self.get_account(address).map(|result| {
260 let account =
261 result?.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "account not found"))?;
262 T::try_from_slice(&account.data)
263 })
264 }
265
266 pub fn get_balance_with_commitment(
269 &mut self,
270 address: Pubkey,
271 commitment: CommitmentLevel,
272 ) -> impl Future<Output = io::Result<u64>> + '_ {
273 self.get_account_with_commitment_and_context(context::current(), address, commitment)
274 .map(|result| Ok(result?.map(|x| x.carats).unwrap_or(0)))
275 }
276
277 pub fn get_balance(&mut self, address: Pubkey) -> impl Future<Output = io::Result<u64>> + '_ {
280 self.get_balance_with_commitment(address, CommitmentLevel::default())
281 }
282
283 pub fn get_transaction_status(
289 &mut self,
290 signature: Signature,
291 ) -> impl Future<Output = io::Result<Option<TransactionStatus>>> + '_ {
292 self.get_transaction_status_with_context(context::current(), signature)
293 }
294
295 pub async fn get_transaction_statuses(
297 &mut self,
298 signatures: Vec<Signature>,
299 ) -> io::Result<Vec<Option<TransactionStatus>>> {
300 let mut clients_and_signatures: Vec<_> = signatures
302 .into_iter()
303 .map(|signature| (self.clone(), signature))
304 .collect();
305
306 let futs = clients_and_signatures
307 .iter_mut()
308 .map(|(client, signature)| client.get_transaction_status(*signature));
309
310 let statuses = join_all(futs).await;
311
312 statuses.into_iter().collect()
314 }
315}
316
317pub async fn start_client<C>(transport: C) -> io::Result<BanksClient>
318where
319 C: Transport<ClientMessage<BanksRequest>, Response<BanksResponse>> + Send + 'static,
320{
321 Ok(BanksClient {
322 inner: TarpcClient::new(client::Config::default(), transport).spawn(),
323 })
324}
325
326pub async fn start_tcp_client<T: ToSocketAddrs>(addr: T) -> io::Result<BanksClient> {
327 let transport = tcp::connect(addr, Bincode::default).await?;
328 Ok(BanksClient {
329 inner: TarpcClient::new(client::Config::default(), transport).spawn(),
330 })
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use gemachain_banks_server::banks_server::start_local_server;
    use gemachain_runtime::{
        bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache,
        genesis_utils::create_genesis_config,
    };
    use gemachain_sdk::{message::Message, signature::Signer, system_instruction};
    use std::sync::{Arc, RwLock};
    use tarpc::transport;
    use tokio::{runtime::Runtime, time::sleep};

    /// Constructing the raw tarpc client over an in-memory channel must not panic.
    #[test]
    fn test_banks_client_new() {
        let (client_transport, _server_transport) = transport::channel::unbounded();
        BanksClient::new(client::Config::default(), client_transport);
    }

    /// Transfers 1 carat from the mint to a fresh key with
    /// `process_transaction`, then checks the recipient's balance.
    #[test]
    fn test_banks_server_transfer_via_server() -> io::Result<()> {
        let genesis = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis.genesis_config);
        let bank_slot = bank.slot();
        let block_commitment_cache = Arc::new(RwLock::new(
            BlockCommitmentCache::new_for_tests_with_slots(bank_slot, bank_slot),
        ));
        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));

        // Build the unsigned transfer message up front; it is signed once a
        // blockhash has been fetched from the server.
        let recipient = gemachain_sdk::pubkey::new_rand();
        let payer = genesis.mint_keypair.pubkey();
        let transfer_ix = system_instruction::transfer(&payer, &recipient, 1);
        let message = Message::new(&[transfer_ix], Some(&payer));

        Runtime::new()?.block_on(async {
            let client_transport = start_local_server(bank_forks, block_commitment_cache).await;
            let mut banks_client = start_client(client_transport).await?;

            let blockhash = banks_client.get_recent_blockhash().await?;
            let signed_tx = Transaction::new(&[&genesis.mint_keypair], message, blockhash);
            banks_client.process_transaction(signed_tx).await.unwrap();
            assert_eq!(banks_client.get_balance(recipient).await?, 1);
            Ok(())
        })
    }

    /// Transfers 1 carat with `send_transaction`, then polls the transaction
    /// status from the client side until it appears or the block height
    /// passes the value returned alongside the blockhash.
    #[test]
    fn test_banks_server_transfer_via_client() -> io::Result<()> {
        let genesis = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis.genesis_config);
        let bank_slot = bank.slot();
        let block_commitment_cache = Arc::new(RwLock::new(
            BlockCommitmentCache::new_for_tests_with_slots(bank_slot, bank_slot),
        ));
        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));

        let payer = genesis.mint_keypair.pubkey();
        let recipient = gemachain_sdk::pubkey::new_rand();
        let transfer_ix = system_instruction::transfer(&payer, &recipient, 1);
        let message = Message::new(&[transfer_ix], Some(&payer));

        Runtime::new()?.block_on(async {
            let client_transport = start_local_server(bank_forks, block_commitment_cache).await;
            let mut banks_client = start_client(client_transport).await?;
            let (_, blockhash, last_valid_block_height) = banks_client.get_fees().await?;
            let signed_tx = Transaction::new(&[&genesis.mint_keypair], message, blockhash);
            let signature = signed_tx.signatures[0];
            banks_client.send_transaction(signed_tx).await?;

            // Poll every 100 ms; give up once the block height has moved past
            // `last_valid_block_height` without a status showing up.
            let status = loop {
                let polled = banks_client.get_transaction_status(signature).await?;
                if polled.is_some() {
                    break polled;
                }
                let current_height = banks_client.get_root_block_height().await?;
                if current_height > last_valid_block_height {
                    break polled;
                }
                sleep(Duration::from_millis(100)).await;
            };
            assert!(status.unwrap().err.is_none());
            assert_eq!(banks_client.get_balance(recipient).await?, 1);
            Ok(())
        })
    }
}