drift_rs/
priority_fee_subscriber.rs1use std::{
2 sync::{Arc, Mutex, RwLock},
3 time::Duration,
4};
5
6use log::warn;
7use solana_rpc_client::nonblocking::rpc_client::RpcClient;
8use solana_sdk::{clock::Slot, pubkey::Pubkey};
9use tokio::sync::oneshot;
10
11pub const DEFAULT_REFRESH_FREQUENCY: Duration = Duration::from_millis(5 * 400);
12pub const DEFAULT_SLOT_WINDOW: Slot = 30;
13
14pub struct PriorityFeeSubscriber {
16 config: PriorityFeeSubscriberConfig,
17 writeable_accounts: Vec<Pubkey>,
19 rpc_client: RpcClient,
20 latest_fees: RwLock<Vec<u64>>,
21 unsub: Mutex<Option<oneshot::Sender<()>>>,
23}
24
25pub struct PriorityFeeSubscriberConfig {
27 pub refresh_frequency: Option<Duration>,
29 pub window: Option<Slot>,
32}
33
34impl PriorityFeeSubscriber {
35 pub fn new(endpoint: String, writeable_accounts: &[Pubkey]) -> Self {
37 Self::with_config(
38 RpcClient::new(endpoint),
39 writeable_accounts,
40 PriorityFeeSubscriberConfig {
41 refresh_frequency: Some(DEFAULT_REFRESH_FREQUENCY),
42 window: Some(DEFAULT_SLOT_WINDOW),
43 },
44 )
45 }
46
47 pub fn with_config(
49 rpc_client: RpcClient,
50 writeable_accounts: &[Pubkey],
51 config: PriorityFeeSubscriberConfig,
52 ) -> Self {
53 Self {
54 config,
55 writeable_accounts: writeable_accounts.to_vec(),
56 rpc_client,
57 latest_fees: RwLock::new(Default::default()),
58 unsub: Mutex::default(),
59 }
60 }
61
62 pub fn subscribe(self) -> Arc<PriorityFeeSubscriber> {
66 let (unsub_tx, mut unsub_rx) = oneshot::channel();
67 {
68 let mut guard = self.unsub.try_lock().expect("uncontested");
69 guard.replace(unsub_tx);
70 }
71
72 let arc = Arc::new(self);
73
74 tokio::spawn({
75 let this = Arc::clone(&arc);
76 async move {
77 let mut refresh = tokio::time::interval(
78 this.config
79 .refresh_frequency
80 .unwrap_or(DEFAULT_REFRESH_FREQUENCY),
81 );
82 let window = this
83 .config
84 .window
85 .unwrap_or(DEFAULT_SLOT_WINDOW)
86 .clamp(5, 150) as usize; let max_attempts = 3;
89 let mut attempts = 0;
90 loop {
91 let _ = refresh.tick().await;
92 let response = this
93 .rpc_client
94 .get_recent_prioritization_fees(this.writeable_accounts.as_slice())
95 .await;
96
97 match response {
98 Ok(response) => {
99 attempts = 0;
100 let mut latest_fees: Vec<u64> = response
101 .iter()
102 .take(window)
103 .map(|x| x.prioritization_fee)
104 .collect();
105 latest_fees.sort_unstable();
106 let mut current_fees = this.latest_fees.write().expect("acquired");
107 *current_fees = latest_fees;
108 }
109 Err(err) => {
110 warn!("failed to fetch priority fee: {err:?}");
111 attempts += 1;
112 if attempts > max_attempts {
113 panic!("unable to fetch priority fees");
114 }
115 }
116 }
117
118 if unsub_rx.try_recv().is_ok() {
119 warn!("unsubscribing priority fees");
120 break;
121 }
122 }
123 }
124 });
125
126 arc
127 }
128
129 pub fn unsubscribe(&self) {
131 let mut guard = self.unsub.lock().expect("acquired");
132 if let Some(unsub) = guard.take() {
133 if unsub.send(()).is_err() {
134 log::error!("couldn't unsubscribe");
135 }
136 }
137 }
138
139 pub fn priority_fee(&self) -> u64 {
141 self.priority_fee_nth(0.5)
142 }
143
144 pub fn priority_fee_nth(&self, percentile: f32) -> u64 {
147 let percentile = percentile.min(1.0);
148 let lock = self.latest_fees.read().expect("acquired");
149 if lock.is_empty() {
150 panic!("PriorityFeeSubscriber is not subscribed");
151 }
152 let idx = ((lock.len() - 1) as f32 * percentile).round() as usize;
153 lock[idx]
154 }
155}
156
#[cfg(test)]
mod tests {
    use serde_json::json;
    use solana_rpc_client::rpc_client::Mocks;
    use solana_rpc_client_api::{request::RpcRequest, response::RpcPrioritizationFee};

    use super::*;

    /// Feeds nine mocked fee samples through the subscriber and checks the
    /// median, high and low percentiles computed from them.
    #[tokio::test]
    async fn priority_fee_subscribe() {
        let _ = env_logger::try_init();
        let accounts = [Pubkey::new_unique(), Pubkey::new_unique()];

        // One sample per slot, slots numbered from zero.
        let recent_fees: Vec<RpcPrioritizationFee> = (0_u64..)
            .zip([1, 3, 5, 6, 4, 7, 2, 9, 8])
            .map(|(slot, prioritization_fee)| RpcPrioritizationFee {
                slot,
                prioritization_fee,
            })
            .collect();

        let mut response_mocks = Mocks::default();
        response_mocks.insert(RpcRequest::GetRecentPrioritizationFees, json!(recent_fees));

        let config = PriorityFeeSubscriberConfig {
            refresh_frequency: Some(Duration::from_secs(5)),
            window: Some(100),
        };
        let subscriber = PriorityFeeSubscriber::with_config(
            RpcClient::new_mock_with_mocks(
                "https://api.mainnet-beta.solana.com".into(),
                response_mocks,
            ),
            &accounts,
            config,
        )
        .subscribe();

        // Give the background task time to complete its first fetch.
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(subscriber.priority_fee(), 5);
        assert_eq!(subscriber.priority_fee_nth(0.99), 9);
        assert_eq!(subscriber.priority_fee_nth(0.05), 1);
    }
}