magic_resolver/lib.rs
1//! A utility SDK to facilitate route resolution for a subset of solana JSON-RPC requests
2
3use std::{
4 collections::HashMap,
5 sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc, RwLock,
8 },
9};
10
11use config::Configuration;
12use error::Error;
13use http::{fetch_account_state, update_account_state};
14use rpc::nonblocking::rpc_client::RpcClient;
15use scc::{hash_cache::Entry, HashCache};
16use sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction};
17use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
18use websocket::{connection::WsConnection, subscription::AccountSubscription};
19
20/// Mapping between validator(ER) identity and solana rpc client, which is
21/// configured with the URL, via which the this particular ER can be reached
22/// NOTE: we use RwLock with std::HashMap instead of concurrent HashMap, as this table is not
23/// supposed to be modified too often, so RwLock is faster for mostly read workload
24type RoutingTable = Arc<RwLock<HashMap<Pubkey, Arc<RpcClient>>>>;
25/// Limited capacity (LRU) cache, mapping between an account's
26/// pubkey and it's current delegation status as observed by resolver
27type DelegationsDB = Arc<HashCache<Pubkey, DelegationRecord>>;
28/// Conveniece wrapper for results with possible resolver errors
29type ResolverResult<T> = Result<T, Error>;
30
31const DELEGATION_PROGRAM_ID: Pubkey = ephemeral_rollups_sdk::id();
32/// The fixed size of delegation record account's data,
33/// NOTE: this value should be updated if the ABI of delegation
34/// program changes in the future, that will affect the size
35const DELEGATION_RECORD_SIZE: usize = 88;
36
37/// Connection resolver, the type is cheaply clonable and thus a single instance should be
38/// initialized and cloned between threads if necessary
39#[derive(Clone)]
40pub struct Resolver {
41 routes: RoutingTable,
42 delegations: DelegationsDB,
43 chain: Arc<RpcClient>,
44 ws: UnboundedSender<AccountSubscription>,
45}
46
47/// Delegation status of account
48#[derive(Clone, Copy)]
49pub enum DelegationStatus {
50 /// Account is delegated to validator indicated by pubkey
51 Delegated(Pubkey),
52 /// Account is available for modification on chain
53 Undelegated,
54}
55
56/// Wrapper around delegation status, with additional flag to keep track of subscription state
57struct DelegationRecord {
58 /// current delegation status of account, last observed by resolver
59 status: DelegationStatus,
60 /// indicator, whether active websocket subscription exists for account updates, to track its
61 /// delegation status
62 subscribed: Arc<AtomicBool>,
63}
64
65impl Resolver {
66 /// Initialize the resolver, by creating websocket connection
67 /// to base chain for delegation status tracking of accounts
68 pub async fn new(
69 config: Configuration,
70 routes: HashMap<Pubkey, String>,
71 ) -> ResolverResult<Self> {
72 let commitment = CommitmentConfig {
73 commitment: config.commitment,
74 };
75 let routes = routes
76 .into_iter()
77 .map(|(k, v)| (k, RpcClient::new_with_commitment(v, commitment).into()))
78 .collect();
79
80 let delegations = Arc::new(HashCache::with_capacity(128, config.cache_size.max(256)));
81 let chain = Arc::new(RpcClient::new(config.chain.to_string()));
82 let (ws, rx) = unbounded_channel();
83 let websocket =
84 WsConnection::establish(config.websocket, chain.clone(), rx, delegations.clone())
85 .await?;
86 tokio::spawn(websocket.start());
87 Ok(Self {
88 chain,
89 delegations,
90 ws,
91 routes: Arc::new(RwLock::new(routes)),
92 })
93 }
94
95 /// Start tracking account's delegation status, this is achieved by fetching delegation record
96 /// for account (if exists) and subscribing to updates of its state. The existence of
97 /// delegation record is proof that account has been delegated, and it contains critical
98 /// information like the identity of validator, to which the account was delegated
99 pub async fn track_account(&self, pubkey: Pubkey) -> ResolverResult<DelegationStatus> {
100 let chain = self.chain.clone();
101 match self.delegations.entry(pubkey) {
102 Entry::Vacant(e) => {
103 let subscribed = Arc::new(AtomicBool::default());
104 let record = DelegationRecord {
105 status: DelegationStatus::Undelegated,
106 subscribed: subscribed.clone(),
107 };
108 e.put_entry(record);
109 let db = self.delegations.clone();
110 let subscription = AccountSubscription::new(pubkey, subscribed);
111 let status = update_account_state(chain, db, pubkey).await?;
112 let _ = self.ws.send(subscription);
113 Ok(status)
114 }
115 Entry::Occupied(e) => {
116 // return cached status, only if subscription exists
117 if e.subscribed.load(Ordering::Acquire) {
118 Ok(e.status)
119 } else {
120 // otherwise refetch fresh version from chain, to avoid stale cache issue
121 fetch_account_state(chain, pubkey).await
122 }
123 }
124 }
125 }
126
127 /// Resolve connection for given account, if account has been delegated (as observed by
128 /// resolver), then the returned client is configured to connect to corresponding ER
129 /// instance, otherwise the client will connect to base layer chain
130 pub async fn resolve(&self, pubkey: &Pubkey) -> ResolverResult<Arc<RpcClient>> {
131 let status = self.resolve_status(pubkey).await?;
132 self.resolve_client(status)
133 }
134
135 /// Resolve connection for given transaction, if any of the accounts have been delegated
136 /// (as observed by resolver), then the resolver will check that all the writable accounts in
137 /// transaction have been delegated to the same ER, if validation is successful, the returned
138 /// client is configured to connect to this common ER. If none of the accounts are delegated then
139 /// the returned client is configured to connect to base layer chain. If conflict in delegation
140 /// is found, i.e. writable accounts are delegated to different ERs, then error is returned as
141 /// connection resolution is impossible for such a case.
142 pub async fn resolve_for_transaction(
143 &self,
144 tx: &Transaction,
145 ) -> ResolverResult<Arc<RpcClient>> {
146 let mut statuses = Vec::new();
147 for (i, acc) in tx.message.account_keys.iter().enumerate() {
148 if tx.message.is_maybe_writable(i, None) {
149 statuses.push(self.resolve_status(acc).await?);
150 }
151 }
152 let mut validator = None;
153 for s in statuses {
154 let DelegationStatus::Delegated(v1) = s else {
155 continue;
156 };
157 let Some(v2) = validator.replace(v1) else {
158 continue;
159 };
160 if v1 != v2 {
161 return Err(Error::Resolver(format!(
162 "transaction accounts delegated to different validators: {v1} <> {v2}"
163 )));
164 }
165 }
166 if let Some(v) = validator.map(DelegationStatus::Delegated) {
167 return self.resolve_client(v);
168 }
169 Ok(self.chain.clone())
170 }
171
172 /// Get current delegation status for account, either from cache or
173 /// from chain (if account is encoutered for the first time)
174 async fn resolve_status(&self, pubkey: &Pubkey) -> ResolverResult<DelegationStatus> {
175 if let Some(record) = self.delegations.get(pubkey) {
176 if record.get().subscribed.load(Ordering::Acquire) {
177 // only return cached status if websocket subscription exists
178 Ok(record.get().status)
179 } else {
180 // fetch from chain otherwise
181 fetch_account_state(self.chain.clone(), *pubkey).await
182 }
183 } else {
184 self.track_account(*pubkey).await
185 }
186 }
187
188 /// Depending on delegation status, return appropriate RpcClient,
189 /// which can be used to perform requests for account involved
190 fn resolve_client(&self, status: DelegationStatus) -> ResolverResult<Arc<RpcClient>> {
191 match status {
192 DelegationStatus::Delegated(validator) => {
193 let guard = self
194 .routes
195 .read()
196 // not really possible, no thread can panic while holding this lock
197 .expect("poisoned RwLock for routing table");
198 let client = guard.get(&validator).ok_or(Error::Resolver(format!(
199 "url not found for validator: {validator}"
200 )))?;
201 Ok(client.clone())
202 }
203 DelegationStatus::Undelegated => Ok(self.chain.clone()),
204 }
205 }
206}
207
208mod account;
209pub mod config;
210mod error;
211mod http;
212mod websocket;