1use base64::Engine;
2use base64::prelude::BASE64_STANDARD;
3use http_body_util::BodyExt;
4use hyper::header::{CONTENT_TYPE, HeaderValue};
5use hyper::{Method, Request};
6use solana_account::Account;
7use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding};
8use solana_clock::Slot;
9use solana_commitment_config::CommitmentConfig;
10use solana_hash::Hash;
11use solana_pubkey::Pubkey;
12use solana_rpc_client_types::config::*;
13use solana_rpc_client_types::response::*;
14use solana_signature::Signature;
15use solana_transaction::versioned::VersionedTransaction;
16use solana_transaction_status_client_types::{TransactionStatus, UiTransactionEncoding};
17
18use crate::error::{ClientError, RpcResult};
19use crate::pool::{DEFAULT_NUM_CONNECTIONS, Pool};
20
21macro_rules! request {
22 ($method:expr, [$($param:tt)*]) => {
23 serde_json::json!({
24 "jsonrpc": "2.0",
25 "id": 1,
26 "method": $method,
27 "params": [$($param)*],
28 })
29 };
30}
31
32pub struct Rpc {
34 pool: Pool,
35 commitment: CommitmentConfig,
36 endpoint: String,
37 host: String,
38 path: String,
39}
40
41impl Rpc {
42 pub async fn new(endpoint: String) -> Self {
43 let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
44 let pool = Pool::new(parsed.addr, parsed.domain, DEFAULT_NUM_CONNECTIONS).await;
45
46 Self {
47 pool,
48 endpoint,
49 commitment: CommitmentConfig::processed(),
50 host: parsed.host,
51 path: parsed.path,
52 }
53 }
54
55 pub async fn new_with_commitment(endpoint: String, commitment: CommitmentConfig) -> Self {
56 let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
57 let pool = Pool::new(parsed.addr, parsed.domain, DEFAULT_NUM_CONNECTIONS).await;
58
59 Self { pool, endpoint, commitment, host: parsed.host, path: parsed.path }
60 }
61
62 pub async fn new_with_max_connections(endpoint: String, max_connections: usize) -> Self {
63 let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
64 let pool = Pool::new(parsed.addr, parsed.domain, max_connections).await;
65
66 Self {
67 pool,
68 endpoint,
69 commitment: CommitmentConfig::processed(),
70 host: parsed.host,
71 path: parsed.path,
72 }
73 }
74
75 pub async fn new_with_commitment_and_max_connections(
76 endpoint: String,
77 commitment: CommitmentConfig,
78 max_connections: usize,
79 ) -> Self {
80 let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
81 let pool = Pool::new(parsed.addr, parsed.domain, max_connections).await;
82
83 Self { pool, endpoint, commitment, host: parsed.host, path: parsed.path }
84 }
85
86 pub fn endpoint(&self) -> &str {
87 &self.endpoint
88 }
89
90 async fn send(&self, request: serde_json::Value) -> RpcResult<serde_json::Value> {
91 let body = serde_json::to_string(&request)?;
92 let req = Request::builder()
93 .method(Method::POST)
94 .uri(&self.path)
95 .header("Host", &self.host)
96 .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
97 .body(body)
98 .unwrap();
99 let conn = self.pool.get_connection().await;
100 let response = conn.send_request(req).await?;
101 let body = response.into_body().collect().await?.to_bytes();
102 serde_json::from_slice(&body).map_err(|e| crate::error::ClientError::SerdeError(e.into()))
103 }
104
105 fn parse_response_raw<T>(mut response_raw: serde_json::Value, msg: &'static str) -> RpcResult<T>
106 where
107 T: serde::de::DeserializeOwned,
108 {
109 let response_value = response_raw
110 .as_object_mut()
111 .and_then(|obj| obj.remove("result"))
112 .ok_or_else(|| ClientError::ForUser(msg))?;
113 serde_json::from_value::<T>(response_value).map_err(|e| ClientError::SerdeError(e.into()))
114 }
115
116 pub async fn get_account(
117 &self,
118 account: &Pubkey,
119 config: Option<RpcAccountInfoConfig>,
120 ) -> RpcResult<Response<Option<Account>>> {
121 let config = config.unwrap_or(RpcAccountInfoConfig {
122 encoding: Some(UiAccountEncoding::Base64),
123 data_slice: None,
124 commitment: Some(self.commitment),
125 min_context_slot: None,
126 });
127
128 let request = request!("getAccountInfo", [account.to_string(), config]);
129
130 let response_raw = self.send(request).await?;
131 let response = Self::parse_response_raw::<Response<Option<UiAccount>>>(
132 response_raw,
133 "Account {account} not found",
134 )?;
135 let account = response.value.and_then(|acc| acc.decode());
136
137 Ok(Response { context: response.context, value: account })
138 }
139
140 pub async fn get_multiple_accounts(
141 &self,
142 accounts: &[Pubkey],
143 config: Option<RpcAccountInfoConfig>,
144 ) -> RpcResult<Response<Vec<Option<Account>>>> {
145 let config = config.unwrap_or(RpcAccountInfoConfig {
146 encoding: Some(UiAccountEncoding::Base64),
147 data_slice: None,
148 commitment: Some(self.commitment),
149 min_context_slot: None,
150 });
151
152 let request = request!("getMultipleAccounts", [accounts, config]);
153
154 let response_raw = self.send(request).await?;
155 let response = Self::parse_response_raw::<Response<Vec<UiAccount>>>(
156 response_raw,
157 "Accounts not found",
158 )?;
159 let accounts = response
160 .value
161 .into_iter()
162 .map(|account| account.decode())
163 .collect::<Vec<_>>();
164
165 Ok(Response { context: response.context, value: accounts })
166 }
167
168 pub async fn get_program_accounts(
169 &self,
170 program_id: &Pubkey,
171 config: Option<RpcProgramAccountsConfig>,
172 ) -> RpcResult<Vec<(Pubkey, Account)>> {
173 let config = config.unwrap_or(RpcProgramAccountsConfig {
174 filters: None,
175 account_config: RpcAccountInfoConfig {
176 encoding: Some(UiAccountEncoding::Base64),
177 data_slice: None,
178 commitment: Some(self.commitment),
179 min_context_slot: None,
180 },
181 with_context: None,
182 sort_results: None,
183 });
184
185 let request = request!("getProgramAccounts", [program_id.to_string(), config]);
186
187 let response_raw = self.send(request).await?;
188 let response = Self::parse_response_raw::<Response<Vec<(Pubkey, UiAccount)>>>(
189 response_raw,
190 "Accounts not found",
191 )?;
192 let accounts = response
193 .value
194 .into_iter()
195 .filter_map(|(pubkey, ui_account)| match ui_account.decode() {
196 Some(account) => Some((pubkey, account)),
197 None => None,
198 })
199 .collect::<Vec<(Pubkey, Account)>>();
200 Ok(accounts)
201 }
202
203 pub async fn get_slot(&self, commitment: Option<CommitmentConfig>) -> RpcResult<Slot> {
204 let request = request!("getSlot", [commitment.unwrap_or(self.commitment)]);
205
206 let response_raw = self.send(request).await?;
207 Self::parse_response_raw::<Slot>(response_raw, "Failed to get slot")
208 }
209
210 pub async fn get_latest_blockhash(&self) -> RpcResult<Hash> {
211 let request = request!("getLatestBlockhash", [self.commitment]);
212
213 let response_raw = self.send(request).await?;
214 let blockhash = Self::parse_response_raw::<Response<RpcBlockhash>>(
215 response_raw,
216 "Failed to get latest blockhash",
217 )?
218 .value
219 .blockhash;
220
221 Ok(blockhash.parse().map_err(|_| {
222 ClientError::ForUser("Failed to parse blockhash as a valid base58 string")
223 })?)
224 }
225
226 pub async fn get_signature_statuses(
227 &self,
228 signatures: &[Signature],
229 search_transaction_history: bool,
230 ) -> RpcResult<Vec<Option<TransactionStatus>>> {
231 let request = request!("getSignatureStatuses", [
232 signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
233 { "searchTransactionHistory": search_transaction_history }
234 ]);
235
236 let response_raw = self.send(request).await?;
237 let response = Self::parse_response_raw::<Response<Vec<Option<TransactionStatus>>>>(
238 response_raw,
239 "Failed to get signature statuses",
240 )?;
241
242 Ok(response.value)
243 }
244
245 pub async fn simulate_transaction(
246 &self,
247 transaction: &VersionedTransaction,
248 config: Option<RpcSimulateTransactionConfig>,
249 ) -> RpcResult<RpcSimulateTransactionResult> {
250 let config = config.unwrap_or(RpcSimulateTransactionConfig {
251 encoding: Some(UiTransactionEncoding::Base64),
252 commitment: Some(self.commitment),
253 sig_verify: false,
254 replace_recent_blockhash: false,
255 accounts: None,
256 min_context_slot: None,
257 inner_instructions: false,
258 });
259 let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64);
260
261 let transaction = serialize_transaction(transaction, encoding)?;
262 let request = request!("simulateTransaction", [transaction, config]);
263
264 let response_raw = self.send(request).await?;
265 let response = Self::parse_response_raw::<RpcSimulateTransactionResult>(
266 response_raw,
267 "Failed to simulate transaction",
268 )?;
269 Ok(response)
270 }
271}
272
273#[derive(Debug)]
274struct ParsedEndpoint {
275 addr: String,
276 domain: String,
277 host: String,
278 path: String,
279}
280
281fn parse_endpoint(url: &str) -> Result<ParsedEndpoint, Box<dyn std::error::Error>> {
282 let url = url
283 .strip_prefix("https://")
284 .or_else(|| url.strip_prefix("http://"))
285 .unwrap_or(url);
286
287 let (host_part, path_part) =
288 if let Some(idx) = url.find('/') { (&url[..idx], &url[idx..]) } else { (url, "/") };
289
290 let (domain, port) =
291 if let Some((d, p)) = host_part.split_once(':') { (d, p) } else { (host_part, "443") };
292
293 Ok(ParsedEndpoint {
294 addr: format!("{}:{}", domain, port),
295 domain: domain.to_string(),
296 host: domain.to_string(),
297 path: path_part.to_string(),
298 })
299}
300
301fn serialize_transaction(
302 input: &VersionedTransaction,
303 encoding: UiTransactionEncoding,
304) -> RpcResult<String> {
305 let serialized_transaction =
306 bincode::serialize(input).map_err(|e| ClientError::BincodeError(e.into()))?;
307
308 match encoding {
309 UiTransactionEncoding::Base58 => Ok(bs58::encode(serialized_transaction).into_string()),
310 UiTransactionEncoding::Base64 => Ok(BASE64_STANDARD.encode(serialized_transaction)),
311 _ => Err(ClientError::ForUser(
312 "Unsupported transaction encoding: {encoding:?}, Supported encodings are Base58 and \
313 Base64.",
314 )),
315 }
316}