1use std::{
4 collections::HashMap,
5 sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8 },
9};
10
11use parking_lot::RwLock;
12
13use config::Configuration;
14use error::Error;
15use http::{fetch_account_state, fetch_domain_records, update_account_state};
16use rpc::nonblocking::rpc_client::RpcClient;
17use scc::{hash_cache::Entry, HashCache};
18use sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction};
19use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
20use websocket::{
21 connection::{delegations::WsDelegationsConnection, routes::WsRoutesConnection},
22 subscription::AccountSubscription,
23};
24
25type RoutingTable = Arc<RwLock<HashMap<Pubkey, Arc<RpcClient>>>>;
30type DelegationsDB = Arc<HashCache<Pubkey, DelegationRecord>>;
33type ResolverResult<T> = Result<T, Error>;
35
36const DELEGATION_PROGRAM_ID: Pubkey = ephemeral_rollups_sdk::id();
37const DELEGATION_RECORD_SIZE: usize = 88;
41
42#[derive(Clone)]
45pub struct Resolver {
46 routes: RoutingTable,
47 delegations: DelegationsDB,
48 chain: Arc<RpcClient>,
49 delegations_tx: UnboundedSender<AccountSubscription>,
50}
51
52#[derive(Clone, Copy)]
54pub enum DelegationStatus {
55 Delegated(Pubkey),
57 Undelegated,
59}
60
61struct DelegationRecord {
63 status: DelegationStatus,
65 subscribed: Arc<AtomicBool>,
68}
69
70impl Resolver {
71 pub async fn new(config: Configuration) -> ResolverResult<Self> {
73 Self::new_custom(config, true, None).await
74 }
75 pub async fn new_custom(
81 config: Configuration,
82 use_on_chain_routes: bool,
83 custom_routes: Option<HashMap<Pubkey, String>>,
84 ) -> ResolverResult<Self> {
85 let commitment = CommitmentConfig {
86 commitment: config.commitment,
87 };
88 let chain = Arc::new(RpcClient::new(config.chain.to_string()));
89
90 let mut routes: HashMap<_, _> = if use_on_chain_routes {
91 fetch_domain_records(&chain)
92 .await?
93 .into_iter()
94 .map(|record| {
95 (
96 *record.identity(),
97 RpcClient::new_with_commitment(record.addr().to_string(), commitment)
98 .into(),
99 )
100 })
101 .collect()
102 } else {
103 Default::default()
104 };
105
106 routes.extend(
107 custom_routes
108 .into_iter()
109 .flatten()
110 .map(|(k, v)| (k, RpcClient::new_with_commitment(v, commitment).into())),
111 );
112
113 let routes = Arc::new(RwLock::new(routes));
114
115 let delegations = Arc::new(HashCache::with_capacity(128, config.cache_size.max(256)));
116 let (delegations_tx, rx) = unbounded_channel();
117 let delegations_ws = WsDelegationsConnection::establish(
118 config.websocket.clone(),
119 chain.clone(),
120 rx,
121 delegations.clone(),
122 )
123 .await?;
124
125 tokio::spawn(delegations_ws.start());
126
127 if use_on_chain_routes {
128 let routes_ws =
129 WsRoutesConnection::establish(config.websocket, chain.clone(), routes.clone())
130 .await?;
131 tokio::spawn(routes_ws.start());
132 }
133
134 Ok(Self {
135 chain,
136 delegations,
137 delegations_tx,
138 routes,
139 })
140 }
141
142 pub async fn track_account(&self, pubkey: Pubkey) -> ResolverResult<DelegationStatus> {
147 let chain = self.chain.clone();
148 match self.delegations.entry(pubkey) {
149 Entry::Vacant(e) => {
150 let subscribed = Arc::new(AtomicBool::default());
151 let record = DelegationRecord {
152 status: DelegationStatus::Undelegated,
153 subscribed: subscribed.clone(),
154 };
155 e.put_entry(record);
156 let db = self.delegations.clone();
157 let subscription = AccountSubscription::new(pubkey, subscribed);
158 let status = update_account_state(chain, db, pubkey).await?;
159 let _ = self.delegations_tx.send(subscription);
160 Ok(status)
161 }
162 Entry::Occupied(e) => {
163 if e.subscribed.load(Ordering::Acquire) {
165 Ok(e.status)
166 } else {
167 fetch_account_state(chain, pubkey).await
169 }
170 }
171 }
172 }
173
174 pub async fn resolve(&self, pubkey: &Pubkey) -> ResolverResult<Arc<RpcClient>> {
178 let status = self.resolve_status(pubkey).await?;
179 self.resolve_client(status)
180 }
181
182 pub async fn resolve_for_transaction(
190 &self,
191 tx: &Transaction,
192 ) -> ResolverResult<Arc<RpcClient>> {
193 let mut statuses = Vec::new();
194 for (i, acc) in tx.message.account_keys.iter().enumerate() {
195 if tx.message.is_maybe_writable(i, None) {
196 statuses.push(self.resolve_status(acc).await?);
197 }
198 }
199 let mut validator = None;
200 for s in statuses {
201 let DelegationStatus::Delegated(v1) = s else {
202 continue;
203 };
204 let Some(v2) = validator.replace(v1) else {
205 continue;
206 };
207 if v1 != v2 {
208 return Err(Error::Resolver(format!(
209 "transaction accounts delegated to different validators: {v1} <> {v2}"
210 )));
211 }
212 }
213 if let Some(v) = validator.map(DelegationStatus::Delegated) {
214 return self.resolve_client(v);
215 }
216 Ok(self.chain.clone())
217 }
218
219 async fn resolve_status(&self, pubkey: &Pubkey) -> ResolverResult<DelegationStatus> {
222 if let Some(record) = self.delegations.get(pubkey) {
223 if record.get().subscribed.load(Ordering::Acquire) {
224 Ok(record.get().status)
226 } else {
227 fetch_account_state(self.chain.clone(), *pubkey).await
229 }
230 } else {
231 self.track_account(*pubkey).await
232 }
233 }
234
235 fn resolve_client(&self, status: DelegationStatus) -> ResolverResult<Arc<RpcClient>> {
238 match status {
239 DelegationStatus::Delegated(validator) => {
240 let guard = self.routes.read();
241 let client = guard.get(&validator).ok_or(Error::Resolver(format!(
242 "url not found for validator: {validator}"
243 )))?;
244 Ok(client.clone())
245 }
246 DelegationStatus::Undelegated => Ok(self.chain.clone()),
247 }
248 }
249}
250
251mod account;
252pub mod config;
253mod error;
254mod http;
255mod websocket;