gmsol_sdk/client/
mod.rs

1/// Client for transaction subscription.
2pub mod pubsub;
3
4/// Utilities for program accounts.
5pub mod accounts;
6
7/// Utilities for transaction history.
8pub mod transaction_history;
9
10/// Definition of [`TokenMap`].
11pub mod token_map;
12
13/// Operations.
14pub mod ops;
15
16/// Feeds parser.
17pub mod feeds_parser;
18
19/// Pull oracle support.
20pub mod pull_oracle;
21
22/// Utilities for token accounts.
23pub mod token_account;
24
25/// Simulate a transaction and view its output.
26pub mod view;
27
28/// Instruction buffer.
29pub mod instruction_buffer;
30
31/// Program IDs.
32pub mod program_ids;
33
34/// Chainlink support.
35#[cfg(feature = "chainlink")]
36pub mod chainlink;
37
38/// Pyth support.
39#[cfg(feature = "pyth")]
40pub mod pyth;
41
42/// Switchboard support.
43#[cfg(feature = "switchboard")]
44pub mod switchboard;
45
46/// Squads operations.
47#[cfg(feature = "squads")]
48pub mod squads;
49
50use std::{
51    collections::BTreeMap,
52    ops::Deref,
53    sync::{Arc, OnceLock},
54};
55
56use accounts::{
57    account_with_context, accounts_lazy_with_context, get_account_with_context,
58    ProgramAccountsConfig,
59};
60use gmsol_model::{price::Prices, PnlFactorKind};
61use gmsol_programs::{
62    anchor_lang::{AccountDeserialize, AnchorSerialize, Discriminator},
63    bytemuck,
64    gmsol_store::{
65        accounts as store_accounts,
66        types::{self as store_types, MarketStatus},
67    },
68};
69use gmsol_solana_utils::{
70    bundle_builder::{BundleBuilder, BundleOptions, CreateBundleOptions},
71    cluster::Cluster,
72    program::Program,
73    transaction_builder::{default_before_sign, Config, TransactionBuilder},
74    utils::WithSlot,
75};
76use gmsol_utils::oracle::PriceProviderKind;
77use instruction_buffer::InstructionBuffer;
78use ops::market::MarketOps;
79use pubsub::{PubsubClient, SubscriptionConfig};
80use solana_client::{
81    nonblocking::rpc_client::RpcClient,
82    rpc_config::RpcAccountInfoConfig,
83    rpc_filter::{Memcmp, RpcFilterType},
84};
85use solana_sdk::{
86    account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, signer::Signer,
87};
88use token_map::TokenMap;
89use tokio::sync::OnceCell;
90use typed_builder::TypedBuilder;
91
92use crate::{
93    builders::{
94        callback::{Callback, CallbackParams},
95        StoreProgram,
96    },
97    pda::{NonceBytes, ReferralCodeBytes},
98    utils::{
99        optional::optional_address,
100        zero_copy::{SharedZeroCopy, ZeroCopy},
101    },
102};
103
104#[cfg(liquidity_provider)]
105use gmsol_solana_utils::Program as ProgramTrait;
106
107#[cfg(liquidity_provider)]
108use crate::builders::liquidity_provider::LiquidityProviderProgram;
109
110#[cfg(feature = "decode")]
111use gmsol_decode::{gmsol::programs::GMSOLCPIEvent, Decode};
112
113#[cfg(feature = "decode")]
114use gmsol_programs::gmsol_store::events as store_events;
115
116/// Discriminator offset.
117pub const DISC_OFFSET: usize = 8;
118
119/// Options for [`Client`].
120#[derive(Debug, Clone, TypedBuilder)]
121pub struct ClientOptions {
122    #[builder(default)]
123    store_program_id: Option<Pubkey>,
124    #[builder(default)]
125    treasury_program_id: Option<Pubkey>,
126    #[cfg(liquidity_provider)]
127    #[builder(default)]
128    liquidity_provider_program_id: Option<Pubkey>,
129    #[builder(default)]
130    timelock_program_id: Option<Pubkey>,
131    #[builder(default)]
132    commitment: CommitmentConfig,
133    #[builder(default)]
134    subscription: SubscriptionConfig,
135}
136
137impl Default for ClientOptions {
138    fn default() -> Self {
139        Self::builder().build()
140    }
141}
142
143/// Client for interacting with the GMX-Solana protocol.
144pub struct Client<C> {
145    cfg: Config<C>,
146    store_program: Program<C>,
147    treasury_program: Program<C>,
148    #[cfg(liquidity_provider)]
149    liquidity_provider_program: LiquidityProviderProgram,
150    timelock_program: Program<C>,
151    rpc: OnceLock<RpcClient>,
152    pub_sub: OnceCell<PubsubClient>,
153    subscription_config: SubscriptionConfig,
154}
155
156impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
157    /// Create a new [`Client`] with the given options.
158    pub fn new_with_options(
159        cluster: Cluster,
160        payer: C,
161        options: ClientOptions,
162    ) -> crate::Result<Self> {
163        let ClientOptions {
164            store_program_id,
165            treasury_program_id,
166            #[cfg(liquidity_provider)]
167            liquidity_provider_program_id,
168            timelock_program_id,
169            commitment,
170            subscription,
171        } = options;
172        let cfg = Config::new(cluster, payer, commitment);
173        Ok(Self {
174            store_program: Program::new(
175                store_program_id.unwrap_or(gmsol_programs::gmsol_store::ID),
176                cfg.clone(),
177            ),
178            treasury_program: Program::new(
179                treasury_program_id.unwrap_or(gmsol_programs::gmsol_treasury::ID),
180                cfg.clone(),
181            ),
182            #[cfg(liquidity_provider)]
183            liquidity_provider_program: LiquidityProviderProgram::builder()
184                .id(liquidity_provider_program_id
185                    .unwrap_or(gmsol_programs::gmsol_liquidity_provider::ID))
186                .build(),
187            timelock_program: Program::new(
188                timelock_program_id.unwrap_or(gmsol_programs::gmsol_timelock::ID),
189                cfg.clone(),
190            ),
191            cfg,
192            pub_sub: OnceCell::default(),
193            rpc: Default::default(),
194            subscription_config: subscription,
195        })
196    }
197
198    /// Create a new [`Client`] with default options.
199    pub fn new(cluster: Cluster, payer: C) -> crate::Result<Self> {
200        Self::new_with_options(cluster, payer, ClientOptions::default())
201    }
202
203    /// Create a clone of this client with a new payer.
204    pub fn try_clone_with_payer<C2: Clone + Deref<Target = impl Signer>>(
205        &self,
206        payer: C2,
207    ) -> crate::Result<Client<C2>> {
208        Client::new_with_options(
209            self.cluster().clone(),
210            payer,
211            ClientOptions {
212                store_program_id: Some(*self.store_program_id()),
213                treasury_program_id: Some(*self.treasury_program_id()),
214                #[cfg(liquidity_provider)]
215                liquidity_provider_program_id: Some(*self.liquidity_provider_program.id()),
216                timelock_program_id: Some(*self.timelock_program_id()),
217                commitment: self.commitment(),
218                subscription: self.subscription_config.clone(),
219            },
220        )
221    }
222
223    /// Create a clone of this client.
224    pub fn try_clone(&self) -> crate::Result<Self> {
225        Ok(Self {
226            cfg: self.cfg.clone(),
227            store_program: self.program(*self.store_program_id()),
228            treasury_program: self.program(*self.treasury_program_id()),
229            #[cfg(liquidity_provider)]
230            liquidity_provider_program: self.liquidity_provider_program.clone(),
231            timelock_program: self.program(*self.timelock_program_id()),
232            pub_sub: OnceCell::default(),
233            rpc: Default::default(),
234            subscription_config: self.subscription_config.clone(),
235        })
236    }
237
238    /// Replace subscription config with the given.
239    pub fn set_subscription_config(&mut self, config: SubscriptionConfig) -> &mut Self {
240        self.subscription_config = config;
241        self
242    }
243
244    /// Create a new [`Program`] with the given program id.
245    pub fn program(&self, program_id: Pubkey) -> Program<C> {
246        Program::new(program_id, self.cfg.clone())
247    }
248
249    /// Get current cluster.
250    pub fn cluster(&self) -> &Cluster {
251        self.cfg.cluster()
252    }
253
254    /// Get current commitment config.
255    pub fn commitment(&self) -> CommitmentConfig {
256        *self.cfg.commitment()
257    }
258
259    /// Get current payer.
260    pub fn payer(&self) -> Pubkey {
261        self.cfg.payer()
262    }
263
264    /// Get [`RpcClient`].
265    pub fn rpc(&self) -> &RpcClient {
266        self.rpc.get_or_init(|| self.cfg.rpc())
267    }
268
269    /// Get store program.
270    pub fn store_program(&self) -> &Program<C> {
271        &self.store_program
272    }
273
274    /// Creates a [`StoreProgram`].
275    pub fn store_program_for_builders(&self, store: &Pubkey) -> StoreProgram {
276        StoreProgram::builder()
277            .id(*self.store_program_id())
278            .store(*store)
279            .build()
280    }
281
282    /// Get [`LiquidityProviderProgram`] for builders.
283    #[cfg(liquidity_provider)]
284    pub fn lp_program_for_builders(&self) -> &LiquidityProviderProgram {
285        &self.liquidity_provider_program
286    }
287
288    /// Get treasury program.
289    pub fn treasury_program(&self) -> &Program<C> {
290        &self.treasury_program
291    }
292
293    /// Get timelock program.
294    pub fn timelock_program(&self) -> &Program<C> {
295        &self.timelock_program
296    }
297
298    /// Create a new store program.
299    pub fn new_store_program(&self) -> crate::Result<Program<C>> {
300        Ok(self.program(*self.store_program_id()))
301    }
302
303    /// Create a new treasury program.
304    pub fn new_treasury_program(&self) -> crate::Result<Program<C>> {
305        Ok(self.program(*self.store_program_id()))
306    }
307
308    /// Get the program id of the store program.
309    pub fn store_program_id(&self) -> &Pubkey {
310        self.store_program().id()
311    }
312
313    /// Get the program id of the treasury program.
314    pub fn treasury_program_id(&self) -> &Pubkey {
315        self.treasury_program().id()
316    }
317
318    /// Get the program id of the timelock program.
319    pub fn timelock_program_id(&self) -> &Pubkey {
320        self.timelock_program().id()
321    }
322
323    /// Get the program id of the liquidity provider program.
324    #[cfg(liquidity_provider)]
325    pub fn liquidity_provider_program_id(&self) -> &Pubkey {
326        self.liquidity_provider_program.id()
327    }
328
329    /// Create a [`TransactionBuilder`] for the store program.
330    pub fn store_transaction(&self) -> TransactionBuilder<'_, C> {
331        self.store_program().transaction()
332    }
333
334    /// Create a [`TransactionBuilder`] for the treasury program.
335    pub fn treasury_transaction(&self) -> TransactionBuilder<'_, C> {
336        self.treasury_program().transaction()
337    }
338
339    /// Create a [`TransactionBuilder`] for the timelock program.
340    pub fn timelock_transaction(&self) -> TransactionBuilder<'_, C> {
341        self.timelock_program().transaction()
342    }
343
344    /// Create a [`BundleBuilder`] with the given options.
345    pub fn bundle_with_options(&self, options: BundleOptions) -> BundleBuilder<'_, C> {
346        BundleBuilder::new_with_options(CreateBundleOptions {
347            cluster: self.cluster().clone(),
348            commitment: self.commitment(),
349            options,
350        })
351    }
352
353    /// Create a [`BundleBuilder`] with default options.
354    pub fn bundle(&self) -> BundleBuilder<C> {
355        self.bundle_with_options(Default::default())
356    }
357
358    /// Find PDA for [`Store`](gmsol_programs::gmsol_store::accounts::Store) account.
359    pub fn find_store_address(&self, key: &str) -> Pubkey {
360        crate::pda::find_store_address(key, self.store_program_id()).0
361    }
362
363    /// Find PDA for store wallet account.
364    pub fn find_store_wallet_address(&self, store: &Pubkey) -> Pubkey {
365        crate::pda::find_store_wallet_address(store, self.store_program_id()).0
366    }
367
368    /// Get the event authority PDA for the `Store` program.
369    pub fn store_event_authority(&self) -> Pubkey {
370        crate::pda::find_event_authority_address(self.store_program_id()).0
371    }
372
373    /// Find PDA for market vault account.
374    pub fn find_market_vault_address(&self, store: &Pubkey, token: &Pubkey) -> Pubkey {
375        crate::pda::find_market_vault_address(store, token, self.store_program_id()).0
376    }
377
378    /// Find PDA for market token mint account.
379    pub fn find_market_token_address(
380        &self,
381        store: &Pubkey,
382        index_token: &Pubkey,
383        long_token: &Pubkey,
384        short_token: &Pubkey,
385    ) -> Pubkey {
386        crate::pda::find_market_token_address(
387            store,
388            index_token,
389            long_token,
390            short_token,
391            self.store_program_id(),
392        )
393        .0
394    }
395
396    /// Find PDA for market account.
397    pub fn find_market_address(&self, store: &Pubkey, token: &Pubkey) -> Pubkey {
398        crate::pda::find_market_address(store, token, self.store_program_id()).0
399    }
400
401    /// Find PDA for deposit account.
402    pub fn find_deposit_address(
403        &self,
404        store: &Pubkey,
405        user: &Pubkey,
406        nonce: &NonceBytes,
407    ) -> Pubkey {
408        crate::pda::find_deposit_address(store, user, nonce, self.store_program_id()).0
409    }
410
411    /// Find PDA for first deposit owner.
412    pub fn find_first_deposit_owner_address(&self) -> Pubkey {
413        crate::pda::find_first_deposit_receiver_address(self.store_program_id()).0
414    }
415
416    /// Find PDA for withdrawal account.
417    pub fn find_withdrawal_address(
418        &self,
419        store: &Pubkey,
420        user: &Pubkey,
421        nonce: &NonceBytes,
422    ) -> Pubkey {
423        crate::pda::find_withdrawal_address(store, user, nonce, self.store_program_id()).0
424    }
425
426    /// Find PDA for order.
427    pub fn find_order_address(&self, store: &Pubkey, user: &Pubkey, nonce: &NonceBytes) -> Pubkey {
428        crate::pda::find_order_address(store, user, nonce, self.store_program_id()).0
429    }
430
431    /// Find PDA for shift.
432    pub fn find_shift_address(&self, store: &Pubkey, owner: &Pubkey, nonce: &NonceBytes) -> Pubkey {
433        crate::pda::find_shift_address(store, owner, nonce, self.store_program_id()).0
434    }
435
436    /// Find PDA for position.
437    pub fn find_position_address(
438        &self,
439        store: &Pubkey,
440        user: &Pubkey,
441        market_token: &Pubkey,
442        collateral_token: &Pubkey,
443        is_long: bool,
444    ) -> crate::Result<Pubkey> {
445        Ok(crate::pda::find_position_address(
446            store,
447            user,
448            market_token,
449            collateral_token,
450            is_long,
451            self.store_program_id(),
452        )
453        .0)
454    }
455
456    /// Find PDA for claimable account.
457    pub fn find_claimable_account_address(
458        &self,
459        store: &Pubkey,
460        mint: &Pubkey,
461        user: &Pubkey,
462        time_key: &[u8],
463    ) -> Pubkey {
464        crate::pda::find_claimable_account_address(
465            store,
466            mint,
467            user,
468            time_key,
469            self.store_program_id(),
470        )
471        .0
472    }
473
474    /// Find PDA for trade event buffer account.
475    pub fn find_trade_event_buffer_address(
476        &self,
477        store: &Pubkey,
478        authority: &Pubkey,
479        index: u16,
480    ) -> Pubkey {
481        crate::pda::find_trade_event_buffer_address(
482            store,
483            authority,
484            index,
485            self.store_program_id(),
486        )
487        .0
488    }
489
490    /// Find PDA for user account.
491    pub fn find_user_address(&self, store: &Pubkey, owner: &Pubkey) -> Pubkey {
492        crate::pda::find_user_address(store, owner, self.store_program_id()).0
493    }
494
495    /// Find PDA for referral code.
496    pub fn find_referral_code_address(&self, store: &Pubkey, code: ReferralCodeBytes) -> Pubkey {
497        crate::pda::find_referral_code_address(store, code, self.store_program_id()).0
498    }
499
500    /// Find PDA for GLV token mint.
501    pub fn find_glv_token_address(&self, store: &Pubkey, index: u16) -> Pubkey {
502        crate::pda::find_glv_token_address(store, index, self.store_program_id()).0
503    }
504
505    /// Find PDA for GLV.
506    pub fn find_glv_address(&self, glv_token: &Pubkey) -> Pubkey {
507        crate::pda::find_glv_address(glv_token, self.store_program_id()).0
508    }
509
510    /// Find PDA for GLV deposit.
511    pub fn find_glv_deposit_address(
512        &self,
513        store: &Pubkey,
514        owner: &Pubkey,
515        nonce: &NonceBytes,
516    ) -> Pubkey {
517        crate::pda::find_glv_deposit_address(store, owner, nonce, self.store_program_id()).0
518    }
519
520    /// Find PDA for GLV withdrawal.
521    pub fn find_glv_withdrawal_address(
522        &self,
523        store: &Pubkey,
524        owner: &Pubkey,
525        nonce: &NonceBytes,
526    ) -> Pubkey {
527        crate::pda::find_glv_withdrawal_address(store, owner, nonce, self.store_program_id()).0
528    }
529
530    /// Find PDA for GT exchange vault.
531    pub fn find_gt_exchange_vault_address(
532        &self,
533        store: &Pubkey,
534        time_window_index: i64,
535        time_window: u32,
536    ) -> Pubkey {
537        crate::pda::find_gt_exchange_vault_address(
538            store,
539            time_window_index,
540            time_window,
541            self.store_program_id(),
542        )
543        .0
544    }
545
546    /// Find PDA for GT exchange.
547    pub fn find_gt_exchange_address(&self, vault: &Pubkey, owner: &Pubkey) -> Pubkey {
548        crate::pda::find_gt_exchange_address(vault, owner, self.store_program_id()).0
549    }
550
551    /// Find PDA for custom price feed.
552    pub fn find_price_feed_address(
553        &self,
554        store: &Pubkey,
555        authority: &Pubkey,
556        index: u16,
557        provider: PriceProviderKind,
558        token: &Pubkey,
559    ) -> Pubkey {
560        crate::pda::find_price_feed_address(
561            store,
562            authority,
563            index,
564            provider,
565            token,
566            self.store_program_id(),
567        )
568        .0
569    }
570
571    /// Find PDA for treasury global config.
572    pub fn find_treasury_config_address(&self, store: &Pubkey) -> Pubkey {
573        crate::pda::find_treasury_config_address(store, self.treasury_program_id()).0
574    }
575
576    /// Find PDA for treasury vault config.
577    pub fn find_treasury_vault_config_address(&self, config: &Pubkey, index: u16) -> Pubkey {
578        crate::pda::find_treasury_vault_config_address(config, index, self.treasury_program_id()).0
579    }
580
581    /// Find PDA for GT bank.
582    pub fn find_gt_bank_address(
583        &self,
584        treasury_vault_config: &Pubkey,
585        gt_exchange_vault: &Pubkey,
586    ) -> Pubkey {
587        crate::pda::find_gt_bank_address(
588            treasury_vault_config,
589            gt_exchange_vault,
590            self.treasury_program_id(),
591        )
592        .0
593    }
594
595    /// Find PDA for treasury receiver.
596    pub fn find_treasury_receiver_address(&self, config: &Pubkey) -> Pubkey {
597        crate::pda::find_treasury_receiver_address(config, self.treasury_program_id()).0
598    }
599
600    /// Find PDA for timelock config.
601    pub fn find_timelock_config_address(&self, store: &Pubkey) -> Pubkey {
602        crate::pda::find_timelock_config_address(store, self.timelock_program_id()).0
603    }
604
605    /// Find PDA for timelock executor.
606    pub fn find_executor_address(&self, store: &Pubkey, role: &str) -> crate::Result<Pubkey> {
607        Ok(crate::pda::find_executor_address(store, role, self.timelock_program_id())?.0)
608    }
609
610    /// Find the wallet PDA for the given timelock executor.
611    pub fn find_executor_wallet_address(&self, executor: &Pubkey) -> Pubkey {
612        crate::pda::find_executor_wallet_address(executor, self.timelock_program_id()).0
613    }
614
615    /// Find the PDA for callback authority.
616    pub fn find_callback_authority_address(&self) -> Pubkey {
617        crate::pda::find_callback_authority(self.store_program_id()).0
618    }
619
620    /// Find the PDA for virtual inventory for swaps.
621    pub fn find_virtual_inventory_for_swaps_address(&self, store: &Pubkey, index: u32) -> Pubkey {
622        crate::pda::find_virtual_inventory_for_swaps_address(store, index, self.store_program_id())
623            .0
624    }
625
626    /// Find the PDA for virtual inventory for positions.
627    pub fn find_virtual_inventory_for_positions_address(
628        &self,
629        store: &Pubkey,
630        index_token: &Pubkey,
631    ) -> Pubkey {
632        crate::pda::find_virtual_inventory_for_positions_address(
633            store,
634            index_token,
635            self.store_program_id(),
636        )
637        .0
638    }
639
640    pub(crate) fn get_callback_params(&self, callback: Option<&Callback>) -> CallbackParams {
641        match callback {
642            Some(callback) => CallbackParams {
643                callback_version: Some(callback.version),
644                callback_authority: Some(self.find_callback_authority_address()),
645                callback_program: Some(callback.program.0),
646                callback_shared_data_account: Some(callback.shared_data.0),
647                callback_partitioned_data_account: Some(callback.partitioned_data.0),
648            },
649            None => CallbackParams::default(),
650        }
651    }
652
653    /// Get latest slot.
654    pub async fn get_slot(&self, commitment: Option<CommitmentConfig>) -> crate::Result<u64> {
655        let slot = self
656            .store_program()
657            .rpc()
658            .get_slot_with_commitment(commitment.unwrap_or(self.commitment()))
659            .await
660            .map_err(crate::Error::custom)?;
661        Ok(slot)
662    }
663
664    /// Fetch accounts owned by the store program.
665    pub async fn store_accounts_with_config<T>(
666        &self,
667        filter_by_store: Option<StoreFilter>,
668        other_filters: impl IntoIterator<Item = RpcFilterType>,
669        config: ProgramAccountsConfig,
670    ) -> crate::Result<WithSlot<Vec<(Pubkey, T)>>>
671    where
672        T: AccountDeserialize + Discriminator,
673    {
674        let filters = std::iter::empty()
675            .chain(
676                filter_by_store
677                    .inspect(|filter| {
678                        let store = &filter.store;
679                        tracing::debug!(%store, offset=%filter.store_offset(), "store bytes to filter: {}", hex::encode(store));
680                    })
681                    .map(RpcFilterType::from),
682            )
683            .chain(other_filters);
684        accounts_lazy_with_context(self.store_program(), filters, config)
685            .await?
686            .map(|iter| iter.collect())
687            .transpose()
688    }
689
690    /// Fetch account without deserialization.
691    pub async fn raw_account_with_config(
692        &self,
693        address: &Pubkey,
694        config: RpcAccountInfoConfig,
695    ) -> crate::Result<WithSlot<Option<Account>>> {
696        let client = self.store_program().rpc();
697        get_account_with_context(&client, address, config).await
698    }
699
700    /// Fetch account and decode.
701    #[cfg(feature = "decode")]
702    pub async fn decode_account_with_config(
703        &self,
704        address: &Pubkey,
705        config: RpcAccountInfoConfig,
706    ) -> crate::Result<WithSlot<Option<gmsol_decode::gmsol::programs::GMSOLAccountData>>> {
707        use crate::utils::decode::KeyedAccount;
708        use gmsol_decode::{decoder::AccountAccessDecoder, gmsol::programs::GMSOLAccountData};
709
710        let account = self.raw_account_with_config(address, config).await?;
711        let slot = account.slot();
712        match account.into_value() {
713            Some(account) => {
714                let account = KeyedAccount {
715                    pubkey: *address,
716                    account: WithSlot::new(slot, account),
717                };
718                let decoder = AccountAccessDecoder::new(account);
719                let decoded = GMSOLAccountData::decode(decoder)?;
720                Ok(WithSlot::new(slot, Some(decoded)))
721            }
722            None => Ok(WithSlot::new(slot, None)),
723        }
724    }
725
726    /// Fetch account with the given address with config.
727    ///
728    /// The value inside the returned context will be `None` if the account does not exist.
729    pub async fn account_with_config<T>(
730        &self,
731        address: &Pubkey,
732        config: RpcAccountInfoConfig,
733    ) -> crate::Result<WithSlot<Option<T>>>
734    where
735        T: AccountDeserialize,
736    {
737        let client = self.store_program().rpc();
738        account_with_context(&client, address, config).await
739    }
740
741    /// Fetch account with the given address.
742    pub async fn account<T: AccountDeserialize>(
743        &self,
744        address: &Pubkey,
745    ) -> crate::Result<Option<T>> {
746        Ok(self
747            .account_with_config(address, Default::default())
748            .await?
749            .into_value())
750    }
751
752    /// Fetch accounts owned by the store program.
753    pub async fn store_accounts<T>(
754        &self,
755        filter_by_store: Option<StoreFilter>,
756        other_filters: impl IntoIterator<Item = RpcFilterType>,
757    ) -> crate::Result<Vec<(Pubkey, T)>>
758    where
759        T: AccountDeserialize + Discriminator,
760    {
761        let res = self
762            .store_accounts_with_config(
763                filter_by_store,
764                other_filters,
765                ProgramAccountsConfig::default(),
766            )
767            .await?;
768        tracing::debug!(slot=%res.slot(), "accounts fetched");
769        Ok(res.into_value())
770    }
771
772    /// Fetch [`Store`](store_accounts::Store) account with its address.
773    pub async fn store(&self, address: &Pubkey) -> crate::Result<Arc<store_accounts::Store>> {
774        Ok(self
775            .account::<SharedZeroCopy<store_accounts::Store>>(address)
776            .await?
777            .ok_or(crate::Error::NotFound)?
778            .0)
779    }
780
781    /// Fetch user account with its address.
782    pub async fn user(&self, address: &Pubkey) -> crate::Result<store_accounts::UserHeader> {
783        Ok(self
784            .account::<ZeroCopy<store_accounts::UserHeader>>(address)
785            .await?
786            .ok_or(crate::Error::NotFound)?
787            .0)
788    }
789
790    /// Fetch the [`TokenMap`] address of the given store.
791    pub async fn authorized_token_map_address(
792        &self,
793        store: &Pubkey,
794    ) -> crate::Result<Option<Pubkey>> {
795        let store = self.store(store).await?;
796        let token_map = store.token_map;
797        Ok(optional_address(&token_map).copied())
798    }
799
800    /// Fetch [`TokenMap`] account with its address.
801    pub async fn token_map(&self, address: &Pubkey) -> crate::Result<TokenMap> {
802        self.account(address).await?.ok_or(crate::Error::NotFound)
803    }
804
805    /// Fetch the authorized token map of the given store.
806    pub async fn authorized_token_map(&self, store: &Pubkey) -> crate::Result<TokenMap> {
807        let address = self
808            .authorized_token_map_address(store)
809            .await?
810            .ok_or(crate::Error::custom("token map is not set"))?;
811        self.token_map(&address).await
812    }
813
814    /// Fetch all [`Market`](store_accounts::Market) accounts of the given store.
815    pub async fn markets_with_config(
816        &self,
817        store: &Pubkey,
818        config: ProgramAccountsConfig,
819    ) -> crate::Result<WithSlot<BTreeMap<Pubkey, Arc<store_accounts::Market>>>> {
820        let markets = self
821            .store_accounts_with_config::<SharedZeroCopy<store_accounts::Market>>(
822                Some(StoreFilter::new(
823                    store,
824                    bytemuck::offset_of!(store_accounts::Market, store),
825                )),
826                None,
827                config,
828            )
829            .await?
830            .map(|accounts| {
831                accounts
832                    .into_iter()
833                    .map(|(pubkey, m)| (pubkey, m.0))
834                    .collect::<BTreeMap<_, _>>()
835            });
836        Ok(markets)
837    }
838
839    /// Fetch all [`Market`](store_accounts::Market) accounts of the given store.
840    pub async fn markets(
841        &self,
842        store: &Pubkey,
843    ) -> crate::Result<BTreeMap<Pubkey, Arc<store_accounts::Market>>> {
844        let markets = self
845            .markets_with_config(store, ProgramAccountsConfig::default())
846            .await?
847            .into_value();
848        Ok(markets)
849    }
850
851    /// Fetch [`Market`](store_accounts::Market) at the given address with config.
852    ///
853    /// The value inside the returned context will be `None` if the account does not exist.
854    pub async fn market_with_config<T>(
855        &self,
856        address: &Pubkey,
857        config: RpcAccountInfoConfig,
858    ) -> crate::Result<WithSlot<Option<Arc<store_accounts::Market>>>> {
859        let market = self
860            .account_with_config::<SharedZeroCopy<store_accounts::Market>>(address, config)
861            .await?;
862        Ok(market.map(|m| m.map(|m| m.0)))
863    }
864
865    /// Fetch [`Market`](store_accounts::Market) account with its address.
866    pub async fn market(&self, address: &Pubkey) -> crate::Result<Arc<store_accounts::Market>> {
867        Ok(self
868            .account::<SharedZeroCopy<store_accounts::Market>>(address)
869            .await?
870            .ok_or(crate::Error::NotFound)?
871            .0)
872    }
873
874    /// Fetch [`Market`](store_accounts::Market) account with its token address.
875    pub async fn market_by_token(
876        &self,
877        store: &Pubkey,
878        market_token: &Pubkey,
879    ) -> crate::Result<Arc<store_accounts::Market>> {
880        let address = self.find_market_address(store, market_token);
881        self.market(&address).await
882    }
883
884    /// Fetch all [`Glv`](store_accounts::Glv) accounts of the given store.
885    pub async fn glvs_with_config(
886        &self,
887        store: &Pubkey,
888        config: ProgramAccountsConfig,
889    ) -> crate::Result<WithSlot<BTreeMap<Pubkey, store_accounts::Glv>>> {
890        let glvs = self
891            .store_accounts_with_config::<ZeroCopy<store_accounts::Glv>>(
892                Some(StoreFilter::new(
893                    store,
894                    bytemuck::offset_of!(store_accounts::Glv, store),
895                )),
896                None,
897                config,
898            )
899            .await?
900            .map(|accounts| {
901                accounts
902                    .into_iter()
903                    .map(|(pubkey, m)| (pubkey, m.0))
904                    .collect::<BTreeMap<_, _>>()
905            });
906        Ok(glvs)
907    }
908
909    /// Fetch all [`Glv`](store_accounts::Glv) accounts of the given store.
910    pub async fn glvs(
911        &self,
912        store: &Pubkey,
913    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Glv>> {
914        let glvs = self
915            .glvs_with_config(store, ProgramAccountsConfig::default())
916            .await?
917            .into_value();
918        Ok(glvs)
919    }
920
921    /// Fetch [`MarketStatus`] with market token address.
922    pub async fn market_status(
923        &self,
924        store: &Pubkey,
925        market_token: &Pubkey,
926        prices: Prices<u128>,
927        maximize_pnl: bool,
928        maximize_pool_value: bool,
929    ) -> crate::Result<MarketStatus> {
930        let req = self.get_market_status(
931            store,
932            market_token,
933            prices,
934            maximize_pnl,
935            maximize_pool_value,
936        );
937        let status = view::view::<MarketStatus>(
938            &self.store_program().rpc(),
939            &req.signed_transaction_with_options(true, None, None, default_before_sign)
940                .await?,
941        )
942        .await?;
943        Ok(status)
944    }
945
946    /// Fetch current market token price with market token address.
947    pub async fn market_token_price(
948        &self,
949        store: &Pubkey,
950        market_token: &Pubkey,
951        prices: Prices<u128>,
952        pnl_factor: PnlFactorKind,
953        maximize: bool,
954    ) -> crate::Result<u128> {
955        let req = self.get_market_token_price(store, market_token, prices, pnl_factor, maximize);
956        let price = view::view::<u128>(
957            &self.store_program().rpc(),
958            &req.signed_transaction_with_options(true, None, None, default_before_sign)
959                .await?,
960        )
961        .await?;
962        Ok(price)
963    }
964
965    /// Fetch [`Position`](store_accounts::Position) accounts.
966    pub async fn positions(
967        &self,
968        store: &Pubkey,
969        owner: Option<&Pubkey>,
970        market_token: Option<&Pubkey>,
971    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Position>> {
972        let filter = match owner {
973            Some(owner) => {
974                let mut bytes = owner.as_ref().to_owned();
975                if let Some(market_token) = market_token {
976                    bytes.extend_from_slice(market_token.as_ref());
977                }
978                let filter = RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
979                    bytemuck::offset_of!(store_accounts::Position, owner) + DISC_OFFSET,
980                    &bytes,
981                ));
982                Some(filter)
983            }
984            None => market_token.and_then(|token| {
985                Some(RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
986                    bytemuck::offset_of!(store_accounts::Position, market_token) + DISC_OFFSET,
987                    &token.try_to_vec().ok()?,
988                )))
989            }),
990        };
991
992        let store_filter =
993            StoreFilter::new(store, bytemuck::offset_of!(store_accounts::Position, store));
994
995        let positions = self
996            .store_accounts::<ZeroCopy<store_accounts::Position>>(Some(store_filter), filter)
997            .await?
998            .into_iter()
999            .map(|(pubkey, p)| (pubkey, p.0))
1000            .collect();
1001
1002        Ok(positions)
1003    }
1004
1005    /// Fetch [`Position`](store_accounts::Position) account with its address.
1006    pub async fn position(&self, address: &Pubkey) -> crate::Result<store_accounts::Position> {
1007        let position = self
1008            .account::<ZeroCopy<store_accounts::Position>>(address)
1009            .await?
1010            .ok_or(crate::Error::NotFound)?;
1011        Ok(position.0)
1012    }
1013
1014    /// Fetch [`Order`](store_accounts::Order) account with its address.
1015    pub async fn order(&self, address: &Pubkey) -> crate::Result<store_accounts::Order> {
1016        Ok(self
1017            .account::<ZeroCopy<store_accounts::Order>>(address)
1018            .await?
1019            .ok_or(crate::Error::NotFound)?
1020            .0)
1021    }
1022
1023    /// Fetch [`Order`](store_accounts::Order) account at the the given address with config.
1024    ///
1025    /// The value inside the returned context will be `None` if the account does not exist.
1026    pub async fn order_with_config(
1027        &self,
1028        address: &Pubkey,
1029        config: RpcAccountInfoConfig,
1030    ) -> crate::Result<WithSlot<Option<store_accounts::Order>>> {
1031        Ok(self
1032            .account_with_config::<ZeroCopy<store_accounts::Order>>(address, config)
1033            .await?
1034            .map(|a| a.map(|a| a.0)))
1035    }
1036
1037    fn create_action_filters(
1038        &self,
1039        store: &Pubkey,
1040        owner: Option<&Pubkey>,
1041        market_token: Option<&Pubkey>,
1042    ) -> (StoreFilter, Vec<RpcFilterType>) {
1043        let mut filters = Vec::default();
1044        if let Some(owner) = owner {
1045            filters.push(RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
1046                bytemuck::offset_of!(store_types::ActionHeader, owner) + DISC_OFFSET,
1047                owner.as_ref(),
1048            )));
1049        }
1050        if let Some(market_token) = market_token {
1051            let market = self.find_market_address(store, market_token);
1052            filters.push(RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
1053                bytemuck::offset_of!(store_types::ActionHeader, market) + DISC_OFFSET,
1054                market.as_ref(),
1055            )));
1056        }
1057        let store_filter = StoreFilter::new(
1058            store,
1059            bytemuck::offset_of!(store_types::ActionHeader, store),
1060        );
1061
1062        (store_filter, filters)
1063    }
1064
1065    /// Fetch [`Order`](store_accounts::Order) accounts.
1066    pub async fn orders(
1067        &self,
1068        store: &Pubkey,
1069        owner: Option<&Pubkey>,
1070        market_token: Option<&Pubkey>,
1071    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Order>> {
1072        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1073
1074        let orders = self
1075            .store_accounts::<ZeroCopy<store_accounts::Order>>(Some(store_filter), filters)
1076            .await?
1077            .into_iter()
1078            .map(|(addr, order)| (addr, order.0))
1079            .collect();
1080
1081        Ok(orders)
1082    }
1083
1084    /// Fetch [`Depsoit`](store_accounts::Deposit) account with its address.
1085    pub async fn deposit(&self, address: &Pubkey) -> crate::Result<store_accounts::Deposit> {
1086        Ok(self
1087            .account::<ZeroCopy<_>>(address)
1088            .await?
1089            .ok_or(crate::Error::NotFound)?
1090            .0)
1091    }
1092
1093    /// Fetch [`Deposit`](store_accounts::Deposit) accounts.
1094    pub async fn deposits(
1095        &self,
1096        store: &Pubkey,
1097        owner: Option<&Pubkey>,
1098        market_token: Option<&Pubkey>,
1099    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Deposit>> {
1100        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1101
1102        let orders = self
1103            .store_accounts::<ZeroCopy<store_accounts::Deposit>>(Some(store_filter), filters)
1104            .await?
1105            .into_iter()
1106            .map(|(addr, action)| (addr, action.0))
1107            .collect();
1108
1109        Ok(orders)
1110    }
1111
1112    /// Fetch [`Withdrawal`](store_accounts::Withdrawal) account with its address.
1113    pub async fn withdrawal(&self, address: &Pubkey) -> crate::Result<store_accounts::Withdrawal> {
1114        Ok(self
1115            .account::<ZeroCopy<store_accounts::Withdrawal>>(address)
1116            .await?
1117            .ok_or(crate::Error::NotFound)?
1118            .0)
1119    }
1120
1121    /// Fetch [`Withdrawal`](store_accounts::Withdrawal) accounts.
1122    pub async fn withdrawals(
1123        &self,
1124        store: &Pubkey,
1125        owner: Option<&Pubkey>,
1126        market_token: Option<&Pubkey>,
1127    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Withdrawal>> {
1128        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1129
1130        let orders = self
1131            .store_accounts::<ZeroCopy<store_accounts::Withdrawal>>(Some(store_filter), filters)
1132            .await?
1133            .into_iter()
1134            .map(|(addr, action)| (addr, action.0))
1135            .collect();
1136
1137        Ok(orders)
1138    }
1139
1140    /// Fetch [`Shift`](store_accounts::Shift) accounts.
1141    pub async fn shifts(
1142        &self,
1143        store: &Pubkey,
1144        owner: Option<&Pubkey>,
1145        market_token: Option<&Pubkey>,
1146    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Shift>> {
1147        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1148
1149        let orders = self
1150            .store_accounts::<ZeroCopy<store_accounts::Shift>>(Some(store_filter), filters)
1151            .await?
1152            .into_iter()
1153            .map(|(addr, action)| (addr, action.0))
1154            .collect();
1155
1156        Ok(orders)
1157    }
1158
1159    /// Fetch [`GlvDeposit`](store_accounts::GlvDeposit) accounts.
1160    pub async fn glv_deposits(
1161        &self,
1162        store: &Pubkey,
1163        owner: Option<&Pubkey>,
1164        market_token: Option<&Pubkey>,
1165    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GlvDeposit>> {
1166        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1167
1168        let orders = self
1169            .store_accounts::<ZeroCopy<store_accounts::GlvDeposit>>(Some(store_filter), filters)
1170            .await?
1171            .into_iter()
1172            .map(|(addr, action)| (addr, action.0))
1173            .collect();
1174
1175        Ok(orders)
1176    }
1177
1178    /// Fetch [`GlvWithdrawal`](store_accounts::GlvWithdrawal) accounts.
1179    pub async fn glv_withdrawals(
1180        &self,
1181        store: &Pubkey,
1182        owner: Option<&Pubkey>,
1183        market_token: Option<&Pubkey>,
1184    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GlvWithdrawal>> {
1185        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1186
1187        let orders = self
1188            .store_accounts::<ZeroCopy<store_accounts::GlvWithdrawal>>(Some(store_filter), filters)
1189            .await?
1190            .into_iter()
1191            .map(|(addr, action)| (addr, action.0))
1192            .collect();
1193
1194        Ok(orders)
1195    }
1196
1197    /// Fetch [`GlvShift`](store_accounts::GlvShift) accounts.
1198    pub async fn glv_shifts(
1199        &self,
1200        store: &Pubkey,
1201        owner: Option<&Pubkey>,
1202        market_token: Option<&Pubkey>,
1203    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GlvShift>> {
1204        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1205
1206        let orders = self
1207            .store_accounts::<ZeroCopy<store_accounts::GlvShift>>(Some(store_filter), filters)
1208            .await?
1209            .into_iter()
1210            .map(|(addr, action)| (addr, action.0))
1211            .collect();
1212
1213        Ok(orders)
1214    }
1215
1216    /// Fetch [`PriceFeed`](store_accounts::PriceFeed) account with its address.
1217    pub async fn price_feed(
1218        &self,
1219        address: &Pubkey,
1220    ) -> crate::Result<Option<store_accounts::PriceFeed>> {
1221        Ok(self
1222            .account::<ZeroCopy<store_accounts::PriceFeed>>(address)
1223            .await?
1224            .map(|a| a.0))
1225    }
1226
1227    /// Get the [`PubsubClient`].
1228    pub async fn pub_sub(&self) -> crate::Result<&PubsubClient> {
1229        let client = self
1230            .pub_sub
1231            .get_or_try_init(|| {
1232                PubsubClient::new(self.cluster().clone(), self.subscription_config.clone())
1233            })
1234            .await?;
1235        Ok(client)
1236    }
1237
1238    /// Subscribe to [`GMSOLCPIEvent`]s from the store program.
1239    #[cfg(feature = "decode")]
1240    pub async fn subscribe_store_cpi_events(
1241        &self,
1242        commitment: Option<CommitmentConfig>,
1243    ) -> crate::Result<impl futures_util::Stream<Item = crate::Result<WithSlot<Vec<GMSOLCPIEvent>>>>>
1244    {
1245        use futures_util::TryStreamExt;
1246        use transaction_history::extract_cpi_events;
1247
1248        let program_id = self.store_program_id();
1249        let event_authority = self.store_event_authority();
1250        let query = Arc::new(self.store_program().rpc());
1251        let commitment = commitment.unwrap_or(self.subscription_config.commitment);
1252        let signatures = self
1253            .pub_sub()
1254            .await?
1255            .logs_subscribe(&event_authority, Some(commitment))
1256            .await?
1257            .and_then(|txn| {
1258                let signature = txn
1259                    .map(|txn| txn.signature.parse().map_err(crate::Error::custom))
1260                    .transpose();
1261                async move { signature }
1262            });
1263        let events = extract_cpi_events(
1264            signatures,
1265            query,
1266            program_id,
1267            &event_authority,
1268            commitment,
1269            Some(0),
1270        )
1271        .try_filter_map(|event| {
1272            let decoded = event
1273                .map(|event| {
1274                    event
1275                        .events
1276                        .iter()
1277                        .map(|event| GMSOLCPIEvent::decode(event).map_err(crate::Error::from))
1278                        .collect::<crate::Result<Vec<_>>>()
1279                })
1280                .transpose()
1281                .inspect_err(|err| tracing::error!(%err, "decode error"))
1282                .ok();
1283            async move { Ok(decoded) }
1284        });
1285        Ok(events)
1286    }
1287
1288    /// Fetch historical [`GMSOLCPIEvent`]s for the given account.
1289    #[cfg(feature = "decode")]
1290    pub async fn historical_store_cpi_events(
1291        &self,
1292        address: &Pubkey,
1293        commitment: Option<CommitmentConfig>,
1294    ) -> crate::Result<impl futures_util::Stream<Item = crate::Result<WithSlot<Vec<GMSOLCPIEvent>>>>>
1295    {
1296        use futures_util::TryStreamExt;
1297        use transaction_history::{extract_cpi_events, fetch_transaction_history_with_config};
1298
1299        let commitment = commitment.unwrap_or(self.commitment());
1300        let client = Arc::new(self.store_program().rpc());
1301        let signatures = fetch_transaction_history_with_config(
1302            client.clone(),
1303            address,
1304            commitment,
1305            None,
1306            None,
1307            None,
1308        )
1309        .await?;
1310        let events = extract_cpi_events(
1311            signatures,
1312            client,
1313            self.store_program_id(),
1314            &self.store_event_authority(),
1315            commitment,
1316            Some(0),
1317        )
1318        .try_filter(|events| std::future::ready(!events.value().events.is_empty()))
1319        .and_then(|encoded| {
1320            let decoded = encoded
1321                .map(|event| {
1322                    event
1323                        .events
1324                        .iter()
1325                        .map(|event| GMSOLCPIEvent::decode(event).map_err(crate::Error::from))
1326                        .collect::<crate::Result<Vec<_>>>()
1327                })
1328                .transpose();
1329            async move { decoded }
1330        });
1331        Ok(events)
1332    }
1333
1334    /// Wait for an order to be completed using current slot as min context slot.
1335    #[cfg(feature = "decode")]
1336    pub async fn complete_order(
1337        &self,
1338        address: &Pubkey,
1339        commitment: Option<CommitmentConfig>,
1340    ) -> crate::Result<Option<store_events::TradeEvent>> {
1341        let slot = self.get_slot(None).await?;
1342        self.complete_order_with_config(
1343            address,
1344            slot,
1345            std::time::Duration::from_secs(5),
1346            commitment,
1347        )
1348        .await
1349    }
1350
1351    /// Get last order events.
1352    #[cfg(feature = "decode")]
1353    pub async fn last_order_events(
1354        &self,
1355        order: &Pubkey,
1356        before_slot: u64,
1357        commitment: CommitmentConfig,
1358    ) -> crate::Result<Vec<GMSOLCPIEvent>> {
1359        use futures_util::{StreamExt, TryStreamExt};
1360
1361        let events = self
1362            .historical_store_cpi_events(order, Some(commitment))
1363            .await?
1364            .try_filter(|events| {
1365                let pass = events.slot() <= before_slot;
1366                async move { pass }
1367            })
1368            .take(1);
1369        futures_util::pin_mut!(events);
1370        match events.next().await.transpose()? {
1371            Some(events) => Ok(events.into_value()),
1372            None => Err(crate::Error::custom(format!(
1373                "events not found, slot={before_slot}"
1374            ))),
1375        }
1376    }
1377
1378    /// Wait for an order to be completed with the given config.
1379    #[cfg(feature = "decode")]
1380    pub async fn complete_order_with_config(
1381        &self,
1382        address: &Pubkey,
1383        mut slot: u64,
1384        polling: std::time::Duration,
1385        commitment: Option<CommitmentConfig>,
1386    ) -> crate::Result<Option<store_events::TradeEvent>> {
1387        use futures_util::{StreamExt, TryStreamExt};
1388        use solana_account_decoder::UiAccountEncoding;
1389
1390        let mut trade = None;
1391        let commitment = commitment.unwrap_or(self.subscription_config.commitment);
1392
1393        let events = self.subscribe_store_cpi_events(Some(commitment)).await?;
1394
1395        let config = RpcAccountInfoConfig {
1396            encoding: Some(UiAccountEncoding::Base64),
1397            commitment: Some(commitment),
1398            min_context_slot: Some(slot),
1399            ..Default::default()
1400        };
1401        let mut slot_reached = self.get_slot(Some(commitment)).await? >= slot;
1402        if slot_reached {
1403            let order = self.order_with_config(address, config.clone()).await?;
1404            slot = order.slot();
1405            let order = order.into_value();
1406            if order.is_none() {
1407                let events = self.last_order_events(address, slot, commitment).await?;
1408                return Ok(events
1409                    .into_iter()
1410                    .filter_map(|event| {
1411                        if let GMSOLCPIEvent::TradeEvent(event) = event {
1412                            Some(event)
1413                        } else {
1414                            None
1415                        }
1416                    })
1417                    .next());
1418            }
1419        }
1420        let address = *address;
1421        let stream = events
1422            .try_filter_map(|events| async {
1423                if events.slot() < slot {
1424                    return Ok(None);
1425                }
1426                let events = events
1427                    .into_value()
1428                    .into_iter()
1429                    .filter(|event| {
1430                        matches!(
1431                            event,
1432                            GMSOLCPIEvent::TradeEvent(_) | GMSOLCPIEvent::OrderRemoved(_)
1433                        )
1434                    })
1435                    .map(Ok);
1436                Ok(Some(futures_util::stream::iter(events)))
1437            })
1438            .try_flatten();
1439        let stream =
1440            tokio_stream::StreamExt::timeout_repeating(stream, tokio::time::interval(polling));
1441        futures_util::pin_mut!(stream);
1442        while let Some(res) = stream.next().await {
1443            match res {
1444                Ok(Ok(event)) => match event {
1445                    GMSOLCPIEvent::TradeEvent(event) => {
1446                        trade = Some(event);
1447                    }
1448                    GMSOLCPIEvent::OrderRemoved(_remove) => {
1449                        return Ok(trade);
1450                    }
1451                    _ => unreachable!(),
1452                },
1453                Ok(Err(err)) => {
1454                    return Err(err);
1455                }
1456                Err(_elapsed) => {
1457                    if slot_reached {
1458                        let res = self.order_with_config(&address, config.clone()).await?;
1459                        if res.value().is_none() {
1460                            let events = self
1461                                .last_order_events(&address, res.slot(), commitment)
1462                                .await?;
1463                            return Ok(events
1464                                .into_iter()
1465                                .filter_map(|event| {
1466                                    if let GMSOLCPIEvent::TradeEvent(event) = event {
1467                                        Some(event)
1468                                    } else {
1469                                        None
1470                                    }
1471                                })
1472                                .next());
1473                        }
1474                    } else {
1475                        slot_reached = self.get_slot(Some(commitment)).await? >= slot;
1476                    }
1477                }
1478            }
1479        }
1480        Err(crate::Error::custom("the watch stream end"))
1481    }
1482
1483    /// Shutdown the client gracefully.
1484    pub async fn shutdown(&self) -> crate::Result<()> {
1485        self.pub_sub().await?.shutdown().await
1486    }
1487
1488    /// Get GT exchanges.
1489    pub async fn gt_exchanges(
1490        &self,
1491        store: &Pubkey,
1492        owner: &Pubkey,
1493    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GtExchange>> {
1494        use store_accounts::GtExchange;
1495
1496        let store_filter = StoreFilter::new(store, bytemuck::offset_of!(GtExchange, store));
1497        let owner_filter = RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
1498            8 + bytemuck::offset_of!(GtExchange, owner),
1499            owner.as_ref(),
1500        ));
1501        let exchanges = self
1502            .store_accounts::<ZeroCopy<GtExchange>>(Some(store_filter), Some(owner_filter))
1503            .await?;
1504        Ok(exchanges
1505            .into_iter()
1506            .map(|(address, exchange)| (address, exchange.0))
1507            .collect())
1508    }
1509
1510    /// Fetch [`InstructionBuffer`] account with its address.
1511    pub async fn instruction_buffer(
1512        &self,
1513        address: &Pubkey,
1514    ) -> crate::Result<Option<InstructionBuffer>> {
1515        self.account::<InstructionBuffer>(address).await
1516    }
1517}
1518
1519/// Store Filter.
1520#[derive(Debug)]
1521pub struct StoreFilter {
1522    /// Store.
1523    store: Pubkey,
1524    /// Store offset.
1525    store_offset: usize,
1526    /// Ignore disc bytes.
1527    ignore_disc_offset: bool,
1528}
1529
1530impl StoreFilter {
1531    /// Create a new store filter.
1532    pub fn new(store: &Pubkey, store_offset: usize) -> Self {
1533        Self {
1534            store: *store,
1535            store_offset,
1536            ignore_disc_offset: false,
1537        }
1538    }
1539
1540    /// Ignore discriminator offset.
1541    pub fn ignore_disc_offset(mut self, ignore: bool) -> Self {
1542        self.ignore_disc_offset = ignore;
1543        self
1544    }
1545
1546    /// Store offset.
1547    pub fn store_offset(&self) -> usize {
1548        if self.ignore_disc_offset {
1549            self.store_offset
1550        } else {
1551            self.store_offset + DISC_OFFSET
1552        }
1553    }
1554}
1555
1556impl From<StoreFilter> for RpcFilterType {
1557    fn from(filter: StoreFilter) -> Self {
1558        let store = filter.store;
1559        let store_offset = filter.store_offset();
1560        RpcFilterType::Memcmp(Memcmp::new_base58_encoded(store_offset, store.as_ref()))
1561    }
1562}