1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
use std::{future::Future, time::Duration};
use futures::{
stream::{self, BufferUnordered},
StreamExt,
};
use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig};
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
};
use solana_program::pubkey::Pubkey;
use solana_sdk::{account::Account, commitment_config::CommitmentConfig};
use crate::{try_idl_address, ChainsawResult, IdlProvider};
use super::types::*;
/// Timeout handed to the RPC client that `AccountClient::try_from` constructs
/// (180 seconds).
const TIMEOUT: Duration = Duration::from_secs(180);
/// Configuration for an [`AccountClient`].
///
/// The [`Default`] value uses the default [`CommitmentConfig`].
#[derive(Debug, Clone, Default)]
pub struct AccountClientConfig {
    /// Commitment level requested from the RPC client when fetching accounts.
    commitment: CommitmentConfig,
}
/// Client that fetches the accounts owned by a single program via RPC.
///
/// Construct it with `AccountClient::try_from`.
pub struct AccountClient {
/// Non-blocking RPC client through which all requests are made.
client: RpcClient,
/// The program whose accounts are fetched.
program_id: Pubkey,
/// Address obtained from `try_idl_address` for the Anchor IDL provider.
anchor_idl_address: Pubkey,
/// Address obtained from `try_idl_address` for the Shank IDL provider.
shank_idl_address: Pubkey,
/// Client configuration; its commitment level is passed along by
/// `fetch_account`.
config: AccountClientConfig,
}
impl AccountClient {
    /// Creates a new account client.
    ///
    /// - `cluster` the cluster to fetch accounts from
    /// - `program_id` the program id to fetch accounts for
    /// - `config` specifies the commitment level to use when fetching accounts
    ///
    /// # Errors
    ///
    /// Returns an error if `try_idl_address` fails for either the Anchor or
    /// the Shank IDL provider.
    pub fn try_from(
        cluster: Cluster,
        program_id: Pubkey,
        config: AccountClientConfig,
    ) -> ChainsawResult<Self> {
        let client = RpcClient::new_with_timeout(
            cluster.endpoint().to_string(),
            TIMEOUT,
        );
        let anchor_idl_address =
            try_idl_address(&IdlProvider::Anchor, &program_id)?;
        let shank_idl_address =
            try_idl_address(&IdlProvider::Shank, &program_id)?;

        Ok(Self {
            client,
            program_id,
            anchor_idl_address,
            shank_idl_address,
            config,
        })
    }

    /// Fetches all accounts for the program excluding data.
    /// This should be used to quickly get all addresses of accounts for a
    /// certain program.
    ///
    /// The request uses the commitment level of the client configuration,
    /// the same one that [`Self::fetch_account`] uses.
    ///
    /// # Errors
    ///
    /// Returns an error if the RPC request fails.
    pub async fn all_account_addresses(&self) -> ChainsawResult<Vec<Pubkey>> {
        let empty_acc_config = RpcAccountInfoConfig {
            // A zero-length slice keeps the account data out of the response.
            data_slice: Some(UiDataSliceConfig {
                offset: 0,
                length: 0,
            }),
            // base64 encoding needed for accounts > 128 bytes
            encoding: Some(UiAccountEncoding::Base64),
            // List addresses at the same commitment level at which the
            // accounts are fetched afterwards.
            commitment: Some(self.config.commitment),
            ..Default::default()
        };

        // NOTE(thlorenz): here we could include a memcmp to only fetch accounts that start with
        // one of the discriminators known for the specified program.
        // While this would require more work on the validator (compare) it would exclude accounts
        // that we cannot parse.
        // Thus if we encounter lots of invalid accounts, i.e. 1 byte long for auction house we
        // should think about implementing this option.
        let addresses: Vec<Pubkey> = self
            .client
            .get_program_accounts_with_config(
                &self.program_id,
                RpcProgramAccountsConfig {
                    filters: None,
                    account_config: empty_acc_config,
                    ..Default::default()
                },
            )
            .await?
            .into_iter()
            .map(|(pubkey, _)| pubkey)
            .collect();

        Ok(addresses)
    }

    /// Fetches the account including data from the RPC client for the provided cluster.
    ///
    /// - `pubkey` the address of the account to fetch
    ///
    /// Resolves to whatever value the RPC response carries, which is `None`
    /// when the response holds no account.
    ///
    /// # Errors
    ///
    /// Returns an error if the RPC request fails.
    pub async fn fetch_account(
        &self,
        pubkey: &Pubkey,
    ) -> ChainsawResult<Option<Account>> {
        let response = self
            .client
            .get_account_with_commitment(pubkey, self.config.commitment)
            .await?;
        Ok(response.value)
    }

    /// Creates an iterator of [`Future`]s that fetches accounts from the RPC client.
    ///
    /// 1. Fetches all account addresses for the program
    /// 2. For each address it creates a future that will fetch the account when awaited
    /// 3. Returns an iterator of those futures
    ///
    /// Each future resolves to the address paired with the fetched account;
    /// the account is `None` if it was not returned or if fetching it failed.
    ///
    /// - `include_idl_accounts` whether to include accounts storing the program IDL
    ///
    /// # Errors
    ///
    /// Returns an error if the list of account addresses cannot be fetched.
    pub async fn iter_accounts(
        &self,
        include_idl_accounts: bool,
    ) -> ChainsawResult<
        impl Iterator<Item = impl Future<Output = (Pubkey, Option<Account>)> + '_>,
    > {
        let addresses: Vec<Pubkey> = self
            .all_account_addresses()
            .await?
            .into_iter()
            .filter(|address| {
                include_idl_accounts || !self.is_idl_account(address)
            })
            .collect();

        Ok(self.fetch_accounts_iterator(addresses))
    }

    /// Same as [`Self::iter_accounts`] but returns a stream instead of an iterator which
    /// will invoke the futures concurrently.
    /// This should be preferred over [`Self::iter_accounts`] if you want to fetch a lot of
    /// accounts in order to mitigate latency of the RPC client.
    ///
    /// Items are yielded in completion order, not in address order.
    ///
    /// # Example
    ///
    /// ```ignore
    /// account_client
    ///     .iter_accounts_with_concurrency(false, 50)
    ///     .await
    ///     .unwrap()
    ///     .for_each(|(address, account)| {
    ///         process_account(
    ///             address,
    ///             account,
    ///             &program_id,
    ///         )
    ///     })
    ///     .await;
    /// ```
    ///
    /// - `include_idl_accounts` whether to include accounts storing the program IDL
    /// - `concurrency` the number of concurrent requests to make to the RPC client;
    ///   a value of `0` is treated as `1`
    ///
    /// # Errors
    ///
    /// Returns an error if the list of account addresses cannot be fetched.
    pub async fn iter_accounts_with_concurrency(
        &self,
        include_idl_accounts: bool,
        concurrency: usize,
    ) -> ChainsawResult<
        BufferUnordered<
            impl StreamExt<
                Item = impl Future<Output = (Pubkey, Option<Account>)> + '_,
            >,
        >,
    > {
        let futures = self.iter_accounts(include_idl_accounts).await?;
        // With a limit of 0 the buffer could never start a future and the
        // stream would stay pending forever, so allow at least one request
        // in flight.
        let in_flight = concurrency.max(1);
        Ok(stream::iter(futures).buffer_unordered(in_flight))
    }

    /// Maps each address to a lazy future that fetches the matching account.
    ///
    /// Nothing is requested until a returned future is awaited.
    fn fetch_accounts_iterator(
        &self,
        addresses: Vec<Pubkey>,
    ) -> impl Iterator<Item = impl Future<Output = (Pubkey, Option<Account>)> + '_>
           + '_ {
        addresses.into_iter().map(move |address| async move {
            // Best effort: the item type cannot carry an error, so a failed
            // fetch is reported as `None`, just like a missing account.
            let account = self.fetch_account(&address).await.ok().flatten();
            (address, account)
        })
    }

    /// Determines if the provided address is an account holding this program's IDL.
    ///
    /// That is the case when it equals either the Anchor or the Shank IDL
    /// address stored on this client.
    pub fn is_idl_account(&self, address: &Pubkey) -> bool {
        *address == self.anchor_idl_address
            || *address == self.shank_idl_address
    }
}