drift_rs/
oraclemap.rs

1use std::sync::{
2    atomic::{AtomicU64, Ordering},
3    Arc,
4};
5
6use ahash::HashSet;
7use dashmap::{DashMap, ReadOnlyView};
8use drift_pubsub_client::PubsubClient;
9use futures_util::{stream::FuturesUnordered, StreamExt};
10use log::warn;
11use solana_rpc_client::nonblocking::rpc_client::RpcClient;
12use solana_sdk::{
13    account::Account, clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey,
14};
15
16use crate::{
17    drift_idl::types::OracleSource,
18    ffi::{get_oracle_price, OraclePriceData},
19    grpc::AccountUpdate,
20    types::MapOf,
21    websocket_account_subscriber::{AccountUpdate as WsAccountUpdate, WebsocketAccountSubscriber},
22    MarketId, SdkError, SdkResult, UnsubHandle,
23};
24
25const LOG_TARGET: &str = "oraclemap";
26
27#[allow(dead_code)]
28#[derive(Copy, Clone, Debug)]
29/// Captures shared relationship between oracles and markets/source types
30enum OracleShareMode {
31    /// Oracle is unshared
32    Isolated {
33        market: MarketId,
34        source: OracleSource,
35    },
36    /// Oracle is shared by two markets
37    Dual {
38        spot: MarketId,
39        perp: MarketId,
40        source: OracleSource,
41    },
42    /// Oracle is shared by two markets with mixed sources
43    DualMixed {
44        spot: (MarketId, OracleSource),
45        perp: (MarketId, OracleSource),
46    },
47}
48
49#[derive(Clone, Default, Debug)]
50pub struct Oracle {
51    pub pubkey: Pubkey,
52    pub data: OraclePriceData,
53    pub source: OracleSource,
54    pub slot: u64,
55    pub raw: Vec<u8>,
56}
57
58/// Dynamic map of Drift market oracle data
59///
60/// Caller can subscribe to some subset of markets for Ws backed updates
61/// Alternatively, the caller may drive the map by calling `sync` periodically
62pub struct OracleMap {
63    /// Oracle data keyed by pubkey and source
64    pub oraclemap: Arc<DashMap<(Pubkey, u8), Oracle, ahash::RandomState>>,
65    /// Oracle subscription handles by pubkey
66    subscriptions: DashMap<Pubkey, UnsubHandle, ahash::RandomState>,
67    /// Oracle (pubkey, source) by MarketId (immutable)
68    pub oracle_by_market: ReadOnlyView<MarketId, (Pubkey, OracleSource), ahash::RandomState>,
69    /// map from oracle to consuming markets/source types
70    shared_oracles: ReadOnlyView<Pubkey, OracleShareMode, ahash::RandomState>,
71    latest_slot: Arc<AtomicU64>,
72    commitment: CommitmentConfig,
73    pubsub: Arc<PubsubClient>,
74}
75
76impl OracleMap {
77    pub const SUBSCRIPTION_ID: &'static str = "oraclemap";
78
79    /// Create a new `OracleMap`
80    ///
81    /// * `rpc_client` - Shared RPC client instance
82    /// * `pubsub_client` - Shared Pubsub client instance
83    /// * `all_oracles` - Exhaustive list of all Drift oracle pubkeys and source by market
84    ///
85    pub fn new(
86        pubsub_client: Arc<PubsubClient>,
87        all_oracles: &[(MarketId, Pubkey, OracleSource)],
88        commitment: CommitmentConfig,
89    ) -> Self {
90        log::debug!(target: LOG_TARGET, "all oracles: {:?}", all_oracles);
91        let oracle_by_market: DashMap<MarketId, (Pubkey, OracleSource), ahash::RandomState> =
92            all_oracles
93                .iter()
94                .copied()
95                .map(|(market, pubkey, source)| (market, (pubkey, source)))
96                .collect();
97        let oracle_by_market = oracle_by_market.into_read_only();
98
99        let shared_oracles = DashMap::<Pubkey, OracleShareMode, ahash::RandomState>::default();
100        for (market, (pubkey, source)) in oracle_by_market.iter() {
101            shared_oracles
102                .entry(*pubkey)
103                .and_modify(|m| {
104                    match m {
105                        OracleShareMode::Isolated {
106                            market: isolated_market,
107                            source: isolated_source,
108                        } => {
109                            let (spot, perp) = if isolated_market.is_spot() {
110                                ((*isolated_market, *isolated_source), (*market, *source))
111                            } else {
112                                ((*market, *source), (*isolated_market, *isolated_source))
113                            };
114
115                            if source == isolated_source {
116                                *m = OracleShareMode::Dual {
117                                    spot: spot.0,
118                                    perp: perp.0,
119                                    source: *source,
120                                };
121                            } else {
122                                *m = OracleShareMode::DualMixed { spot, perp };
123                            }
124                        }
125                        _ => {
126                            // the oracle is being used by more than 2 markets
127                            panic!("detected unhandled shared oracle mode");
128                        }
129                    }
130                })
131                .or_insert(OracleShareMode::Isolated {
132                    market: *market,
133                    source: *source,
134                });
135        }
136
137        Self {
138            oraclemap: Arc::default(),
139            shared_oracles: shared_oracles.into_read_only(),
140            oracle_by_market,
141            subscriptions: Default::default(),
142            latest_slot: Arc::new(AtomicU64::new(0)),
143            commitment,
144            pubsub: pubsub_client,
145        }
146    }
147
148    /// Subscribe to oracle updates for given `markets`
149    ///
150    /// Can be called multiple times to subscribe to additional markets
151    ///
152    /// Panics
153    ///
154    /// If the `market` oracle pubkey is not loaded
155    pub async fn subscribe(&self, markets: &[MarketId]) -> SdkResult<()> {
156        let markets = HashSet::from_iter(markets);
157        log::debug!(target: LOG_TARGET, "subscribe market oracles: {markets:?}");
158
159        let mut pending_subscriptions =
160            Vec::<WebsocketAccountSubscriber>::with_capacity(markets.len());
161
162        for market in markets {
163            let (oracle_pubkey, _oracle_source) =
164                self.oracle_by_market.get(market).expect("oracle exists");
165
166            // markets can share oracle pubkeys, only want one sub per oracle pubkey
167            if self.subscriptions.contains_key(oracle_pubkey)
168                || pending_subscriptions
169                    .iter()
170                    .any(|sub| &sub.pubkey == oracle_pubkey)
171            {
172                log::debug!(target: LOG_TARGET, "subscription exists: {market:?}/{oracle_pubkey:?}");
173                continue;
174            }
175
176            let oracle_subscriber = WebsocketAccountSubscriber::new(
177                Arc::clone(&self.pubsub),
178                *oracle_pubkey,
179                self.commitment,
180            );
181
182            pending_subscriptions.push(oracle_subscriber);
183        }
184
185        let futs_iter = pending_subscriptions.into_iter().map(|sub_fut| {
186            let oraclemap = Arc::clone(&self.oraclemap);
187            let oracle_shared_mode = *(self
188                .shared_oracles
189                .get(&sub_fut.pubkey)
190                .expect("oracle exists"));
191
192            async move {
193                let unsub =
194                    sub_fut
195                        .subscribe(Self::SUBSCRIPTION_ID, true, move |update| {
196                            match oracle_shared_mode {
197                                OracleShareMode::Isolated { market: _, source } => {
198                                    update_handler(update, source, &oraclemap)
199                                }
200                                OracleShareMode::Dual {
201                                    spot: _,
202                                    perp: _,
203                                    source,
204                                } => update_handler(update, source, &oraclemap),
205                                OracleShareMode::DualMixed { spot, perp } => {
206                                    update_handler(update, spot.1, &oraclemap);
207                                    update_handler(update, perp.1, &oraclemap);
208                                }
209                            }
210                        })
211                        .await;
212                ((sub_fut.pubkey, oracle_shared_mode), unsub)
213            }
214        });
215
216        let mut subscription_futs = FuturesUnordered::from_iter(futs_iter);
217
218        while let Some(((pubkey, oracle_share_mode), unsub)) = subscription_futs.next().await {
219            log::debug!(target: LOG_TARGET, "subscribed market oracle: {oracle_share_mode:?}");
220            self.subscriptions.insert(pubkey, unsub?);
221        }
222
223        log::debug!(target: LOG_TARGET, "subscribed");
224        Ok(())
225    }
226
227    /// Unsubscribe from oracle updates for the given `markets`
228    pub fn unsubscribe(&self, markets: &[MarketId]) -> SdkResult<()> {
229        for market in markets {
230            if let Some((oracle_pubkey, oracle_source)) = self.oracle_by_market.get(market) {
231                if let Some((_, unsub)) = self.subscriptions.remove(oracle_pubkey) {
232                    let _ = unsub.send(());
233                    self.oraclemap
234                        .remove(&(*oracle_pubkey, *oracle_source as u8));
235                }
236            }
237        }
238        log::debug!(target: LOG_TARGET, "unsubscribed markets: {markets:?}");
239
240        Ok(())
241    }
242
243    /// Unsubscribe from all oracle updates
244    pub fn unsubscribe_all(&self) -> SdkResult<()> {
245        let all_markets: Vec<MarketId> = self.oracle_by_market.keys().copied().collect();
246        self.unsubscribe(&all_markets)
247    }
248
249    /// Fetches account data for each market oracle set by `markets`
250    ///
251    /// This may be invoked manually to resync oracle data for some set of markets
252    pub async fn sync(&self, markets: &[MarketId], rpc: &RpcClient) -> SdkResult<()> {
253        let markets = HashSet::<MarketId>::from_iter(markets.iter().copied());
254        log::debug!(target: LOG_TARGET, "sync oracles for: {markets:?}");
255
256        let mut oracle_sources = Vec::with_capacity(markets.len());
257        let mut oracle_pubkeys = Vec::with_capacity(markets.len());
258
259        for (_, (pubkey, source)) in self
260            .oracle_by_market
261            .iter()
262            .filter(|(m, _)| markets.contains(m))
263        {
264            oracle_pubkeys.push(*pubkey);
265            oracle_sources.push(*source);
266        }
267
268        let (synced_oracles, latest_slot) =
269            match get_multi_account_data_with_fallback(rpc, &oracle_pubkeys).await {
270                Ok(result) => result,
271                Err(err) => {
272                    warn!(target: LOG_TARGET, "failed to sync oracle accounts");
273                    return Err(err);
274                }
275            };
276
277        if synced_oracles.len() != oracle_pubkeys.len() {
278            warn!(target: LOG_TARGET, "failed to sync all oracle accounts");
279            return Err(SdkError::InvalidOracle);
280        }
281
282        for ((oracle_pubkey, oracle_account), oracle_source) in
283            synced_oracles.iter().zip(oracle_sources)
284        {
285            self.oraclemap
286                .entry((*oracle_pubkey, oracle_source as u8))
287                .and_modify(|o| {
288                    let price_data = get_oracle_price(
289                        o.source,
290                        &mut (*oracle_pubkey, oracle_account.clone()),
291                        latest_slot,
292                    )
293                    .expect("valid oracle data");
294
295                    o.raw.clone_from(&oracle_account.data);
296                    o.data = price_data;
297                    o.slot = latest_slot;
298                });
299        }
300
301        self.latest_slot.store(latest_slot, Ordering::Relaxed);
302        log::debug!(target: LOG_TARGET, "synced {} oracles", synced_oracles.len());
303
304        Ok(())
305    }
306
307    /// Number of oracles known to the `OracleMap`
308    #[allow(dead_code)]
309    pub fn len(&self) -> usize {
310        self.oraclemap.len()
311    }
312
313    /// Returns true if the oraclemap has a subscription for `market`
314    pub fn is_subscribed(&self, market: &MarketId) -> bool {
315        if let Some((oracle_pubkey, _oracle_source)) = self.oracle_by_market.get(market) {
316            self.subscriptions.contains_key(oracle_pubkey)
317        } else {
318            false
319        }
320    }
321
322    /// Get the address of a perp market oracle
323    pub fn current_perp_oracle(&self, market_index: u16) -> Option<Pubkey> {
324        self.get_by_market(&MarketId::perp(market_index))
325            .map(|x| x.pubkey)
326    }
327
328    /// Get the address of a spot market oracle
329    pub fn current_spot_oracle(&self, market_index: u16) -> Option<Pubkey> {
330        self.get_by_market(&MarketId::spot(market_index))
331            .map(|x| x.pubkey)
332    }
333
334    /// Return Oracle data by market, if known
335    pub fn get_by_market(&self, market: &MarketId) -> Option<Oracle> {
336        if let Some((oracle_pubkey, oracle_source)) = self.oracle_by_market.get(market) {
337            self.oraclemap
338                .get(&(*oracle_pubkey, *oracle_source as u8))
339                .map(|o| o.clone())
340        } else {
341            None
342        }
343    }
344
345    #[allow(dead_code)]
346    pub fn values(&self) -> Vec<Oracle> {
347        self.oraclemap.iter().map(|x| x.clone()).collect()
348    }
349
350    pub fn get_latest_slot(&self) -> u64 {
351        self.latest_slot.load(Ordering::Relaxed)
352    }
353    /// Return a reference to the internal map data structure
354    pub fn map(&self) -> Arc<MapOf<(Pubkey, u8), Oracle>> {
355        Arc::clone(&self.oraclemap)
356    }
357
358    /// Returns a hook for driving the map with new `Account` updates
359    pub(crate) fn on_account_fn(&self) -> impl Fn(&AccountUpdate) {
360        let marketmap = self.map();
361        let oracle_lookup = self.shared_oracles.clone();
362
363        move |update: &AccountUpdate| match oracle_lookup.get(&update.pubkey).unwrap() {
364            OracleShareMode::Dual {
365                spot: _,
366                perp: _,
367                source,
368            } => {
369                update_handler_grpc(update, *source, &marketmap);
370            }
371            OracleShareMode::Isolated { market: _, source } => {
372                update_handler_grpc(update, *source, &marketmap);
373            }
374            OracleShareMode::DualMixed { spot, perp } => {
375                update_handler_grpc(update, spot.1, &marketmap);
376                update_handler_grpc(update, perp.1, &marketmap);
377            }
378        }
379    }
380}
381
382/// Handler fn for new oracle account data
383#[inline]
384fn update_handler_grpc(
385    update: &AccountUpdate,
386    oracle_source: OracleSource,
387    oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>,
388) {
389    let lamports = update.lamports;
390    let slot = update.slot;
391    match get_oracle_price(
392        oracle_source,
393        &mut (
394            update.pubkey,
395            Account {
396                owner: update.owner,
397                data: update.data.to_vec(),
398                lamports,
399                ..Default::default()
400            },
401        ),
402        slot,
403    ) {
404        Ok(price_data) => {
405            oracle_map
406                .entry((update.pubkey, oracle_source as u8))
407                .and_modify(|o| {
408                    o.data = price_data;
409                    o.slot = slot;
410                    o.raw.resize(update.data.len(), 0);
411                    o.raw.clone_from_slice(update.data);
412                })
413                .or_insert(Oracle {
414                    pubkey: update.pubkey,
415                    source: oracle_source,
416                    data: price_data,
417                    slot,
418                    raw: update.data.to_vec(),
419                });
420        }
421        Err(err) => {
422            log::error!("Failed to get oracle price: {err:?}, {:?}", update.pubkey)
423        }
424    }
425}
426
427/// Handler fn for new oracle account data
428fn update_handler(
429    update: &WsAccountUpdate,
430    oracle_source: OracleSource,
431    oracle_map: &DashMap<(Pubkey, u8), Oracle, ahash::RandomState>,
432) {
433    let oracle_pubkey = update.pubkey;
434    let lamports = update.lamports;
435    match get_oracle_price(
436        oracle_source,
437        &mut (
438            oracle_pubkey,
439            Account {
440                owner: update.owner,
441                data: update.data.clone(),
442                lamports,
443                ..Default::default()
444            },
445        ),
446        update.slot,
447    ) {
448        Ok(price_data) => {
449            oracle_map
450                .entry((oracle_pubkey, oracle_source as u8))
451                .and_modify(|o| {
452                    o.data = price_data;
453                    o.slot = update.slot;
454                    o.raw.clone_from(&update.data);
455                })
456                .or_insert(Oracle {
457                    pubkey: oracle_pubkey,
458                    source: oracle_source,
459                    data: price_data,
460                    slot: update.slot,
461                    raw: update.data.clone(),
462                });
463        }
464        Err(err) => {
465            log::error!("Failed to get oracle price: {err:?}, {oracle_pubkey:?}")
466        }
467    }
468}
469
470/// Fetch all accounts with multiple fallbacks
471///
472/// Tries progressively less intensive RPC methods for wider compatibility with RPC providers:
473///    getMultipleAccounts, lastly multiple getAccountInfo
474///
475/// Returns deserialized accounts and retrieved slot
476async fn get_multi_account_data_with_fallback(
477    rpc: &RpcClient,
478    pubkeys: &[Pubkey],
479) -> SdkResult<(Vec<(Pubkey, Account)>, Slot)> {
480    let mut account_data = Vec::with_capacity(pubkeys.len());
481
482    // try 'getMultipleAccounts'
483    let mut gma_requests = FuturesUnordered::new();
484    for keys in pubkeys.chunks(64) {
485        gma_requests.push(async move {
486            let response = rpc
487                .get_multiple_accounts_with_commitment(keys, rpc.commitment())
488                .await;
489            (response, keys)
490        });
491    }
492
493    let mut gma_slot = 0;
494    while let Some((gma_response, keys)) = gma_requests.next().await {
495        match gma_response {
496            Ok(response) => {
497                gma_slot = response.context.slot;
498                for (oracle, pubkey) in response.value.into_iter().zip(keys) {
499                    match oracle {
500                        Some(oracle) => {
501                            account_data.push((*pubkey, oracle));
502                        }
503                        None => {
504                            log::warn!(target: LOG_TARGET, "failed to fetch oracle account (missing)");
505                            break;
506                        }
507                    }
508                }
509            }
510            Err(err) => {
511                log::warn!(target: LOG_TARGET, "failed to fetch oracle accounts: {err:?}");
512                return Err(err)?;
513            }
514        }
515    }
516
517    if account_data.len() == pubkeys.len() {
518        return Ok((account_data, gma_slot));
519    }
520    log::debug!(target: LOG_TARGET, "syncing with getMultipleAccounts failed");
521
522    // try multiple 'getAccount's
523    let mut account_requests = FuturesUnordered::from_iter(pubkeys.iter().map(|p| async move {
524        (
525            p,
526            rpc.get_account_with_commitment(p, rpc.commitment()).await,
527        )
528    }));
529
530    let mut latest_slot = 0;
531    while let Some((pubkey, response)) = account_requests.next().await {
532        match response {
533            Ok(response) => {
534                let account = response.value.ok_or({
535                    log::warn!("failed to fetch oracle account");
536                    SdkError::InvalidOracle
537                })?;
538                latest_slot = latest_slot.max(response.context.slot);
539                account_data.push((*pubkey, account));
540            }
541            Err(err) => {
542                log::warn!("failed to fetch oracle account: {err:?}");
543                return Err(err)?;
544            }
545        }
546    }
547
548    Ok((account_data, latest_slot))
549}
550
551#[cfg(test)]
552mod tests {
553    use super::*;
554    use crate::utils::{
555        get_ws_url,
556        test_envs::{devnet_endpoint, mainnet_endpoint},
557    };
558
559    const SOL_PERP_ORACLE: Pubkey =
560        solana_sdk::pubkey!("BAtFj4kQttZRVep3UZS2aZRDixkGYgWsbqTBVDbnSsPF");
561
562    #[tokio::test]
563    async fn oraclemap_sync() {
564        let all_oracles = vec![
565            (
566                MarketId::spot(0),
567                solana_sdk::pubkey!("5SSkXsEKQepHHAewytPVwdej4epN1nxgLVM84L4KXgy7"),
568                OracleSource::PythStableCoin,
569            ),
570            (MarketId::perp(0), SOL_PERP_ORACLE, OracleSource::PythPull),
571            (
572                MarketId::perp(1),
573                solana_sdk::pubkey!("486kr3pmFPfTsS4aZgcsQ7kS4i9rjMsYYZup6HQNSTT4"),
574                OracleSource::PythPull,
575            ),
576            (MarketId::spot(1), SOL_PERP_ORACLE, OracleSource::PythPull),
577        ];
578        let rpc = Arc::new(RpcClient::new(devnet_endpoint().into()));
579        let pubsub = Arc::new(
580            PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap())
581                .await
582                .expect("ws connects"),
583        );
584        let map = OracleMap::new(pubsub, &all_oracles, rpc.commitment());
585
586        // - dups ignored
587        // - markets with same oracle pubkey, make at most 1 sub
588        let markets = [
589            MarketId::perp(0),
590            MarketId::spot(1),
591            MarketId::perp(1),
592            MarketId::spot(1),
593        ];
594        map.sync(&markets, &rpc).await.expect("subd");
595    }
596
597    #[tokio::test]
598    async fn oraclemap_subscribe_mixed_spot_perp_source() {
599        // bonk oracle uses a precision trick via 'oracle source'
600        // with same oracle pubkey
601        let all_oracles = vec![
602            (
603                MarketId::perp(0),
604                solana_sdk::pubkey!("3m6i4RFWEDw2Ft4tFHPJtYgmpPe21k56M3FHeWYrgGBz"),
605                OracleSource::PythLazer,
606            ),
607            (
608                MarketId::perp(4),
609                solana_sdk::pubkey!("BERaNi6cpEresbq6HC1EQGaB1H1UjvEo4NGnmYSSJof4"),
610                OracleSource::PythLazer1M,
611            ),
612            (
613                MarketId::spot(32),
614                solana_sdk::pubkey!("BERaNi6cpEresbq6HC1EQGaB1H1UjvEo4NGnmYSSJof4"),
615                OracleSource::PythLazer,
616            ),
617        ];
618        let pubsub = Arc::new(
619            PubsubClient::new(&get_ws_url(&mainnet_endpoint()).unwrap())
620                .await
621                .expect("ws connects"),
622        );
623        let map = OracleMap::new(pubsub, &all_oracles, CommitmentConfig::confirmed());
624
625        let markets = [MarketId::perp(0), MarketId::spot(32), MarketId::perp(4)];
626        map.subscribe(&markets).await.expect("subd");
627        assert_eq!(map.len(), 3);
628        assert!(map.is_subscribed(&MarketId::spot(32)));
629        assert!(map.is_subscribed(&MarketId::perp(4)));
630    }
631
632    #[tokio::test]
633    async fn oraclemap_subscribes() {
634        let _ = env_logger::try_init();
635        let all_oracles = vec![
636            (
637                MarketId::spot(0),
638                solana_sdk::pubkey!("5SSkXsEKQepHHAewytPVwdej4epN1nxgLVM84L4KXgy7"),
639                OracleSource::PythStableCoin,
640            ),
641            (MarketId::perp(0), SOL_PERP_ORACLE, OracleSource::PythPull),
642            (
643                MarketId::perp(1),
644                solana_sdk::pubkey!("486kr3pmFPfTsS4aZgcsQ7kS4i9rjMsYYZup6HQNSTT4"),
645                OracleSource::PythPull,
646            ),
647            (MarketId::spot(1), SOL_PERP_ORACLE, OracleSource::PythPull),
648        ];
649        let pubsub = Arc::new(
650            PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap())
651                .await
652                .expect("ws connects"),
653        );
654        let map = OracleMap::new(pubsub, &all_oracles, CommitmentConfig::confirmed());
655
656        // - dups ignored
657        // - markets with same oracle pubkey, make at most 1 sub
658        let markets = [
659            MarketId::perp(0),
660            MarketId::spot(1),
661            MarketId::perp(1),
662            MarketId::spot(1),
663        ];
664        map.subscribe(&markets).await.expect("subd");
665        assert_eq!(map.len(), 2);
666        let markets = [MarketId::perp(0), MarketId::spot(1)];
667        map.subscribe(&markets).await.expect("subd");
668        assert_eq!(map.len(), 2);
669
670        assert!(map.is_subscribed(&MarketId::perp(0)));
671        assert!(map.is_subscribed(&MarketId::perp(1)));
672
673        // check unsub ok
674        assert!(map.unsubscribe(&[MarketId::perp(0)]).is_ok());
675        assert!(!map.is_subscribed(&MarketId::perp(0)));
676    }
677
678    #[tokio::test]
679    async fn oraclemap_unsubscribe_all() {
680        let all_oracles = vec![
681            (
682                MarketId::spot(0),
683                solana_sdk::pubkey!("5SSkXsEKQepHHAewytPVwdej4epN1nxgLVM84L4KXgy7"),
684                OracleSource::PythStableCoin,
685            ),
686            (
687                MarketId::perp(1),
688                solana_sdk::pubkey!("486kr3pmFPfTsS4aZgcsQ7kS4i9rjMsYYZup6HQNSTT4"),
689                OracleSource::PythPull,
690            ),
691        ];
692        let map = OracleMap::new(
693            Arc::new(
694                PubsubClient::new(&get_ws_url(&devnet_endpoint()).unwrap())
695                    .await
696                    .expect("ws connects"),
697            ),
698            &all_oracles,
699            CommitmentConfig::confirmed(),
700        );
701        map.subscribe(&[MarketId::spot(0), MarketId::perp(1)])
702            .await
703            .expect("subd");
704        assert!(map.unsubscribe_all().is_ok());
705        assert_eq!(map.len(), 0);
706    }
707
708    #[cfg(feature = "rpc_tests")]
709    #[tokio::test]
710    async fn test_oracle_map() {
711        use crate::{
712            drift_idl::accounts::{PerpMarket, SpotMarket},
713            marketmap::MarketMap,
714        };
715        let commitment = CommitmentConfig::processed();
716
717        let spot_market_map =
718            MarketMap::<SpotMarket>::new(commitment.clone(), mainnet_endpoint(), true);
719        let perp_market_map =
720            MarketMap::<PerpMarket>::new(commitment.clone(), mainnet_endpoint(), true);
721
722        let _ = spot_market_map.sync().await;
723        let _ = perp_market_map.sync().await;
724
725        let perp_oracles = perp_market_map.oracles();
726        let spot_oracles = spot_market_map.oracles();
727
728        let mut oracles = vec![];
729        oracles.extend(perp_oracles.clone());
730        oracles.extend(spot_oracles.clone());
731
732        let mut oracle_infos = vec![];
733        for oracle_info in oracles {
734            if !oracle_infos.contains(&oracle_info) {
735                oracle_infos.push(oracle_info)
736            }
737        }
738
739        let oracle_infos_len = oracle_infos.len();
740        dbg!(oracle_infos_len);
741
742        let oracle_map = OracleMap::new(
743            commitment,
744            &mainnet_endpoint(),
745            true,
746            perp_oracles,
747            spot_oracles,
748        );
749
750        let _ = oracle_map.subscribe().await;
751
752        dbg!(oracle_map.size());
753
754        dbg!("sleeping");
755        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
756        dbg!("done sleeping");
757
758        let rlb_perp_market_oracle_pubkey = perp_market_map
759            .get(&17)
760            .expect("rlb perp market")
761            .data
762            .amm
763            .oracle;
764        let rlb_oracle = oracle_map
765            .get(&rlb_perp_market_oracle_pubkey)
766            .expect("rlb oracle");
767        dbg!("rlb oracle info:");
768        dbg!(rlb_oracle.data.price);
769        dbg!(rlb_oracle.slot);
770
771        dbg!("perp market oracles");
772        let mut last_sol_price = 0;
773        let mut last_sol_slot = 0;
774        let mut last_btc_price = 0;
775        let mut last_btc_slot = 0;
776        for _ in 0..10 {
777            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
778            dbg!();
779            let sol_perp_market_oracle_pubkey = perp_market_map
780                .get(&0)
781                .expect("sol perp market")
782                .data
783                .amm
784                .oracle;
785            let sol_oracle = oracle_map
786                .get(&sol_perp_market_oracle_pubkey)
787                .expect("sol oracle");
788            dbg!("sol oracle info:");
789            dbg!(sol_oracle.data.price);
790            dbg!(sol_oracle.slot);
791            dbg!(
792                "sol price change: {}",
793                sol_oracle.data.price - last_sol_price
794            );
795            dbg!("sol slot change: {}", sol_oracle.slot - last_sol_slot);
796            last_sol_price = sol_oracle.data.price;
797            last_sol_slot = sol_oracle.slot;
798
799            dbg!();
800
801            let btc_perp_market_oracle_pubkey = perp_market_map
802                .get(&1)
803                .expect("btc perp market")
804                .data
805                .amm
806                .oracle;
807            let btc_oracle = oracle_map
808                .get(&btc_perp_market_oracle_pubkey)
809                .expect("btc oracle");
810            dbg!("btc oracle info:");
811            dbg!(btc_oracle.data.price);
812            dbg!(btc_oracle.slot);
813            dbg!(
814                "btc price change: {}",
815                btc_oracle.data.price - last_btc_price
816            );
817            dbg!("btc slot change: {}", btc_oracle.slot - last_btc_slot);
818            last_btc_price = btc_oracle.data.price;
819            last_btc_slot = btc_oracle.slot;
820        }
821
822        dbg!();
823
824        dbg!("spot market oracles");
825        let mut last_rndr_price = 0;
826        let mut last_rndr_slot = 0;
827        let mut last_weth_price = 0;
828        let mut last_weth_slot = 0;
829        for _ in 0..10 {
830            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
831            dbg!();
832            let rndr_spot_market_oracle_pubkey = spot_market_map
833                .get(&11)
834                .expect("sol perp market")
835                .data
836                .oracle;
837            let rndr_oracle = oracle_map
838                .get(&rndr_spot_market_oracle_pubkey)
839                .expect("sol oracle");
840            dbg!("rndr oracle info:");
841            dbg!(rndr_oracle.data.price);
842            dbg!(rndr_oracle.slot);
843            dbg!(
844                "rndr price change: {}",
845                rndr_oracle.data.price - last_rndr_price
846            );
847            dbg!("rndr slot change: {}", rndr_oracle.slot - last_rndr_slot);
848            last_rndr_price = rndr_oracle.data.price;
849            last_rndr_slot = rndr_oracle.slot;
850
851            dbg!();
852
853            let weth_spot_market_oracle_pubkey = spot_market_map
854                .get(&4)
855                .expect("sol perp market")
856                .data
857                .oracle;
858            let weth_oracle = oracle_map
859                .get(&weth_spot_market_oracle_pubkey)
860                .expect("sol oracle");
861            dbg!("weth oracle info:");
862            dbg!(weth_oracle.data.price);
863            dbg!(weth_oracle.slot);
864            dbg!(
865                "weth price change: {}",
866                weth_oracle.data.price - last_weth_price
867            );
868            dbg!("weth slot change: {}", weth_oracle.slot - last_weth_slot);
869            last_weth_price = weth_oracle.data.price;
870            last_weth_slot = weth_oracle.slot;
871        }
872
873        let _ = oracle_map.unsubscribe().await;
874    }
875}