drift_rs/
priority_fee_subscriber.rs

1use std::{
2    sync::{Arc, Mutex, RwLock},
3    time::Duration,
4};
5
6use log::warn;
7use solana_rpc_client::nonblocking::rpc_client::RpcClient;
8use solana_sdk::{clock::Slot, pubkey::Pubkey};
9use tokio::sync::oneshot;
10
11pub const DEFAULT_REFRESH_FREQUENCY: Duration = Duration::from_millis(5 * 400);
12pub const DEFAULT_SLOT_WINDOW: Slot = 30;
13
14/// Subscribes to network priority fees given some accounts
15pub struct PriorityFeeSubscriber {
16    config: PriorityFeeSubscriberConfig,
17    /// Accounts to lock for the priority fee calculation
18    writeable_accounts: Vec<Pubkey>,
19    rpc_client: RpcClient,
20    latest_fees: RwLock<Vec<u64>>,
21    /// unsubscriber handler
22    unsub: Mutex<Option<oneshot::Sender<()>>>,
23}
24
25/// Options for `PriorityFeeSubscriber`
26pub struct PriorityFeeSubscriberConfig {
27    /// how frequently to re-poll the priority fee
28    pub refresh_frequency: Option<Duration>,
29    /// # of historic slots to consider in the fee calculation
30    /// max: 150
31    pub window: Option<Slot>,
32}
33
34impl PriorityFeeSubscriber {
35    /// Create new `PriorityFeeSubscriber` assuming a tx will lock `writeable_accounts`
36    pub fn new(endpoint: String, writeable_accounts: &[Pubkey]) -> Self {
37        Self::with_config(
38            RpcClient::new(endpoint),
39            writeable_accounts,
40            PriorityFeeSubscriberConfig {
41                refresh_frequency: Some(DEFAULT_REFRESH_FREQUENCY),
42                window: Some(DEFAULT_SLOT_WINDOW),
43            },
44        )
45    }
46
47    /// Create new `PriorityFeeSubscriber` assuming a tx will lock `writeable_accounts`
48    pub fn with_config(
49        rpc_client: RpcClient,
50        writeable_accounts: &[Pubkey],
51        config: PriorityFeeSubscriberConfig,
52    ) -> Self {
53        Self {
54            config,
55            writeable_accounts: writeable_accounts.to_vec(),
56            rpc_client,
57            latest_fees: RwLock::new(Default::default()),
58            unsub: Mutex::default(),
59        }
60    }
61
62    /// Start the priority fee subscriber task
63    ///
64    /// Returns a handle to the subscriber for querying results
65    pub fn subscribe(self) -> Arc<PriorityFeeSubscriber> {
66        let (unsub_tx, mut unsub_rx) = oneshot::channel();
67        {
68            let mut guard = self.unsub.try_lock().expect("uncontested");
69            guard.replace(unsub_tx);
70        }
71
72        let arc = Arc::new(self);
73
74        tokio::spawn({
75            let this = Arc::clone(&arc);
76            async move {
77                let mut refresh = tokio::time::interval(
78                    this.config
79                        .refresh_frequency
80                        .unwrap_or(DEFAULT_REFRESH_FREQUENCY),
81                );
82                let window = this
83                    .config
84                    .window
85                    .unwrap_or(DEFAULT_SLOT_WINDOW)
86                    .clamp(5, 150) as usize; // 150 max slots from node cache
87
88                let max_attempts = 3;
89                let mut attempts = 0;
90                loop {
91                    let _ = refresh.tick().await;
92                    let response = this
93                        .rpc_client
94                        .get_recent_prioritization_fees(this.writeable_accounts.as_slice())
95                        .await;
96
97                    match response {
98                        Ok(response) => {
99                            attempts = 0;
100                            let mut latest_fees: Vec<u64> = response
101                                .iter()
102                                .take(window)
103                                .map(|x| x.prioritization_fee)
104                                .collect();
105                            latest_fees.sort_unstable();
106                            let mut current_fees = this.latest_fees.write().expect("acquired");
107                            *current_fees = latest_fees;
108                        }
109                        Err(err) => {
110                            warn!("failed to fetch priority fee: {err:?}");
111                            attempts += 1;
112                            if attempts > max_attempts {
113                                panic!("unable to fetch priority fees");
114                            }
115                        }
116                    }
117
118                    if unsub_rx.try_recv().is_ok() {
119                        warn!("unsubscribing priority fees");
120                        break;
121                    }
122                }
123            }
124        });
125
126        arc
127    }
128
129    /// Stop the associated network subscription task
130    pub fn unsubscribe(&self) {
131        let mut guard = self.unsub.lock().expect("acquired");
132        if let Some(unsub) = guard.take() {
133            if unsub.send(()).is_err() {
134                log::error!("couldn't unsubscribe");
135            }
136        }
137    }
138
139    /// Returns the median priority fee in micro-lamports over the look-back window
140    pub fn priority_fee(&self) -> u64 {
141        self.priority_fee_nth(0.5)
142    }
143
144    /// Returns the n-th percentile priority fee in micro-lamports over the look-back window
145    /// `percentile` given as decimal 0.0 < n <= 1.0
146    pub fn priority_fee_nth(&self, percentile: f32) -> u64 {
147        let percentile = percentile.min(1.0);
148        let lock = self.latest_fees.read().expect("acquired");
149        if lock.is_empty() {
150            panic!("PriorityFeeSubscriber is not subscribed");
151        }
152        let idx = ((lock.len() - 1) as f32 * percentile).round() as usize;
153        lock[idx]
154    }
155}
156
#[cfg(test)]
mod tests {
    use serde_json::json;
    use solana_rpc_client::rpc_client::Mocks;
    use solana_rpc_client_api::{request::RpcRequest, response::RpcPrioritizationFee};

    use super::*;

    /// Runs the subscriber against a mocked fee response and checks the
    /// median, 99th and 5th percentile lookups
    #[tokio::test]
    async fn priority_fee_subscribe() {
        let _ = env_logger::try_init();
        let account_one = Pubkey::new_unique();
        let account_two = Pubkey::new_unique();

        // deliberately unsorted fees, one entry per slot starting at slot 0
        let fees: [u64; 9] = [1, 3, 5, 6, 4, 7, 2, 9, 8];
        let mut mocked_fees: Vec<RpcPrioritizationFee> = Vec::with_capacity(fees.len());
        for (slot, prioritization_fee) in (0u64..).zip(fees) {
            mocked_fees.push(RpcPrioritizationFee {
                slot,
                prioritization_fee,
            });
        }

        let mut mocks = Mocks::default();
        mocks.insert(RpcRequest::GetRecentPrioritizationFees, json!(mocked_fees));
        let rpc = RpcClient::new_mock_with_mocks(
            "https://api.mainnet-beta.solana.com".into(),
            mocks,
        );

        let accounts = [account_one, account_two];
        let settings = PriorityFeeSubscriberConfig {
            refresh_frequency: Some(Duration::from_secs(5)),
            window: Some(100),
        };

        // test
        let subscriber = PriorityFeeSubscriber::with_config(rpc, &accounts, settings).subscribe();
        // wait for subscriber to populate
        tokio::time::sleep(Duration::from_millis(100)).await;

        assert_eq!(subscriber.priority_fee(), 5);
        assert_eq!(subscriber.priority_fee_nth(0.99), 9);
        assert_eq!(subscriber.priority_fee_nth(0.05), 1);
    }
}