magic_resolver/
lib.rs

//! A utility SDK to facilitate route resolution for a subset of Solana JSON-RPC requests
2
3use std::{
4    collections::HashMap,
5    sync::{
6        atomic::{AtomicBool, Ordering},
7        Arc, RwLock,
8    },
9};
10
11use config::Configuration;
12use error::Error;
13use http::{fetch_account_state, update_account_state};
14use rpc::nonblocking::rpc_client::RpcClient;
15use scc::{hash_cache::Entry, HashCache};
16use sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction};
17use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
18use websocket::{connection::WsConnection, subscription::AccountSubscription};
19
/// Mapping between a validator's (ER) identity and the Solana RPC client which is
/// configured with the URL via which this particular ER can be reached.
/// NOTE: we use RwLock with std::HashMap instead of a concurrent HashMap, as this table is not
/// supposed to be modified too often, so RwLock is faster for a mostly-read workload
type RoutingTable = Arc<RwLock<HashMap<Pubkey, Arc<RpcClient>>>>;
/// Limited capacity (LRU) cache, mapping between an account's
/// pubkey and its current delegation status as observed by the resolver
type DelegationsDB = Arc<HashCache<Pubkey, DelegationRecord>>;
/// Convenience wrapper for results which may carry a resolver [`Error`]
type ResolverResult<T> = Result<T, Error>;
30
/// Identifier of the delegation program, as exported by `ephemeral_rollups_sdk::id()`
const DELEGATION_PROGRAM_ID: Pubkey = ephemeral_rollups_sdk::id();
/// The fixed size (in bytes) of the delegation record account's data.
/// NOTE: this value should be updated if the ABI of the delegation
/// program changes in the future in a way that affects the size
const DELEGATION_RECORD_SIZE: usize = 88;
36
/// Connection resolver. The type is cheaply clonable (every field is either reference counted
/// or a channel sender), and thus a single instance should be initialized and cloned between
/// threads if necessary
#[derive(Clone)]
pub struct Resolver {
    /// validator (ER) identity -> RPC client used to reach that validator
    routes: RoutingTable,
    /// cache of delegation statuses of the accounts seen by the resolver so far
    delegations: DelegationsDB,
    /// RPC client for the base chain, returned for accounts which are not delegated
    chain: Arc<RpcClient>,
    /// sender half of the channel over which new account subscriptions are
    /// handed to the websocket connection task
    ws: UnboundedSender<AccountSubscription>,
}
46
47/// Delegation status of account
48#[derive(Clone, Copy)]
49pub enum DelegationStatus {
50    /// Account is delegated to validator indicated by pubkey
51    Delegated(Pubkey),
52    /// Account is available for modification on chain
53    Undelegated,
54}
55
/// Wrapper around delegation status, with an additional flag to keep track of subscription state
struct DelegationRecord {
    /// current delegation status of the account, as last observed by the resolver
    status: DelegationStatus,
    /// indicator of whether an active websocket subscription exists for the account's updates
    /// (used to track its delegation status); the flag is shared through the `Arc` with the
    /// `AccountSubscription` created for the account
    subscribed: Arc<AtomicBool>,
}
64
65impl Resolver {
66    /// Initialize the resolver, by creating websocket connection
67    /// to base chain for delegation status tracking of accounts
68    pub async fn new(
69        config: Configuration,
70        routes: HashMap<Pubkey, String>,
71    ) -> ResolverResult<Self> {
72        let commitment = CommitmentConfig {
73            commitment: config.commitment,
74        };
75        let routes = routes
76            .into_iter()
77            .map(|(k, v)| (k, RpcClient::new_with_commitment(v, commitment).into()))
78            .collect();
79
80        let delegations = Arc::new(HashCache::with_capacity(128, config.cache_size.max(256)));
81        let chain = Arc::new(RpcClient::new(config.chain.to_string()));
82        let (ws, rx) = unbounded_channel();
83        let websocket =
84            WsConnection::establish(config.websocket, chain.clone(), rx, delegations.clone())
85                .await?;
86        tokio::spawn(websocket.start());
87        Ok(Self {
88            chain,
89            delegations,
90            ws,
91            routes: Arc::new(RwLock::new(routes)),
92        })
93    }
94
95    /// Start tracking account's delegation status, this is achieved by fetching delegation record
96    /// for account (if exists) and subscribing to updates of its state. The existence of
97    /// delegation record is proof that account has been delegated, and it contains critical
98    /// information like the identity of validator, to which the account was delegated
99    pub async fn track_account(&self, pubkey: Pubkey) -> ResolverResult<DelegationStatus> {
100        let chain = self.chain.clone();
101        match self.delegations.entry(pubkey) {
102            Entry::Vacant(e) => {
103                let subscribed = Arc::new(AtomicBool::default());
104                let record = DelegationRecord {
105                    status: DelegationStatus::Undelegated,
106                    subscribed: subscribed.clone(),
107                };
108                e.put_entry(record);
109                let db = self.delegations.clone();
110                let subscription = AccountSubscription::new(pubkey, subscribed);
111                let status = update_account_state(chain, db, pubkey).await?;
112                let _ = self.ws.send(subscription);
113                Ok(status)
114            }
115            Entry::Occupied(e) => {
116                // return cached status, only if subscription exists
117                if e.subscribed.load(Ordering::Acquire) {
118                    Ok(e.status)
119                } else {
120                    // otherwise refetch fresh version from chain, to avoid stale cache issue
121                    fetch_account_state(chain, pubkey).await
122                }
123            }
124        }
125    }
126
127    /// Resolve connection for given account, if account has been delegated (as observed by
128    /// resolver), then the returned client is configured to connect to corresponding ER
129    /// instance, otherwise the client will connect to base layer chain
130    pub async fn resolve(&self, pubkey: &Pubkey) -> ResolverResult<Arc<RpcClient>> {
131        let status = self.resolve_status(pubkey).await?;
132        self.resolve_client(status)
133    }
134
135    /// Resolve connection for given transaction, if any of the accounts have been delegated
136    /// (as observed by resolver), then the resolver will check that all the writable accounts in
137    /// transaction have been delegated to the same ER, if validation is successful, the returned
138    /// client is configured to connect to this common ER. If none of the accounts are delegated then
139    /// the returned client is configured to connect to base layer chain. If conflict in delegation
140    /// is found, i.e. writable accounts are delegated to different ERs, then error is returned as
141    /// connection resolution is impossible for such a case.
142    pub async fn resolve_for_transaction(
143        &self,
144        tx: &Transaction,
145    ) -> ResolverResult<Arc<RpcClient>> {
146        let mut statuses = Vec::new();
147        for (i, acc) in tx.message.account_keys.iter().enumerate() {
148            if tx.message.is_maybe_writable(i, None) {
149                statuses.push(self.resolve_status(acc).await?);
150            }
151        }
152        let mut validator = None;
153        for s in statuses {
154            let DelegationStatus::Delegated(v1) = s else {
155                continue;
156            };
157            let Some(v2) = validator.replace(v1) else {
158                continue;
159            };
160            if v1 != v2 {
161                return Err(Error::Resolver(format!(
162                    "transaction accounts delegated to different validators: {v1} <> {v2}"
163                )));
164            }
165        }
166        if let Some(v) = validator.map(DelegationStatus::Delegated) {
167            return self.resolve_client(v);
168        }
169        Ok(self.chain.clone())
170    }
171
172    /// Get current delegation status for account, either from cache or
173    /// from chain (if account is encoutered for the first time)
174    async fn resolve_status(&self, pubkey: &Pubkey) -> ResolverResult<DelegationStatus> {
175        if let Some(record) = self.delegations.get(pubkey) {
176            if record.get().subscribed.load(Ordering::Acquire) {
177                // only return cached status if websocket subscription exists
178                Ok(record.get().status)
179            } else {
180                // fetch from chain otherwise
181                fetch_account_state(self.chain.clone(), *pubkey).await
182            }
183        } else {
184            self.track_account(*pubkey).await
185        }
186    }
187
188    /// Depending on delegation status, return appropriate RpcClient,
189    /// which can be used to perform requests for account involved
190    fn resolve_client(&self, status: DelegationStatus) -> ResolverResult<Arc<RpcClient>> {
191        match status {
192            DelegationStatus::Delegated(validator) => {
193                let guard = self
194                    .routes
195                    .read()
196                    // not really possible, no thread can panic while holding this lock
197                    .expect("poisoned RwLock for routing table");
198                let client = guard.get(&validator).ok_or(Error::Resolver(format!(
199                    "url not found for validator: {validator}"
200                )))?;
201                Ok(client.clone())
202            }
203            DelegationStatus::Undelegated => Ok(self.chain.clone()),
204        }
205    }
206}
207
mod account;
// resolver configuration, provides `Configuration`
pub mod config;
// resolver error type, provides `Error`
mod error;
// account state helpers, provides `fetch_account_state` and `update_account_state`
mod http;
// websocket connection (`WsConnection`) and account subscriptions (`AccountSubscription`)
mod websocket;