magic_resolver/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
//! A utility SDK to facilitate route resolution for a subset of solana JSON-RPC requests
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
};
use config::Configuration;
use error::Error;
use http::{fetch_account_state, update_account_state};
use rpc::nonblocking::rpc_client::RpcClient;
use scc::{hash_cache::Entry, HashCache};
use sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use websocket::{connection::WsConnection, subscription::AccountSubscription};
/// Mapping between validator(ER) identity and solana rpc client, which is
/// configured with the URL, via which the this particular ER can be reached
/// NOTE: we use RwLock with std::HashMap instead of concurrent HashMap, as this table is not
/// supposed to be modified too often, so RwLock is faster for mostly read workload
type RoutingTable = Arc<RwLock<HashMap<Pubkey, Arc<RpcClient>>>>;
/// Limited capacity (LRU) cache, mapping between an account's
/// pubkey and it's current delegation status as observed by resolver
type DelegationsDB = Arc<HashCache<Pubkey, DelegationRecord>>;
/// Conveniece wrapper for results with possible resolver errors
type ResolverResult<T> = Result<T, Error>;
const DELEGATION_PROGRAM_ID: Pubkey = ephemeral_rollups_sdk::id();
/// The fixed size of delegation record account's data,
/// NOTE: this value should be updated if the ABI of delegation
/// program changes in the future, that will affect the size
const DELEGATION_RECORD_SIZE: usize = 88;
/// Connection resolver, the type is cheaply clonable and thus a single instance should be
/// initialized and cloned between threads if necessary
#[derive(Clone)]
pub struct Resolver {
routes: RoutingTable,
delegations: DelegationsDB,
chain: Arc<RpcClient>,
ws: UnboundedSender<AccountSubscription>,
}
/// Delegation status of account
#[derive(Clone, Copy)]
pub enum DelegationStatus {
/// Account is delegated to validator indicated by pubkey
Delegated(Pubkey),
/// Account is available for modification on chain
Undelegated,
}
/// Wrapper around delegation status, with additional flag to keep track of subscription state
struct DelegationRecord {
/// current delegation status of account, last observed by resolver
status: DelegationStatus,
/// indicator, whether active websocket subscription exists for account updates, to track its
/// delegation status
subscribed: Arc<AtomicBool>,
}
impl Resolver {
/// Initialize the resolver, by creating websocket connection
/// to base chain for delegation status tracking of accounts
pub async fn new(
config: Configuration,
routes: HashMap<Pubkey, String>,
) -> ResolverResult<Self> {
let commitment = CommitmentConfig {
commitment: config.commitment,
};
let routes = routes
.into_iter()
.map(|(k, v)| (k, RpcClient::new_with_commitment(v, commitment).into()))
.collect();
let delegations = Arc::new(HashCache::with_capacity(128, config.cache_size.max(256)));
let chain = Arc::new(RpcClient::new(config.chain.to_string()));
let (ws, rx) = unbounded_channel();
let websocket =
WsConnection::establish(config.websocket, chain.clone(), rx, delegations.clone())
.await?;
tokio::spawn(websocket.start());
Ok(Self {
chain,
delegations,
ws,
routes: Arc::new(RwLock::new(routes)),
})
}
/// Start tracking account's delegation status, this is achieved by fetching delegation record
/// for account (if exists) and subscribing to updates of its state. The existence of
/// delegation record is proof that account has been delegated, and it contains critical
/// information like the identity of validator, to which the account was delegated
pub async fn track_account(&self, pubkey: Pubkey) -> ResolverResult<DelegationStatus> {
let chain = self.chain.clone();
match self.delegations.entry(pubkey) {
Entry::Vacant(e) => {
let subscribed = Arc::new(AtomicBool::default());
let record = DelegationRecord {
status: DelegationStatus::Undelegated,
subscribed: subscribed.clone(),
};
e.put_entry(record);
let db = self.delegations.clone();
let subscription = AccountSubscription::new(pubkey, subscribed);
let status = update_account_state(chain, db, pubkey).await?;
let _ = self.ws.send(subscription);
Ok(status)
}
Entry::Occupied(e) => {
// return cached status, only if subscription exists
if e.subscribed.load(Ordering::Acquire) {
Ok(e.status)
} else {
// otherwise refetch fresh version from chain, to avoid stale cache issue
fetch_account_state(chain, pubkey).await
}
}
}
}
/// Resolve connection for given account, if account has been delegated (as observed by
/// resolver), then the returned client is configured to connect to corresponding ER
/// instance, otherwise the client will connect to base layer chain
pub async fn resolve(&self, pubkey: &Pubkey) -> ResolverResult<Arc<RpcClient>> {
let status = self.resolve_status(pubkey).await?;
self.resolve_client(status)
}
/// Resolve connection for given transaction, if any of the accounts have been delegated
/// (as observed by resolver), then the resolver will check that all the writable accounts in
/// transaction have been delegated to the same ER, if validation is successful, the returned
/// client is configured to connect to this common ER. If none of the accounts are delegated then
/// the returned client is configured to connect to base layer chain. If conflict in delegation
/// is found, i.e. writable accounts are delegated to different ERs, then error is returned as
/// connection resolution is impossible for such a case.
pub async fn resolve_for_transaction(
&self,
tx: &Transaction,
) -> ResolverResult<Arc<RpcClient>> {
let mut statuses = Vec::new();
for (i, acc) in tx.message.account_keys.iter().enumerate() {
if tx.message.is_writable(i) {
statuses.push(self.resolve_status(acc).await?);
}
}
let mut validator = None;
for s in statuses {
let DelegationStatus::Delegated(v1) = s else {
continue;
};
let Some(v2) = validator.replace(v1) else {
continue;
};
if v1 != v2 {
return Err(Error::Resolver(format!(
"transaction accounts delegated to different validators: {v1} <> {v2}"
)));
}
}
if let Some(v) = validator.map(DelegationStatus::Delegated) {
return self.resolve_client(v);
}
Ok(self.chain.clone())
}
/// Get current delegation status for account, either from cache or
/// from chain (if account is encoutered for the first time)
async fn resolve_status(&self, pubkey: &Pubkey) -> ResolverResult<DelegationStatus> {
if let Some(record) = self.delegations.get(pubkey) {
if record.get().subscribed.load(Ordering::Acquire) {
// only return cached status if websocket subscription exists
Ok(record.get().status)
} else {
// fetch from chain otherwise
fetch_account_state(self.chain.clone(), *pubkey).await
}
} else {
self.track_account(*pubkey).await
}
}
/// Depending on delegation status, return appropriate RpcClient,
/// which can be used to perform requests for account involved
fn resolve_client(&self, status: DelegationStatus) -> ResolverResult<Arc<RpcClient>> {
match status {
DelegationStatus::Delegated(validator) => {
let guard = self
.routes
.read()
// not really possible, no thread can panic while holding this lock
.expect("poisoned RwLock for routing table");
let client = guard.get(&validator).ok_or(Error::Resolver(format!(
"url not found for validator: {validator}"
)))?;
Ok(client.clone())
}
DelegationStatus::Undelegated => Ok(self.chain.clone()),
}
}
}
mod account;
pub mod config;
mod error;
mod http;
mod websocket;