Skip to main content

gmsol_sdk/client/
mod.rs

1/// Client for transaction subscription.
2pub mod pubsub;
3
4/// Utilities for program accounts.
5pub mod accounts;
6
7/// Utilities for transaction history.
8pub mod transaction_history;
9
10/// Definition of [`TokenMap`].
11pub mod token_map;
12
13/// Operations.
14pub mod ops;
15
16/// Feeds parser.
17pub mod feeds_parser;
18
19/// Pull oracle support.
20pub mod pull_oracle;
21
22/// Utilities for token accounts.
23pub mod token_account;
24
25/// Simulate a transaction and view its output.
26pub mod view;
27
28/// Instruction buffer.
29pub mod instruction_buffer;
30
31/// Program IDs.
32pub mod program_ids;
33
34/// Chainlink support.
35#[cfg(feature = "chainlink")]
36pub mod chainlink;
37
38/// Pyth support.
39#[cfg(feature = "pyth")]
40pub mod pyth;
41
42/// Switchboard support.
43#[cfg(feature = "switchboard")]
44pub mod switchboard;
45
46/// Squads operations.
47#[cfg(feature = "squads")]
48pub mod squads;
49
50/// Chaos Labs Risk Oracle support.
51#[cfg(feature = "chaoslabs-risk-oracle")]
52pub mod risk_oracle;
53
54use std::{
55    collections::BTreeMap,
56    ops::Deref,
57    sync::{Arc, OnceLock},
58};
59
60use accounts::{
61    account_with_context, accounts_lazy_with_context, get_account_with_context,
62    ProgramAccountsConfig,
63};
64use gmsol_model::{price::Prices, PnlFactorKind};
65use gmsol_programs::{
66    anchor_lang::{AccountDeserialize, AnchorSerialize, Discriminator},
67    bytemuck,
68    gmsol_store::{
69        accounts as store_accounts,
70        types::{self as store_types, MarketStatus},
71    },
72};
73use gmsol_solana_utils::{
74    bundle_builder::{BundleBuilder, BundleOptions, CreateBundleOptions},
75    cluster::Cluster,
76    program::Program,
77    transaction_builder::{default_before_sign, Config, TransactionBuilder},
78    utils::WithSlot,
79};
80use gmsol_utils::oracle::PriceProviderKind;
81use instruction_buffer::InstructionBuffer;
82use ops::market::MarketOps;
83use pubsub::{PubsubClient, SubscriptionConfig};
84use solana_client::{
85    nonblocking::rpc_client::RpcClient,
86    rpc_config::RpcAccountInfoConfig,
87    rpc_filter::{Memcmp, RpcFilterType},
88};
89use solana_sdk::{
90    account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, signer::Signer,
91};
92use token_map::TokenMap;
93use tokio::sync::OnceCell;
94use typed_builder::TypedBuilder;
95
96use crate::{
97    builders::{
98        callback::{Callback, CallbackParams},
99        StoreProgram,
100    },
101    pda::{NonceBytes, ReferralCodeBytes},
102    utils::{
103        optional::optional_address,
104        zero_copy::{SharedZeroCopy, ZeroCopy},
105    },
106};
107
108#[cfg(liquidity_provider)]
109use gmsol_solana_utils::Program as ProgramTrait;
110
111#[cfg(liquidity_provider)]
112use crate::builders::liquidity_provider::LiquidityProviderProgram;
113
114#[cfg(feature = "decode")]
115use gmsol_decode::{gmsol::programs::GMSOLCPIEvent, Decode};
116
117#[cfg(feature = "decode")]
118use gmsol_programs::gmsol_store::events as store_events;
119
/// Discriminator offset.
///
/// NOTE(review): presumably the number of bytes occupied by the account
/// discriminator in front of the account data, used to shift field offsets
/// when building memcmp filters — confirm against the users of this constant.
pub const DISC_OFFSET: usize = 8;
122
/// Options for [`Client`].
///
/// Every field has a builder default, so `ClientOptions::builder().build()`
/// is always valid. Program IDs left as `None` are replaced by the IDs
/// exported from `gmsol_programs` when a [`Client`] is constructed.
#[derive(Debug, Clone, TypedBuilder)]
pub struct ClientOptions {
    /// Optional override for the store program ID.
    #[builder(default)]
    store_program_id: Option<Pubkey>,
    /// Optional override for the treasury program ID.
    #[builder(default)]
    treasury_program_id: Option<Pubkey>,
    /// Optional override for the liquidity provider program ID.
    #[cfg(liquidity_provider)]
    #[builder(default)]
    liquidity_provider_program_id: Option<Pubkey>,
    /// Optional override for the timelock program ID.
    #[builder(default)]
    timelock_program_id: Option<Pubkey>,
    /// Commitment config handed to the client's `Config`.
    #[builder(default)]
    commitment: CommitmentConfig,
    /// Subscription config stored on the client.
    #[builder(default)]
    subscription: SubscriptionConfig,
}
140
141impl Default for ClientOptions {
142    fn default() -> Self {
143        Self::builder().build()
144    }
145}
146
/// Client for interacting with the GMX-Solana protocol.
///
/// `C` is the payer handle; the constructors require it to be `Clone` and to
/// dereference to a `Signer`.
pub struct Client<C> {
    /// Shared configuration (cluster, payer, commitment) used to create programs.
    cfg: Config<C>,
    /// Handle to the store program.
    store_program: Program<C>,
    /// Handle to the treasury program.
    treasury_program: Program<C>,
    /// Builder-side description of the liquidity provider program.
    #[cfg(liquidity_provider)]
    liquidity_provider_program: LiquidityProviderProgram,
    /// Handle to the timelock program.
    timelock_program: Program<C>,
    /// RPC client, created on first call to `rpc()`.
    rpc: OnceLock<RpcClient>,
    /// Pubsub client cell; starts empty.
    pub_sub: OnceCell<PubsubClient>,
    /// Subscription config stored for the pubsub client.
    subscription_config: SubscriptionConfig,
}
159
160impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
161    /// Create a new [`Client`] with the given options.
162    pub fn new_with_options(
163        cluster: Cluster,
164        payer: C,
165        options: ClientOptions,
166    ) -> crate::Result<Self> {
167        let ClientOptions {
168            store_program_id,
169            treasury_program_id,
170            #[cfg(liquidity_provider)]
171            liquidity_provider_program_id,
172            timelock_program_id,
173            commitment,
174            subscription,
175        } = options;
176        let cfg = Config::new(cluster, payer, commitment);
177        Ok(Self {
178            store_program: Program::new(
179                store_program_id.unwrap_or(gmsol_programs::gmsol_store::ID),
180                cfg.clone(),
181            ),
182            treasury_program: Program::new(
183                treasury_program_id.unwrap_or(gmsol_programs::gmsol_treasury::ID),
184                cfg.clone(),
185            ),
186            #[cfg(liquidity_provider)]
187            liquidity_provider_program: LiquidityProviderProgram::builder()
188                .id(liquidity_provider_program_id
189                    .unwrap_or(gmsol_programs::gmsol_liquidity_provider::ID))
190                .build(),
191            timelock_program: Program::new(
192                timelock_program_id.unwrap_or(gmsol_programs::gmsol_timelock::ID),
193                cfg.clone(),
194            ),
195            cfg,
196            pub_sub: OnceCell::default(),
197            rpc: Default::default(),
198            subscription_config: subscription,
199        })
200    }
201
202    /// Create a new [`Client`] with default options.
203    pub fn new(cluster: Cluster, payer: C) -> crate::Result<Self> {
204        Self::new_with_options(cluster, payer, ClientOptions::default())
205    }
206
207    /// Create a clone of this client with a new payer.
208    pub fn try_clone_with_payer<C2: Clone + Deref<Target = impl Signer>>(
209        &self,
210        payer: C2,
211    ) -> crate::Result<Client<C2>> {
212        Client::new_with_options(
213            self.cluster().clone(),
214            payer,
215            ClientOptions {
216                store_program_id: Some(*self.store_program_id()),
217                treasury_program_id: Some(*self.treasury_program_id()),
218                #[cfg(liquidity_provider)]
219                liquidity_provider_program_id: Some(*self.liquidity_provider_program.id()),
220                timelock_program_id: Some(*self.timelock_program_id()),
221                commitment: self.commitment(),
222                subscription: self.subscription_config.clone(),
223            },
224        )
225    }
226
227    /// Create a clone of this client.
228    pub fn try_clone(&self) -> crate::Result<Self> {
229        Ok(Self {
230            cfg: self.cfg.clone(),
231            store_program: self.program(*self.store_program_id()),
232            treasury_program: self.program(*self.treasury_program_id()),
233            #[cfg(liquidity_provider)]
234            liquidity_provider_program: self.liquidity_provider_program.clone(),
235            timelock_program: self.program(*self.timelock_program_id()),
236            pub_sub: OnceCell::default(),
237            rpc: Default::default(),
238            subscription_config: self.subscription_config.clone(),
239        })
240    }
241
242    /// Replace subscription config with the given.
243    pub fn set_subscription_config(&mut self, config: SubscriptionConfig) -> &mut Self {
244        self.subscription_config = config;
245        self
246    }
247
248    /// Create a new [`Program`] with the given program id.
249    pub fn program(&self, program_id: Pubkey) -> Program<C> {
250        Program::new(program_id, self.cfg.clone())
251    }
252
253    /// Get current cluster.
254    pub fn cluster(&self) -> &Cluster {
255        self.cfg.cluster()
256    }
257
258    /// Get current commitment config.
259    pub fn commitment(&self) -> CommitmentConfig {
260        *self.cfg.commitment()
261    }
262
263    /// Get current payer.
264    pub fn payer(&self) -> Pubkey {
265        self.cfg.payer()
266    }
267
268    /// Get [`RpcClient`].
269    pub fn rpc(&self) -> &RpcClient {
270        self.rpc.get_or_init(|| self.cfg.rpc())
271    }
272
273    /// Get store program.
274    pub fn store_program(&self) -> &Program<C> {
275        &self.store_program
276    }
277
278    /// Creates a [`StoreProgram`].
279    pub fn store_program_for_builders(&self, store: &Pubkey) -> StoreProgram {
280        StoreProgram::builder()
281            .id(*self.store_program_id())
282            .store(*store)
283            .build()
284    }
285
286    /// Get [`LiquidityProviderProgram`] for builders.
287    #[cfg(liquidity_provider)]
288    pub fn lp_program_for_builders(&self) -> &LiquidityProviderProgram {
289        &self.liquidity_provider_program
290    }
291
292    /// Get treasury program.
293    pub fn treasury_program(&self) -> &Program<C> {
294        &self.treasury_program
295    }
296
297    /// Get timelock program.
298    pub fn timelock_program(&self) -> &Program<C> {
299        &self.timelock_program
300    }
301
302    /// Create a new store program.
303    pub fn new_store_program(&self) -> crate::Result<Program<C>> {
304        Ok(self.program(*self.store_program_id()))
305    }
306
307    /// Create a new treasury program.
308    pub fn new_treasury_program(&self) -> crate::Result<Program<C>> {
309        Ok(self.program(*self.treasury_program_id()))
310    }
311
312    /// Get the program id of the store program.
313    pub fn store_program_id(&self) -> &Pubkey {
314        self.store_program().id()
315    }
316
317    /// Get the program id of the treasury program.
318    pub fn treasury_program_id(&self) -> &Pubkey {
319        self.treasury_program().id()
320    }
321
322    /// Get the program id of the timelock program.
323    pub fn timelock_program_id(&self) -> &Pubkey {
324        self.timelock_program().id()
325    }
326
327    /// Get the program id of the liquidity provider program.
328    #[cfg(liquidity_provider)]
329    pub fn liquidity_provider_program_id(&self) -> &Pubkey {
330        self.liquidity_provider_program.id()
331    }
332
333    /// Create a [`TransactionBuilder`] for the store program.
334    pub fn store_transaction(&self) -> TransactionBuilder<'_, C> {
335        self.store_program().transaction()
336    }
337
338    /// Create a [`TransactionBuilder`] for the treasury program.
339    pub fn treasury_transaction(&self) -> TransactionBuilder<'_, C> {
340        self.treasury_program().transaction()
341    }
342
343    /// Create a [`TransactionBuilder`] for the timelock program.
344    pub fn timelock_transaction(&self) -> TransactionBuilder<'_, C> {
345        self.timelock_program().transaction()
346    }
347
348    /// Create a [`BundleBuilder`] with the given options.
349    pub fn bundle_with_options(&self, options: BundleOptions) -> BundleBuilder<'_, C> {
350        BundleBuilder::new_with_options(CreateBundleOptions {
351            cluster: self.cluster().clone(),
352            commitment: self.commitment(),
353            options,
354        })
355    }
356
357    /// Create a [`BundleBuilder`] with default options.
358    pub fn bundle(&self) -> BundleBuilder<C> {
359        self.bundle_with_options(Default::default())
360    }
361
362    /// Find PDA for [`Store`](gmsol_programs::gmsol_store::accounts::Store) account.
363    pub fn find_store_address(&self, key: &str) -> Pubkey {
364        crate::pda::find_store_address(key, self.store_program_id()).0
365    }
366
367    /// Find PDA for store wallet account.
368    pub fn find_store_wallet_address(&self, store: &Pubkey) -> Pubkey {
369        crate::pda::find_store_wallet_address(store, self.store_program_id()).0
370    }
371
372    /// Get the event authority PDA for the `Store` program.
373    pub fn store_event_authority(&self) -> Pubkey {
374        crate::pda::find_event_authority_address(self.store_program_id()).0
375    }
376
377    /// Find PDA for market vault account.
378    pub fn find_market_vault_address(&self, store: &Pubkey, token: &Pubkey) -> Pubkey {
379        crate::pda::find_market_vault_address(store, token, self.store_program_id()).0
380    }
381
382    /// Find PDA for market token mint account.
383    pub fn find_market_token_address(
384        &self,
385        store: &Pubkey,
386        index_token: &Pubkey,
387        long_token: &Pubkey,
388        short_token: &Pubkey,
389    ) -> Pubkey {
390        crate::pda::find_market_token_address(
391            store,
392            index_token,
393            long_token,
394            short_token,
395            self.store_program_id(),
396        )
397        .0
398    }
399
400    /// Find PDA for market account.
401    pub fn find_market_address(&self, store: &Pubkey, token: &Pubkey) -> Pubkey {
402        crate::pda::find_market_address(store, token, self.store_program_id()).0
403    }
404
405    /// Find PDA for deposit account.
406    pub fn find_deposit_address(
407        &self,
408        store: &Pubkey,
409        user: &Pubkey,
410        nonce: &NonceBytes,
411    ) -> Pubkey {
412        crate::pda::find_deposit_address(store, user, nonce, self.store_program_id()).0
413    }
414
415    /// Find PDA for first deposit owner.
416    pub fn find_first_deposit_owner_address(&self) -> Pubkey {
417        crate::pda::find_first_deposit_receiver_address(self.store_program_id()).0
418    }
419
420    /// Find PDA for withdrawal account.
421    pub fn find_withdrawal_address(
422        &self,
423        store: &Pubkey,
424        user: &Pubkey,
425        nonce: &NonceBytes,
426    ) -> Pubkey {
427        crate::pda::find_withdrawal_address(store, user, nonce, self.store_program_id()).0
428    }
429
430    /// Find PDA for order.
431    pub fn find_order_address(&self, store: &Pubkey, user: &Pubkey, nonce: &NonceBytes) -> Pubkey {
432        crate::pda::find_order_address(store, user, nonce, self.store_program_id()).0
433    }
434
435    /// Find PDA for shift.
436    pub fn find_shift_address(&self, store: &Pubkey, owner: &Pubkey, nonce: &NonceBytes) -> Pubkey {
437        crate::pda::find_shift_address(store, owner, nonce, self.store_program_id()).0
438    }
439
440    /// Find PDA for position.
441    pub fn find_position_address(
442        &self,
443        store: &Pubkey,
444        user: &Pubkey,
445        market_token: &Pubkey,
446        collateral_token: &Pubkey,
447        is_long: bool,
448    ) -> crate::Result<Pubkey> {
449        Ok(crate::pda::find_position_address(
450            store,
451            user,
452            market_token,
453            collateral_token,
454            is_long,
455            self.store_program_id(),
456        )
457        .0)
458    }
459
460    /// Find PDA for claimable account.
461    pub fn find_claimable_account_address(
462        &self,
463        store: &Pubkey,
464        mint: &Pubkey,
465        user: &Pubkey,
466        time_key: &[u8],
467    ) -> Pubkey {
468        crate::pda::find_claimable_account_address(
469            store,
470            mint,
471            user,
472            time_key,
473            self.store_program_id(),
474        )
475        .0
476    }
477
478    /// Find PDA for trade event buffer account.
479    pub fn find_trade_event_buffer_address(
480        &self,
481        store: &Pubkey,
482        authority: &Pubkey,
483        index: u16,
484    ) -> Pubkey {
485        crate::pda::find_trade_event_buffer_address(
486            store,
487            authority,
488            index,
489            self.store_program_id(),
490        )
491        .0
492    }
493
494    /// Find PDA for user account.
495    pub fn find_user_address(&self, store: &Pubkey, owner: &Pubkey) -> Pubkey {
496        crate::pda::find_user_address(store, owner, self.store_program_id()).0
497    }
498
499    /// Find PDA for referral code.
500    pub fn find_referral_code_address(&self, store: &Pubkey, code: ReferralCodeBytes) -> Pubkey {
501        crate::pda::find_referral_code_address(store, code, self.store_program_id()).0
502    }
503
504    /// Find PDA for GLV token mint.
505    pub fn find_glv_token_address(&self, store: &Pubkey, index: u16) -> Pubkey {
506        crate::pda::find_glv_token_address(store, index, self.store_program_id()).0
507    }
508
509    /// Find PDA for GLV.
510    pub fn find_glv_address(&self, glv_token: &Pubkey) -> Pubkey {
511        crate::pda::find_glv_address(glv_token, self.store_program_id()).0
512    }
513
514    /// Find PDA for GLV deposit.
515    pub fn find_glv_deposit_address(
516        &self,
517        store: &Pubkey,
518        owner: &Pubkey,
519        nonce: &NonceBytes,
520    ) -> Pubkey {
521        crate::pda::find_glv_deposit_address(store, owner, nonce, self.store_program_id()).0
522    }
523
524    /// Find PDA for GLV withdrawal.
525    pub fn find_glv_withdrawal_address(
526        &self,
527        store: &Pubkey,
528        owner: &Pubkey,
529        nonce: &NonceBytes,
530    ) -> Pubkey {
531        crate::pda::find_glv_withdrawal_address(store, owner, nonce, self.store_program_id()).0
532    }
533
534    /// Find PDA for GT exchange vault.
535    pub fn find_gt_exchange_vault_address(
536        &self,
537        store: &Pubkey,
538        time_window_index: i64,
539        time_window: u32,
540    ) -> Pubkey {
541        crate::pda::find_gt_exchange_vault_address(
542            store,
543            time_window_index,
544            time_window,
545            self.store_program_id(),
546        )
547        .0
548    }
549
550    /// Find PDA for GT exchange.
551    pub fn find_gt_exchange_address(&self, vault: &Pubkey, owner: &Pubkey) -> Pubkey {
552        crate::pda::find_gt_exchange_address(vault, owner, self.store_program_id()).0
553    }
554
555    /// Find PDA for custom price feed.
556    pub fn find_price_feed_address(
557        &self,
558        store: &Pubkey,
559        authority: &Pubkey,
560        index: u16,
561        provider: PriceProviderKind,
562        token: &Pubkey,
563    ) -> Pubkey {
564        crate::pda::find_price_feed_address(
565            store,
566            authority,
567            index,
568            provider,
569            token,
570            self.store_program_id(),
571        )
572        .0
573    }
574
575    /// Find PDA for treasury global config.
576    pub fn find_treasury_config_address(&self, store: &Pubkey) -> Pubkey {
577        crate::pda::find_treasury_config_address(store, self.treasury_program_id()).0
578    }
579
580    /// Find PDA for treasury vault config.
581    pub fn find_treasury_vault_config_address(&self, config: &Pubkey, index: u16) -> Pubkey {
582        crate::pda::find_treasury_vault_config_address(config, index, self.treasury_program_id()).0
583    }
584
585    /// Find PDA for GT bank.
586    pub fn find_gt_bank_address(
587        &self,
588        treasury_vault_config: &Pubkey,
589        gt_exchange_vault: &Pubkey,
590    ) -> Pubkey {
591        crate::pda::find_gt_bank_address(
592            treasury_vault_config,
593            gt_exchange_vault,
594            self.treasury_program_id(),
595        )
596        .0
597    }
598
599    /// Find PDA for treasury receiver.
600    pub fn find_treasury_receiver_address(&self, config: &Pubkey) -> Pubkey {
601        crate::pda::find_treasury_receiver_address(config, self.treasury_program_id()).0
602    }
603
604    /// Find PDA for timelock config.
605    pub fn find_timelock_config_address(&self, store: &Pubkey) -> Pubkey {
606        crate::pda::find_timelock_config_address(store, self.timelock_program_id()).0
607    }
608
609    /// Find PDA for timelock executor.
610    pub fn find_executor_address(&self, store: &Pubkey, role: &str) -> crate::Result<Pubkey> {
611        Ok(crate::pda::find_executor_address(store, role, self.timelock_program_id())?.0)
612    }
613
614    /// Find the wallet PDA for the given timelock executor.
615    pub fn find_executor_wallet_address(&self, executor: &Pubkey) -> Pubkey {
616        crate::pda::find_executor_wallet_address(executor, self.timelock_program_id()).0
617    }
618
619    /// Find the PDA for callback authority.
620    pub fn find_callback_authority_address(&self) -> Pubkey {
621        crate::pda::find_callback_authority(self.store_program_id()).0
622    }
623
624    /// Find the PDA for virtual inventory for swaps.
625    pub fn find_virtual_inventory_for_swaps_address(&self, store: &Pubkey, index: u32) -> Pubkey {
626        crate::pda::find_virtual_inventory_for_swaps_address(store, index, self.store_program_id())
627            .0
628    }
629
630    /// Find the PDA for virtual inventory for positions.
631    pub fn find_virtual_inventory_for_positions_address(
632        &self,
633        store: &Pubkey,
634        index_token: &Pubkey,
635    ) -> Pubkey {
636        crate::pda::find_virtual_inventory_for_positions_address(
637            store,
638            index_token,
639            self.store_program_id(),
640        )
641        .0
642    }
643
644    pub(crate) fn get_callback_params(&self, callback: Option<&Callback>) -> CallbackParams {
645        match callback {
646            Some(callback) => CallbackParams {
647                callback_version: Some(callback.version),
648                callback_authority: Some(self.find_callback_authority_address()),
649                callback_program: Some(callback.program.0),
650                callback_shared_data_account: Some(callback.shared_data.0),
651                callback_partitioned_data_account: Some(callback.partitioned_data.0),
652            },
653            None => CallbackParams::default(),
654        }
655    }
656
657    /// Get latest slot.
658    pub async fn get_slot(&self, commitment: Option<CommitmentConfig>) -> crate::Result<u64> {
659        let slot = self
660            .store_program()
661            .rpc()
662            .get_slot_with_commitment(commitment.unwrap_or(self.commitment()))
663            .await
664            .map_err(crate::Error::custom)?;
665        Ok(slot)
666    }
667
668    /// Fetch accounts owned by the store program.
669    pub async fn store_accounts_with_config<T>(
670        &self,
671        filter_by_store: Option<StoreFilter>,
672        other_filters: impl IntoIterator<Item = RpcFilterType>,
673        config: ProgramAccountsConfig,
674    ) -> crate::Result<WithSlot<Vec<(Pubkey, T)>>>
675    where
676        T: AccountDeserialize + Discriminator,
677    {
678        let filters = std::iter::empty()
679            .chain(
680                filter_by_store
681                    .inspect(|filter| {
682                        let store = &filter.store;
683                        tracing::debug!(%store, offset=%filter.store_offset(), "store bytes to filter: {}", hex::encode(store));
684                    })
685                    .map(RpcFilterType::from),
686            )
687            .chain(other_filters);
688        accounts_lazy_with_context(self.store_program(), filters, config)
689            .await?
690            .map(|iter| iter.collect())
691            .transpose()
692    }
693
694    /// Fetch account without deserialization.
695    pub async fn raw_account_with_config(
696        &self,
697        address: &Pubkey,
698        config: RpcAccountInfoConfig,
699    ) -> crate::Result<WithSlot<Option<Account>>> {
700        let client = self.store_program().rpc();
701        get_account_with_context(&client, address, config).await
702    }
703
704    /// Fetch account and decode.
705    #[cfg(feature = "decode")]
706    pub async fn decode_account_with_config(
707        &self,
708        address: &Pubkey,
709        config: RpcAccountInfoConfig,
710    ) -> crate::Result<WithSlot<Option<gmsol_decode::gmsol::programs::GMSOLAccountData>>> {
711        use crate::utils::decode::KeyedAccount;
712        use gmsol_decode::{decoder::AccountAccessDecoder, gmsol::programs::GMSOLAccountData};
713
714        let account = self.raw_account_with_config(address, config).await?;
715        let slot = account.slot();
716        match account.into_value() {
717            Some(account) => {
718                let account = KeyedAccount {
719                    pubkey: *address,
720                    account: WithSlot::new(slot, account),
721                };
722                let decoder = AccountAccessDecoder::new(account);
723                let decoded = GMSOLAccountData::decode(decoder)?;
724                Ok(WithSlot::new(slot, Some(decoded)))
725            }
726            None => Ok(WithSlot::new(slot, None)),
727        }
728    }
729
730    /// Fetch account with the given address with config.
731    ///
732    /// The value inside the returned context will be `None` if the account does not exist.
733    pub async fn account_with_config<T>(
734        &self,
735        address: &Pubkey,
736        config: RpcAccountInfoConfig,
737    ) -> crate::Result<WithSlot<Option<T>>>
738    where
739        T: AccountDeserialize,
740    {
741        let client = self.store_program().rpc();
742        account_with_context(&client, address, config).await
743    }
744
745    /// Fetch account with the given address.
746    pub async fn account<T: AccountDeserialize>(
747        &self,
748        address: &Pubkey,
749    ) -> crate::Result<Option<T>> {
750        Ok(self
751            .account_with_config(address, Default::default())
752            .await?
753            .into_value())
754    }
755
756    /// Fetch accounts owned by the store program.
757    pub async fn store_accounts<T>(
758        &self,
759        filter_by_store: Option<StoreFilter>,
760        other_filters: impl IntoIterator<Item = RpcFilterType>,
761    ) -> crate::Result<Vec<(Pubkey, T)>>
762    where
763        T: AccountDeserialize + Discriminator,
764    {
765        let res = self
766            .store_accounts_with_config(
767                filter_by_store,
768                other_filters,
769                ProgramAccountsConfig::default(),
770            )
771            .await?;
772        tracing::debug!(slot=%res.slot(), "accounts fetched");
773        Ok(res.into_value())
774    }
775
776    /// Fetch [`Store`](store_accounts::Store) account with its address.
777    pub async fn store(&self, address: &Pubkey) -> crate::Result<Arc<store_accounts::Store>> {
778        Ok(self
779            .account::<SharedZeroCopy<store_accounts::Store>>(address)
780            .await?
781            .ok_or(crate::Error::NotFound)?
782            .0)
783    }
784
785    /// Fetch user account with its address.
786    pub async fn user(&self, address: &Pubkey) -> crate::Result<store_accounts::UserHeader> {
787        Ok(self
788            .account::<ZeroCopy<store_accounts::UserHeader>>(address)
789            .await?
790            .ok_or(crate::Error::NotFound)?
791            .0)
792    }
793
794    /// Fetch the [`TokenMap`] address of the given store.
795    pub async fn authorized_token_map_address(
796        &self,
797        store: &Pubkey,
798    ) -> crate::Result<Option<Pubkey>> {
799        let store = self.store(store).await?;
800        let token_map = store.token_map;
801        Ok(optional_address(&token_map).copied())
802    }
803
804    /// Fetch [`TokenMap`] account with its address.
805    pub async fn token_map(&self, address: &Pubkey) -> crate::Result<TokenMap> {
806        self.account(address).await?.ok_or(crate::Error::NotFound)
807    }
808
809    /// Fetch the authorized token map of the given store.
810    pub async fn authorized_token_map(&self, store: &Pubkey) -> crate::Result<TokenMap> {
811        let address = self
812            .authorized_token_map_address(store)
813            .await?
814            .ok_or(crate::Error::custom("token map is not set"))?;
815        self.token_map(&address).await
816    }
817
818    /// Fetch all [`Market`](store_accounts::Market) accounts of the given store.
819    pub async fn markets_with_config(
820        &self,
821        store: &Pubkey,
822        config: ProgramAccountsConfig,
823    ) -> crate::Result<WithSlot<BTreeMap<Pubkey, Arc<store_accounts::Market>>>> {
824        let markets = self
825            .store_accounts_with_config::<SharedZeroCopy<store_accounts::Market>>(
826                Some(StoreFilter::new(
827                    store,
828                    bytemuck::offset_of!(store_accounts::Market, store),
829                )),
830                None,
831                config,
832            )
833            .await?
834            .map(|accounts| {
835                accounts
836                    .into_iter()
837                    .map(|(pubkey, m)| (pubkey, m.0))
838                    .collect::<BTreeMap<_, _>>()
839            });
840        Ok(markets)
841    }
842
843    /// Fetch all [`Market`](store_accounts::Market) accounts of the given store.
844    pub async fn markets(
845        &self,
846        store: &Pubkey,
847    ) -> crate::Result<BTreeMap<Pubkey, Arc<store_accounts::Market>>> {
848        let markets = self
849            .markets_with_config(store, ProgramAccountsConfig::default())
850            .await?
851            .into_value();
852        Ok(markets)
853    }
854
855    /// Fetch [`Market`](store_accounts::Market) at the given address with config.
856    ///
857    /// The value inside the returned context will be `None` if the account does not exist.
858    pub async fn market_with_config<T>(
859        &self,
860        address: &Pubkey,
861        config: RpcAccountInfoConfig,
862    ) -> crate::Result<WithSlot<Option<Arc<store_accounts::Market>>>> {
863        let market = self
864            .account_with_config::<SharedZeroCopy<store_accounts::Market>>(address, config)
865            .await?;
866        Ok(market.map(|m| m.map(|m| m.0)))
867    }
868
869    /// Fetch [`Market`](store_accounts::Market) account with its address.
870    pub async fn market(&self, address: &Pubkey) -> crate::Result<Arc<store_accounts::Market>> {
871        Ok(self
872            .account::<SharedZeroCopy<store_accounts::Market>>(address)
873            .await?
874            .ok_or(crate::Error::NotFound)?
875            .0)
876    }
877
878    /// Fetch [`Market`](store_accounts::Market) account with its token address.
879    pub async fn market_by_token(
880        &self,
881        store: &Pubkey,
882        market_token: &Pubkey,
883    ) -> crate::Result<Arc<store_accounts::Market>> {
884        let address = self.find_market_address(store, market_token);
885        self.market(&address).await
886    }
887
888    /// Fetch all [`Glv`](store_accounts::Glv) accounts of the given store.
889    pub async fn glvs_with_config(
890        &self,
891        store: &Pubkey,
892        config: ProgramAccountsConfig,
893    ) -> crate::Result<WithSlot<BTreeMap<Pubkey, store_accounts::Glv>>> {
894        let glvs = self
895            .store_accounts_with_config::<ZeroCopy<store_accounts::Glv>>(
896                Some(StoreFilter::new(
897                    store,
898                    bytemuck::offset_of!(store_accounts::Glv, store),
899                )),
900                None,
901                config,
902            )
903            .await?
904            .map(|accounts| {
905                accounts
906                    .into_iter()
907                    .map(|(pubkey, m)| (pubkey, m.0))
908                    .collect::<BTreeMap<_, _>>()
909            });
910        Ok(glvs)
911    }
912
913    /// Fetch all [`Glv`](store_accounts::Glv) accounts of the given store.
914    pub async fn glvs(
915        &self,
916        store: &Pubkey,
917    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Glv>> {
918        let glvs = self
919            .glvs_with_config(store, ProgramAccountsConfig::default())
920            .await?
921            .into_value();
922        Ok(glvs)
923    }
924
925    /// Fetch [`MarketStatus`] with market token address.
926    pub async fn market_status(
927        &self,
928        store: &Pubkey,
929        market_token: &Pubkey,
930        prices: Prices<u128>,
931        maximize_pnl: bool,
932        maximize_pool_value: bool,
933    ) -> crate::Result<MarketStatus> {
934        let req = self.get_market_status(
935            store,
936            market_token,
937            prices,
938            maximize_pnl,
939            maximize_pool_value,
940        );
941        let status = view::view::<MarketStatus>(
942            &self.store_program().rpc(),
943            &req.signed_transaction_with_options(true, None, None, default_before_sign)
944                .await?,
945        )
946        .await?;
947        Ok(status)
948    }
949
950    /// Fetch current market token price with market token address.
951    pub async fn market_token_price(
952        &self,
953        store: &Pubkey,
954        market_token: &Pubkey,
955        prices: Prices<u128>,
956        pnl_factor: PnlFactorKind,
957        maximize: bool,
958    ) -> crate::Result<u128> {
959        let req = self.get_market_token_price(store, market_token, prices, pnl_factor, maximize);
960        let price = view::view::<u128>(
961            &self.store_program().rpc(),
962            &req.signed_transaction_with_options(true, None, None, default_before_sign)
963                .await?,
964        )
965        .await?;
966        Ok(price)
967    }
968
969    /// Fetch [`Position`](store_accounts::Position) accounts.
970    pub async fn positions(
971        &self,
972        store: &Pubkey,
973        owner: Option<&Pubkey>,
974        market_token: Option<&Pubkey>,
975    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Position>> {
976        let filter = match owner {
977            Some(owner) => {
978                let mut bytes = owner.as_ref().to_owned();
979                if let Some(market_token) = market_token {
980                    bytes.extend_from_slice(market_token.as_ref());
981                }
982                let filter = RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
983                    bytemuck::offset_of!(store_accounts::Position, owner) + DISC_OFFSET,
984                    &bytes,
985                ));
986                Some(filter)
987            }
988            None => market_token.and_then(|token| {
989                Some(RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
990                    bytemuck::offset_of!(store_accounts::Position, market_token) + DISC_OFFSET,
991                    &token.try_to_vec().ok()?,
992                )))
993            }),
994        };
995
996        let store_filter =
997            StoreFilter::new(store, bytemuck::offset_of!(store_accounts::Position, store));
998
999        let positions = self
1000            .store_accounts::<ZeroCopy<store_accounts::Position>>(Some(store_filter), filter)
1001            .await?
1002            .into_iter()
1003            .map(|(pubkey, p)| (pubkey, p.0))
1004            .collect();
1005
1006        Ok(positions)
1007    }
1008
1009    /// Fetch [`Position`](store_accounts::Position) account with its address.
1010    pub async fn position(&self, address: &Pubkey) -> crate::Result<store_accounts::Position> {
1011        let position = self
1012            .account::<ZeroCopy<store_accounts::Position>>(address)
1013            .await?
1014            .ok_or(crate::Error::NotFound)?;
1015        Ok(position.0)
1016    }
1017
1018    /// Fetch [`Order`](store_accounts::Order) account with its address.
1019    pub async fn order(&self, address: &Pubkey) -> crate::Result<store_accounts::Order> {
1020        Ok(self
1021            .account::<ZeroCopy<store_accounts::Order>>(address)
1022            .await?
1023            .ok_or(crate::Error::NotFound)?
1024            .0)
1025    }
1026
1027    /// Fetch [`Order`](store_accounts::Order) account at the the given address with config.
1028    ///
1029    /// The value inside the returned context will be `None` if the account does not exist.
1030    pub async fn order_with_config(
1031        &self,
1032        address: &Pubkey,
1033        config: RpcAccountInfoConfig,
1034    ) -> crate::Result<WithSlot<Option<store_accounts::Order>>> {
1035        Ok(self
1036            .account_with_config::<ZeroCopy<store_accounts::Order>>(address, config)
1037            .await?
1038            .map(|a| a.map(|a| a.0)))
1039    }
1040
1041    fn create_action_filters(
1042        &self,
1043        store: &Pubkey,
1044        owner: Option<&Pubkey>,
1045        market_token: Option<&Pubkey>,
1046    ) -> (StoreFilter, Vec<RpcFilterType>) {
1047        let mut filters = Vec::default();
1048        if let Some(owner) = owner {
1049            filters.push(RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
1050                bytemuck::offset_of!(store_types::ActionHeader, owner) + DISC_OFFSET,
1051                owner.as_ref(),
1052            )));
1053        }
1054        if let Some(market_token) = market_token {
1055            let market = self.find_market_address(store, market_token);
1056            filters.push(RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
1057                bytemuck::offset_of!(store_types::ActionHeader, market) + DISC_OFFSET,
1058                market.as_ref(),
1059            )));
1060        }
1061        let store_filter = StoreFilter::new(
1062            store,
1063            bytemuck::offset_of!(store_types::ActionHeader, store),
1064        );
1065
1066        (store_filter, filters)
1067    }
1068
1069    /// Fetch [`Order`](store_accounts::Order) accounts.
1070    pub async fn orders(
1071        &self,
1072        store: &Pubkey,
1073        owner: Option<&Pubkey>,
1074        market_token: Option<&Pubkey>,
1075    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Order>> {
1076        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1077
1078        let orders = self
1079            .store_accounts::<ZeroCopy<store_accounts::Order>>(Some(store_filter), filters)
1080            .await?
1081            .into_iter()
1082            .map(|(addr, order)| (addr, order.0))
1083            .collect();
1084
1085        Ok(orders)
1086    }
1087
1088    /// Fetch [`Depsoit`](store_accounts::Deposit) account with its address.
1089    pub async fn deposit(&self, address: &Pubkey) -> crate::Result<store_accounts::Deposit> {
1090        Ok(self
1091            .account::<ZeroCopy<_>>(address)
1092            .await?
1093            .ok_or(crate::Error::NotFound)?
1094            .0)
1095    }
1096
1097    /// Fetch [`Deposit`](store_accounts::Deposit) accounts.
1098    pub async fn deposits(
1099        &self,
1100        store: &Pubkey,
1101        owner: Option<&Pubkey>,
1102        market_token: Option<&Pubkey>,
1103    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Deposit>> {
1104        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1105
1106        let orders = self
1107            .store_accounts::<ZeroCopy<store_accounts::Deposit>>(Some(store_filter), filters)
1108            .await?
1109            .into_iter()
1110            .map(|(addr, action)| (addr, action.0))
1111            .collect();
1112
1113        Ok(orders)
1114    }
1115
1116    /// Fetch [`Withdrawal`](store_accounts::Withdrawal) account with its address.
1117    pub async fn withdrawal(&self, address: &Pubkey) -> crate::Result<store_accounts::Withdrawal> {
1118        Ok(self
1119            .account::<ZeroCopy<store_accounts::Withdrawal>>(address)
1120            .await?
1121            .ok_or(crate::Error::NotFound)?
1122            .0)
1123    }
1124
1125    /// Fetch [`Withdrawal`](store_accounts::Withdrawal) accounts.
1126    pub async fn withdrawals(
1127        &self,
1128        store: &Pubkey,
1129        owner: Option<&Pubkey>,
1130        market_token: Option<&Pubkey>,
1131    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Withdrawal>> {
1132        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1133
1134        let orders = self
1135            .store_accounts::<ZeroCopy<store_accounts::Withdrawal>>(Some(store_filter), filters)
1136            .await?
1137            .into_iter()
1138            .map(|(addr, action)| (addr, action.0))
1139            .collect();
1140
1141        Ok(orders)
1142    }
1143
1144    /// Fetch [`Shift`](store_accounts::Shift) accounts.
1145    pub async fn shifts(
1146        &self,
1147        store: &Pubkey,
1148        owner: Option<&Pubkey>,
1149        market_token: Option<&Pubkey>,
1150    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::Shift>> {
1151        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1152
1153        let orders = self
1154            .store_accounts::<ZeroCopy<store_accounts::Shift>>(Some(store_filter), filters)
1155            .await?
1156            .into_iter()
1157            .map(|(addr, action)| (addr, action.0))
1158            .collect();
1159
1160        Ok(orders)
1161    }
1162
1163    /// Fetch [`GlvDeposit`](store_accounts::GlvDeposit) accounts.
1164    pub async fn glv_deposits(
1165        &self,
1166        store: &Pubkey,
1167        owner: Option<&Pubkey>,
1168        market_token: Option<&Pubkey>,
1169    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GlvDeposit>> {
1170        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1171
1172        let orders = self
1173            .store_accounts::<ZeroCopy<store_accounts::GlvDeposit>>(Some(store_filter), filters)
1174            .await?
1175            .into_iter()
1176            .map(|(addr, action)| (addr, action.0))
1177            .collect();
1178
1179        Ok(orders)
1180    }
1181
1182    /// Fetch [`GlvWithdrawal`](store_accounts::GlvWithdrawal) accounts.
1183    pub async fn glv_withdrawals(
1184        &self,
1185        store: &Pubkey,
1186        owner: Option<&Pubkey>,
1187        market_token: Option<&Pubkey>,
1188    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GlvWithdrawal>> {
1189        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1190
1191        let orders = self
1192            .store_accounts::<ZeroCopy<store_accounts::GlvWithdrawal>>(Some(store_filter), filters)
1193            .await?
1194            .into_iter()
1195            .map(|(addr, action)| (addr, action.0))
1196            .collect();
1197
1198        Ok(orders)
1199    }
1200
1201    /// Fetch [`GlvShift`](store_accounts::GlvShift) accounts.
1202    pub async fn glv_shifts(
1203        &self,
1204        store: &Pubkey,
1205        owner: Option<&Pubkey>,
1206        market_token: Option<&Pubkey>,
1207    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GlvShift>> {
1208        let (store_filter, filters) = self.create_action_filters(store, owner, market_token);
1209
1210        let orders = self
1211            .store_accounts::<ZeroCopy<store_accounts::GlvShift>>(Some(store_filter), filters)
1212            .await?
1213            .into_iter()
1214            .map(|(addr, action)| (addr, action.0))
1215            .collect();
1216
1217        Ok(orders)
1218    }
1219
1220    /// Fetch [`PriceFeed`](store_accounts::PriceFeed) account with its address.
1221    pub async fn price_feed(
1222        &self,
1223        address: &Pubkey,
1224    ) -> crate::Result<Option<store_accounts::PriceFeed>> {
1225        Ok(self
1226            .account::<ZeroCopy<store_accounts::PriceFeed>>(address)
1227            .await?
1228            .map(|a| a.0))
1229    }
1230
1231    /// Get the [`PubsubClient`].
1232    pub async fn pub_sub(&self) -> crate::Result<&PubsubClient> {
1233        let client = self
1234            .pub_sub
1235            .get_or_try_init(|| {
1236                PubsubClient::new(self.cluster().clone(), self.subscription_config.clone())
1237            })
1238            .await?;
1239        Ok(client)
1240    }
1241
1242    /// Subscribe to [`GMSOLCPIEvent`]s from the store program.
1243    #[cfg(feature = "decode")]
1244    pub async fn subscribe_store_cpi_events(
1245        &self,
1246        commitment: Option<CommitmentConfig>,
1247    ) -> crate::Result<impl futures_util::Stream<Item = crate::Result<WithSlot<Vec<GMSOLCPIEvent>>>>>
1248    {
1249        use futures_util::TryStreamExt;
1250        use transaction_history::extract_cpi_events;
1251
1252        let program_id = self.store_program_id();
1253        let event_authority = self.store_event_authority();
1254        let query = Arc::new(self.store_program().rpc());
1255        let commitment = commitment.unwrap_or(self.subscription_config.commitment);
1256        let signatures = self
1257            .pub_sub()
1258            .await?
1259            .logs_subscribe(&event_authority, Some(commitment))
1260            .await?
1261            .and_then(|txn| {
1262                let signature = txn
1263                    .map(|txn| txn.signature.parse().map_err(crate::Error::custom))
1264                    .transpose();
1265                async move { signature }
1266            });
1267        let events = extract_cpi_events(
1268            signatures,
1269            query,
1270            program_id,
1271            &event_authority,
1272            commitment,
1273            Some(0),
1274        )
1275        .try_filter_map(|event| {
1276            let decoded = event
1277                .map(|event| {
1278                    event
1279                        .events
1280                        .iter()
1281                        .map(|event| GMSOLCPIEvent::decode(event).map_err(crate::Error::from))
1282                        .collect::<crate::Result<Vec<_>>>()
1283                })
1284                .transpose()
1285                .inspect_err(|err| tracing::error!(%err, "decode error"))
1286                .ok();
1287            async move { Ok(decoded) }
1288        });
1289        Ok(events)
1290    }
1291
1292    /// Fetch historical [`GMSOLCPIEvent`]s for the given account.
1293    #[cfg(feature = "decode")]
1294    pub async fn historical_store_cpi_events(
1295        &self,
1296        address: &Pubkey,
1297        commitment: Option<CommitmentConfig>,
1298    ) -> crate::Result<impl futures_util::Stream<Item = crate::Result<WithSlot<Vec<GMSOLCPIEvent>>>>>
1299    {
1300        use futures_util::TryStreamExt;
1301        use transaction_history::{extract_cpi_events, fetch_transaction_history_with_config};
1302
1303        let commitment = commitment.unwrap_or(self.commitment());
1304        let client = Arc::new(self.store_program().rpc());
1305        let signatures = fetch_transaction_history_with_config(
1306            client.clone(),
1307            address,
1308            commitment,
1309            None,
1310            None,
1311            None,
1312        )
1313        .await?;
1314        let events = extract_cpi_events(
1315            signatures,
1316            client,
1317            self.store_program_id(),
1318            &self.store_event_authority(),
1319            commitment,
1320            Some(0),
1321        )
1322        .try_filter(|events| std::future::ready(!events.value().events.is_empty()))
1323        .and_then(|encoded| {
1324            let decoded = encoded
1325                .map(|event| {
1326                    event
1327                        .events
1328                        .iter()
1329                        .map(|event| GMSOLCPIEvent::decode(event).map_err(crate::Error::from))
1330                        .collect::<crate::Result<Vec<_>>>()
1331                })
1332                .transpose();
1333            async move { decoded }
1334        });
1335        Ok(events)
1336    }
1337
1338    /// Wait for an order to be completed using current slot as min context slot.
1339    #[cfg(feature = "decode")]
1340    pub async fn complete_order(
1341        &self,
1342        address: &Pubkey,
1343        commitment: Option<CommitmentConfig>,
1344    ) -> crate::Result<Option<store_events::TradeEvent>> {
1345        let slot = self.get_slot(None).await?;
1346        self.complete_order_with_config(
1347            address,
1348            slot,
1349            std::time::Duration::from_secs(5),
1350            commitment,
1351        )
1352        .await
1353    }
1354
1355    /// Get last order events.
1356    #[cfg(feature = "decode")]
1357    pub async fn last_order_events(
1358        &self,
1359        order: &Pubkey,
1360        before_slot: u64,
1361        commitment: CommitmentConfig,
1362    ) -> crate::Result<Vec<GMSOLCPIEvent>> {
1363        use futures_util::{StreamExt, TryStreamExt};
1364
1365        let events = self
1366            .historical_store_cpi_events(order, Some(commitment))
1367            .await?
1368            .try_filter(|events| {
1369                let pass = events.slot() <= before_slot;
1370                async move { pass }
1371            })
1372            .take(1);
1373        futures_util::pin_mut!(events);
1374        match events.next().await.transpose()? {
1375            Some(events) => Ok(events.into_value()),
1376            None => Err(crate::Error::custom(format!(
1377                "events not found, slot={before_slot}"
1378            ))),
1379        }
1380    }
1381
1382    /// Wait for an order to be completed with the given config.
1383    #[cfg(feature = "decode")]
1384    pub async fn complete_order_with_config(
1385        &self,
1386        address: &Pubkey,
1387        mut slot: u64,
1388        polling: std::time::Duration,
1389        commitment: Option<CommitmentConfig>,
1390    ) -> crate::Result<Option<store_events::TradeEvent>> {
1391        use futures_util::{StreamExt, TryStreamExt};
1392        use solana_account_decoder::UiAccountEncoding;
1393
1394        let mut trade = None;
1395        let commitment = commitment.unwrap_or(self.subscription_config.commitment);
1396
1397        let events = self.subscribe_store_cpi_events(Some(commitment)).await?;
1398
1399        let config = RpcAccountInfoConfig {
1400            encoding: Some(UiAccountEncoding::Base64),
1401            commitment: Some(commitment),
1402            min_context_slot: Some(slot),
1403            ..Default::default()
1404        };
1405        let mut slot_reached = self.get_slot(Some(commitment)).await? >= slot;
1406        if slot_reached {
1407            let order = self.order_with_config(address, config.clone()).await?;
1408            slot = order.slot();
1409            let order = order.into_value();
1410            if order.is_none() {
1411                let events = self.last_order_events(address, slot, commitment).await?;
1412                return Ok(events
1413                    .into_iter()
1414                    .filter_map(|event| {
1415                        if let GMSOLCPIEvent::TradeEvent(event) = event {
1416                            Some(event)
1417                        } else {
1418                            None
1419                        }
1420                    })
1421                    .next());
1422            }
1423        }
1424        let address = *address;
1425        let stream = events
1426            .try_filter_map(|events| async {
1427                if events.slot() < slot {
1428                    return Ok(None);
1429                }
1430                let events = events
1431                    .into_value()
1432                    .into_iter()
1433                    .filter(|event| {
1434                        matches!(
1435                            event,
1436                            GMSOLCPIEvent::TradeEvent(_) | GMSOLCPIEvent::OrderRemoved(_)
1437                        )
1438                    })
1439                    .map(Ok);
1440                Ok(Some(futures_util::stream::iter(events)))
1441            })
1442            .try_flatten();
1443        let stream =
1444            tokio_stream::StreamExt::timeout_repeating(stream, tokio::time::interval(polling));
1445        futures_util::pin_mut!(stream);
1446        while let Some(res) = stream.next().await {
1447            match res {
1448                Ok(Ok(event)) => match event {
1449                    GMSOLCPIEvent::TradeEvent(event) => {
1450                        trade = Some(event);
1451                    }
1452                    GMSOLCPIEvent::OrderRemoved(_remove) => {
1453                        return Ok(trade);
1454                    }
1455                    _ => unreachable!(),
1456                },
1457                Ok(Err(err)) => {
1458                    return Err(err);
1459                }
1460                Err(_elapsed) => {
1461                    if slot_reached {
1462                        let res = self.order_with_config(&address, config.clone()).await?;
1463                        if res.value().is_none() {
1464                            let events = self
1465                                .last_order_events(&address, res.slot(), commitment)
1466                                .await?;
1467                            return Ok(events
1468                                .into_iter()
1469                                .filter_map(|event| {
1470                                    if let GMSOLCPIEvent::TradeEvent(event) = event {
1471                                        Some(event)
1472                                    } else {
1473                                        None
1474                                    }
1475                                })
1476                                .next());
1477                        }
1478                    } else {
1479                        slot_reached = self.get_slot(Some(commitment)).await? >= slot;
1480                    }
1481                }
1482            }
1483        }
1484        Err(crate::Error::custom("the watch stream end"))
1485    }
1486
1487    /// Shutdown the client gracefully.
1488    pub async fn shutdown(&self) -> crate::Result<()> {
1489        self.pub_sub().await?.shutdown().await
1490    }
1491
1492    /// Get GT exchanges.
1493    pub async fn gt_exchanges(
1494        &self,
1495        store: &Pubkey,
1496        owner: &Pubkey,
1497    ) -> crate::Result<BTreeMap<Pubkey, store_accounts::GtExchange>> {
1498        use store_accounts::GtExchange;
1499
1500        let store_filter = StoreFilter::new(store, bytemuck::offset_of!(GtExchange, store));
1501        let owner_filter = RpcFilterType::Memcmp(Memcmp::new_base58_encoded(
1502            8 + bytemuck::offset_of!(GtExchange, owner),
1503            owner.as_ref(),
1504        ));
1505        let exchanges = self
1506            .store_accounts::<ZeroCopy<GtExchange>>(Some(store_filter), Some(owner_filter))
1507            .await?;
1508        Ok(exchanges
1509            .into_iter()
1510            .map(|(address, exchange)| (address, exchange.0))
1511            .collect())
1512    }
1513
1514    /// Fetch [`InstructionBuffer`] account with its address.
1515    pub async fn instruction_buffer(
1516        &self,
1517        address: &Pubkey,
1518    ) -> crate::Result<Option<InstructionBuffer>> {
1519        self.account::<InstructionBuffer>(address).await
1520    }
1521}
1522
1523/// Store Filter.
1524#[derive(Debug)]
1525pub struct StoreFilter {
1526    /// Store.
1527    store: Pubkey,
1528    /// Store offset.
1529    store_offset: usize,
1530    /// Ignore disc bytes.
1531    ignore_disc_offset: bool,
1532}
1533
1534impl StoreFilter {
1535    /// Create a new store filter.
1536    pub fn new(store: &Pubkey, store_offset: usize) -> Self {
1537        Self {
1538            store: *store,
1539            store_offset,
1540            ignore_disc_offset: false,
1541        }
1542    }
1543
1544    /// Ignore discriminator offset.
1545    pub fn ignore_disc_offset(mut self, ignore: bool) -> Self {
1546        self.ignore_disc_offset = ignore;
1547        self
1548    }
1549
1550    /// Store offset.
1551    pub fn store_offset(&self) -> usize {
1552        if self.ignore_disc_offset {
1553            self.store_offset
1554        } else {
1555            self.store_offset + DISC_OFFSET
1556        }
1557    }
1558}
1559
1560impl From<StoreFilter> for RpcFilterType {
1561    fn from(filter: StoreFilter) -> Self {
1562        let store = filter.store;
1563        let store_offset = filter.store_offset();
1564        RpcFilterType::Memcmp(Memcmp::new_base58_encoded(store_offset, store.as_ref()))
1565    }
1566}