Skip to main content

magic_resolver/
lib.rs

1//! A utility SDK to facilitate route resolution for a subset of Solana JSON-RPC requests
2
3use std::{
4    collections::HashMap,
5    sync::{
6        atomic::{AtomicBool, Ordering},
7        Arc,
8    },
9};
10
11use parking_lot::RwLock;
12
13use config::Configuration;
14use error::Error;
15use http::{fetch_account_state, fetch_domain_records, update_account_state};
16use rpc::nonblocking::rpc_client::RpcClient;
17use scc::{hash_cache::Entry, HashCache};
18use sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction};
19use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
20use websocket::{
21    connection::{delegations::WsDelegationsConnection, routes::WsRoutesConnection},
22    subscription::AccountSubscription,
23};
24
25/// Mapping between a validator (ER) identity and the Solana RPC client, which is
26/// configured with the URL via which this particular ER can be reached.
27/// NOTE: we use RwLock with std::HashMap instead of a concurrent HashMap, as this table is not
28/// supposed to be modified too often, so RwLock is faster for a mostly-read workload
29type RoutingTable = Arc<RwLock<HashMap<Pubkey, Arc<RpcClient>>>>;
30/// Limited capacity (LRU) cache, mapping between an account's
31/// pubkey and its current delegation status as observed by the resolver
32type DelegationsDB = Arc<HashCache<Pubkey, DelegationRecord>>;
33/// Convenience wrapper for results with possible resolver errors
34type ResolverResult<T> = Result<T, Error>;
35
/// Identifier of the delegation program, as provided by `ephemeral_rollups_sdk::id()`
36const DELEGATION_PROGRAM_ID: Pubkey = ephemeral_rollups_sdk::id();
37/// The fixed size of the delegation record account's data (presumably in bytes),
38/// NOTE: this value should be updated if the ABI of the delegation
39/// program changes in the future in a way that affects the size
40const DELEGATION_RECORD_SIZE: usize = 88;
41
42/// Connection resolver, the type is cheaply cloneable (all of its state lives behind shared
43/// handles) and thus a single instance should be initialized and cloned between threads if necessary
44#[derive(Clone)]
45pub struct Resolver {
    /// routing table: validator (ER) identity -> RPC client used to reach that ER
46    routes: RoutingTable,
    /// cache of delegation statuses for the accounts observed so far
47    delegations: DelegationsDB,
    /// RPC client of the base layer chain
48    chain: Arc<RpcClient>,
    /// sending half of the channel over which new account subscriptions are handed to the
    /// websocket connection that tracks delegations
49    delegations_tx: UnboundedSender<AccountSubscription>,
50}
51
52/// Delegation status of account
53#[derive(Clone, Copy)]
54pub enum DelegationStatus {
55    /// Account is delegated to validator indicated by pubkey
56    Delegated(Pubkey),
57    /// Account is available for modification on chain
58    Undelegated,
59}
60
61/// Wrapper around delegation status, with an additional flag to keep track of subscription state
62struct DelegationRecord {
63    /// current delegation status of the account, last observed by the resolver
64    status: DelegationStatus,
65    /// indicator of whether an active websocket subscription exists for account updates; the
66    /// resolver only trusts the cached `status` while this flag is set. The flag is shared with
    /// the `AccountSubscription` created for the account when it is first tracked
67    subscribed: Arc<AtomicBool>,
68}
69
70impl Resolver {
71    /// Initialize the resolver with the provided configuration
72    pub async fn new(config: Configuration) -> ResolverResult<Self> {
73        Self::new_custom(config, true, None).await
74    }
75    /// Initialize the resolver:
76    /// 1. fetch routes from chain (if use_on_chain_routes is true)
77    /// 2. add custom routes to routing table, if any
78    /// 3. subscribe to on-chain route updates (if use_on_chain_routes is true)
79    /// 4. creating websocket connection to base chain for delegation status tracking of accounts
80    pub async fn new_custom(
81        config: Configuration,
82        use_on_chain_routes: bool,
83        custom_routes: Option<HashMap<Pubkey, String>>,
84    ) -> ResolverResult<Self> {
85        let commitment = CommitmentConfig {
86            commitment: config.commitment,
87        };
88        let chain = Arc::new(RpcClient::new(config.chain.to_string()));
89
90        let mut routes: HashMap<_, _> = if use_on_chain_routes {
91            fetch_domain_records(&chain)
92                .await?
93                .into_iter()
94                .map(|record| {
95                    (
96                        *record.identity(),
97                        RpcClient::new_with_commitment(record.addr().to_string(), commitment)
98                            .into(),
99                    )
100                })
101                .collect()
102        } else {
103            Default::default()
104        };
105
106        routes.extend(
107            custom_routes
108                .into_iter()
109                .flatten()
110                .map(|(k, v)| (k, RpcClient::new_with_commitment(v, commitment).into())),
111        );
112
113        let routes = Arc::new(RwLock::new(routes));
114
115        let delegations = Arc::new(HashCache::with_capacity(128, config.cache_size.max(256)));
116        let (delegations_tx, rx) = unbounded_channel();
117        let delegations_ws = WsDelegationsConnection::establish(
118            config.websocket.clone(),
119            chain.clone(),
120            rx,
121            delegations.clone(),
122        )
123        .await?;
124
125        tokio::spawn(delegations_ws.start());
126
127        if use_on_chain_routes {
128            let routes_ws =
129                WsRoutesConnection::establish(config.websocket, chain.clone(), routes.clone())
130                    .await?;
131            tokio::spawn(routes_ws.start());
132        }
133
134        Ok(Self {
135            chain,
136            delegations,
137            delegations_tx,
138            routes,
139        })
140    }
141
142    /// Start tracking account's delegation status, this is achieved by fetching the delegation
143    /// record for the account (if it exists) and subscribing to updates of its state. The existence
144    /// of the delegation record is a proof that account has been delegated, and it contains critical
145    /// information like the identity of validator, to which the account was delegated
146    pub async fn track_account(&self, pubkey: Pubkey) -> ResolverResult<DelegationStatus> {
147        let chain = self.chain.clone();
148        match self.delegations.entry(pubkey) {
149            Entry::Vacant(e) => {
150                let subscribed = Arc::new(AtomicBool::default());
151                let record = DelegationRecord {
152                    status: DelegationStatus::Undelegated,
153                    subscribed: subscribed.clone(),
154                };
155                e.put_entry(record);
156                let db = self.delegations.clone();
157                let subscription = AccountSubscription::new(pubkey, subscribed);
158                let status = update_account_state(chain, db, pubkey).await?;
159                let _ = self.delegations_tx.send(subscription);
160                Ok(status)
161            }
162            Entry::Occupied(e) => {
163                // return cached status, only if subscription exists
164                if e.subscribed.load(Ordering::Acquire) {
165                    Ok(e.status)
166                } else {
167                    // otherwise refetch fresh version from chain, to avoid stale cache issue
168                    fetch_account_state(chain, pubkey).await
169                }
170            }
171        }
172    }
173
174    /// Resolve connection for given account, if account has been delegated (as observed by
175    /// resolver), then the returned client is configured to connect to corresponding ER
176    /// instance, otherwise the client will connect to base layer chain
177    pub async fn resolve(&self, pubkey: &Pubkey) -> ResolverResult<Arc<RpcClient>> {
178        let status = self.resolve_status(pubkey).await?;
179        self.resolve_client(status)
180    }
181
182    /// Resolve connection for given transaction, if any of the accounts have been delegated
183    /// (as observed by resolver), then the resolver will check that all the writable accounts in
184    /// transaction have been delegated to the same ER, if validation is successful, the returned
185    /// client is configured to connect to this common ER. If none of the accounts are delegated then
186    /// the returned client is configured to connect to base layer chain. If conflict in delegation
187    /// is found, i.e. writable accounts are delegated to different ERs, then error is returned as
188    /// connection resolution is impossible for such a case.
189    pub async fn resolve_for_transaction(
190        &self,
191        tx: &Transaction,
192    ) -> ResolverResult<Arc<RpcClient>> {
193        let mut statuses = Vec::new();
194        for (i, acc) in tx.message.account_keys.iter().enumerate() {
195            if tx.message.is_maybe_writable(i, None) {
196                statuses.push(self.resolve_status(acc).await?);
197            }
198        }
199        let mut validator = None;
200        for s in statuses {
201            let DelegationStatus::Delegated(v1) = s else {
202                continue;
203            };
204            let Some(v2) = validator.replace(v1) else {
205                continue;
206            };
207            if v1 != v2 {
208                return Err(Error::Resolver(format!(
209                    "transaction accounts delegated to different validators: {v1} <> {v2}"
210                )));
211            }
212        }
213        if let Some(v) = validator.map(DelegationStatus::Delegated) {
214            return self.resolve_client(v);
215        }
216        Ok(self.chain.clone())
217    }
218
219    /// Get current delegation status for account, either from cache or
220    /// from chain (if account is encoutered for the first time)
221    async fn resolve_status(&self, pubkey: &Pubkey) -> ResolverResult<DelegationStatus> {
222        if let Some(record) = self.delegations.get(pubkey) {
223            if record.get().subscribed.load(Ordering::Acquire) {
224                // only return cached status if websocket subscription exists
225                Ok(record.get().status)
226            } else {
227                // fetch from chain otherwise
228                fetch_account_state(self.chain.clone(), *pubkey).await
229            }
230        } else {
231            self.track_account(*pubkey).await
232        }
233    }
234
235    /// Depending on delegation status, return appropriate RpcClient,
236    /// which can be used to perform requests for account involved
237    fn resolve_client(&self, status: DelegationStatus) -> ResolverResult<Arc<RpcClient>> {
238        match status {
239            DelegationStatus::Delegated(validator) => {
240                let guard = self.routes.read();
241                let client = guard.get(&validator).ok_or(Error::Resolver(format!(
242                    "url not found for validator: {validator}"
243                )))?;
244                Ok(client.clone())
245            }
246            DelegationStatus::Undelegated => Ok(self.chain.clone()),
247        }
248    }
249}
250
251mod account;
252pub mod config;
253mod error;
254mod http;
255mod websocket;