quasar_rpc/
client.rs

1use base64::Engine;
2use base64::prelude::BASE64_STANDARD;
3use http_body_util::BodyExt;
4use hyper::header::{CONTENT_TYPE, HeaderValue};
5use hyper::{Method, Request};
6use solana_account::Account;
7use solana_account_decoder_client_types::{UiAccount, UiAccountEncoding};
8use solana_clock::Slot;
9use solana_commitment_config::CommitmentConfig;
10use solana_hash::Hash;
11use solana_pubkey::Pubkey;
12use solana_rpc_client_types::config::*;
13use solana_rpc_client_types::response::*;
14use solana_signature::Signature;
15use solana_transaction::versioned::VersionedTransaction;
16use solana_transaction_status_client_types::{TransactionStatus, UiTransactionEncoding};
17
18use crate::error::{ClientError, RpcResult};
19use crate::pool::{DEFAULT_NUM_CONNECTIONS, Pool};
20
21macro_rules! request {
22    ($method:expr, [$($param:tt)*]) => {
23        serde_json::json!({
24            "jsonrpc": "2.0",
25            "id": 1,
26            "method": $method,
27            "params": [$($param)*],
28        })
29    };
30}
31
32/// The default `CommitmentConfig` is `Processed` and the default number of connections is `2`.
33pub struct Rpc {
34    pool: Pool,
35    commitment: CommitmentConfig,
36    endpoint: String,
37    host: String,
38    path: String,
39}
40
41impl Rpc {
42    pub async fn new(endpoint: String) -> Self {
43        let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
44        let pool = Pool::new(parsed.addr, parsed.domain, DEFAULT_NUM_CONNECTIONS).await;
45
46        Self {
47            pool,
48            endpoint,
49            commitment: CommitmentConfig::processed(),
50            host: parsed.host,
51            path: parsed.path,
52        }
53    }
54
55    pub async fn new_with_commitment(endpoint: String, commitment: CommitmentConfig) -> Self {
56        let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
57        let pool = Pool::new(parsed.addr, parsed.domain, DEFAULT_NUM_CONNECTIONS).await;
58
59        Self { pool, endpoint, commitment, host: parsed.host, path: parsed.path }
60    }
61
62    pub async fn new_with_max_connections(endpoint: String, max_connections: usize) -> Self {
63        let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
64        let pool = Pool::new(parsed.addr, parsed.domain, max_connections).await;
65
66        Self {
67            pool,
68            endpoint,
69            commitment: CommitmentConfig::processed(),
70            host: parsed.host,
71            path: parsed.path,
72        }
73    }
74
75    pub async fn new_with_commitment_and_max_connections(
76        endpoint: String,
77        commitment: CommitmentConfig,
78        max_connections: usize,
79    ) -> Self {
80        let parsed = parse_endpoint(&endpoint).expect("Invalid endpoint format");
81        let pool = Pool::new(parsed.addr, parsed.domain, max_connections).await;
82
83        Self { pool, endpoint, commitment, host: parsed.host, path: parsed.path }
84    }
85
86    pub fn endpoint(&self) -> &str {
87        &self.endpoint
88    }
89
90    async fn send(&self, request: serde_json::Value) -> RpcResult<serde_json::Value> {
91        let body = serde_json::to_string(&request)?;
92        let req = Request::builder()
93            .method(Method::POST)
94            .uri(&self.path)
95            .header("Host", &self.host)
96            .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
97            .body(body)
98            .unwrap();
99        let conn = self.pool.get_connection().await;
100        let response = conn.send_request(req).await?;
101        let body = response.into_body().collect().await?.to_bytes();
102        serde_json::from_slice(&body).map_err(|e| crate::error::ClientError::SerdeError(e.into()))
103    }
104
105    fn parse_response_raw<T>(mut response_raw: serde_json::Value, msg: &'static str) -> RpcResult<T>
106    where
107        T: serde::de::DeserializeOwned,
108    {
109        let response_value = response_raw
110            .as_object_mut()
111            .and_then(|obj| obj.remove("result"))
112            .ok_or_else(|| ClientError::ForUser(msg))?;
113        serde_json::from_value::<T>(response_value).map_err(|e| ClientError::SerdeError(e.into()))
114    }
115
116    pub async fn get_account(
117        &self,
118        account: &Pubkey,
119        config: Option<RpcAccountInfoConfig>,
120    ) -> RpcResult<Response<Option<Account>>> {
121        let config = config.unwrap_or(RpcAccountInfoConfig {
122            encoding: Some(UiAccountEncoding::Base64),
123            data_slice: None,
124            commitment: Some(self.commitment),
125            min_context_slot: None,
126        });
127
128        let request = request!("getAccountInfo", [account.to_string(), config]);
129
130        let response_raw = self.send(request).await?;
131        let response = Self::parse_response_raw::<Response<Option<UiAccount>>>(
132            response_raw,
133            "Account {account} not found",
134        )?;
135        let account = response.value.and_then(|acc| acc.decode());
136
137        Ok(Response { context: response.context, value: account })
138    }
139
140    pub async fn get_multiple_accounts(
141        &self,
142        accounts: &[Pubkey],
143        config: Option<RpcAccountInfoConfig>,
144    ) -> RpcResult<Response<Vec<Option<Account>>>> {
145        let config = config.unwrap_or(RpcAccountInfoConfig {
146            encoding: Some(UiAccountEncoding::Base64),
147            data_slice: None,
148            commitment: Some(self.commitment),
149            min_context_slot: None,
150        });
151
152        let request = request!("getMultipleAccounts", [accounts, config]);
153
154        let response_raw = self.send(request).await?;
155        let response = Self::parse_response_raw::<Response<Vec<UiAccount>>>(
156            response_raw,
157            "Accounts not found",
158        )?;
159        let accounts = response
160            .value
161            .into_iter()
162            .map(|account| account.decode())
163            .collect::<Vec<_>>();
164
165        Ok(Response { context: response.context, value: accounts })
166    }
167
168    pub async fn get_program_accounts(
169        &self,
170        program_id: &Pubkey,
171        config: Option<RpcProgramAccountsConfig>,
172    ) -> RpcResult<Vec<(Pubkey, Account)>> {
173        let config = config.unwrap_or(RpcProgramAccountsConfig {
174            filters: None,
175            account_config: RpcAccountInfoConfig {
176                encoding: Some(UiAccountEncoding::Base64),
177                data_slice: None,
178                commitment: Some(self.commitment),
179                min_context_slot: None,
180            },
181            with_context: None,
182            sort_results: None,
183        });
184
185        let request = request!("getProgramAccounts", [program_id.to_string(), config]);
186
187        let response_raw = self.send(request).await?;
188        let response = Self::parse_response_raw::<Response<Vec<(Pubkey, UiAccount)>>>(
189            response_raw,
190            "Accounts not found",
191        )?;
192        let accounts = response
193            .value
194            .into_iter()
195            .filter_map(|(pubkey, ui_account)| match ui_account.decode() {
196                Some(account) => Some((pubkey, account)),
197                None => None,
198            })
199            .collect::<Vec<(Pubkey, Account)>>();
200        Ok(accounts)
201    }
202
203    pub async fn get_slot(&self, commitment: Option<CommitmentConfig>) -> RpcResult<Slot> {
204        let request = request!("getSlot", [commitment.unwrap_or(self.commitment)]);
205
206        let response_raw = self.send(request).await?;
207        Self::parse_response_raw::<Slot>(response_raw, "Failed to get slot")
208    }
209
210    pub async fn get_latest_blockhash(&self) -> RpcResult<Hash> {
211        let request = request!("getLatestBlockhash", [self.commitment]);
212
213        let response_raw = self.send(request).await?;
214        let blockhash = Self::parse_response_raw::<Response<RpcBlockhash>>(
215            response_raw,
216            "Failed to get latest blockhash",
217        )?
218        .value
219        .blockhash;
220
221        Ok(blockhash.parse().map_err(|_| {
222            ClientError::ForUser("Failed to parse blockhash as a valid base58 string")
223        })?)
224    }
225
226    pub async fn get_signature_statuses(
227        &self,
228        signatures: &[Signature],
229        search_transaction_history: bool,
230    ) -> RpcResult<Vec<Option<TransactionStatus>>> {
231        let request = request!("getSignatureStatuses", [
232            signatures.iter().map(|s| s.to_string()).collect::<Vec<_>>(),
233            { "searchTransactionHistory": search_transaction_history }
234        ]);
235
236        let response_raw = self.send(request).await?;
237        let response = Self::parse_response_raw::<Response<Vec<Option<TransactionStatus>>>>(
238            response_raw,
239            "Failed to get signature statuses",
240        )?;
241
242        Ok(response.value)
243    }
244
245    pub async fn simulate_transaction(
246        &self,
247        transaction: &VersionedTransaction,
248        config: Option<RpcSimulateTransactionConfig>,
249    ) -> RpcResult<RpcSimulateTransactionResult> {
250        let config = config.unwrap_or(RpcSimulateTransactionConfig {
251            encoding: Some(UiTransactionEncoding::Base64),
252            commitment: Some(self.commitment),
253            sig_verify: false,
254            replace_recent_blockhash: false,
255            accounts: None,
256            min_context_slot: None,
257            inner_instructions: false,
258        });
259        let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64);
260
261        let transaction = serialize_transaction(transaction, encoding)?;
262        let request = request!("simulateTransaction", [transaction, config]);
263
264        let response_raw = self.send(request).await?;
265        let response = Self::parse_response_raw::<RpcSimulateTransactionResult>(
266            response_raw,
267            "Failed to simulate transaction",
268        )?;
269        Ok(response)
270    }
271}
272
273#[derive(Debug)]
274struct ParsedEndpoint {
275    addr: String,
276    domain: String,
277    host: String,
278    path: String,
279}
280
281fn parse_endpoint(url: &str) -> Result<ParsedEndpoint, Box<dyn std::error::Error>> {
282    let url = url
283        .strip_prefix("https://")
284        .or_else(|| url.strip_prefix("http://"))
285        .unwrap_or(url);
286
287    let (host_part, path_part) =
288        if let Some(idx) = url.find('/') { (&url[..idx], &url[idx..]) } else { (url, "/") };
289
290    let (domain, port) =
291        if let Some((d, p)) = host_part.split_once(':') { (d, p) } else { (host_part, "443") };
292
293    Ok(ParsedEndpoint {
294        addr: format!("{}:{}", domain, port),
295        domain: domain.to_string(),
296        host: domain.to_string(),
297        path: path_part.to_string(),
298    })
299}
300
301fn serialize_transaction(
302    input: &VersionedTransaction,
303    encoding: UiTransactionEncoding,
304) -> RpcResult<String> {
305    let serialized_transaction =
306        bincode::serialize(input).map_err(|e| ClientError::BincodeError(e.into()))?;
307
308    match encoding {
309        UiTransactionEncoding::Base58 => Ok(bs58::encode(serialized_transaction).into_string()),
310        UiTransactionEncoding::Base64 => Ok(BASE64_STANDARD.encode(serialized_transaction)),
311        _ => Err(ClientError::ForUser(
312            "Unsupported transaction encoding: {encoding:?}, Supported encodings are Base58 and \
313             Base64.",
314        )),
315    }
316}