Skip to main content

anchor_client/
nonblocking.rs

1use {
2    crate::{
3        AsSigner, ClientError, Config, EventContext, EventUnsubscriber, Program,
4        ProgramAccountsIterator, RequestBuilder,
5    },
6    anchor_lang::{prelude::Pubkey, AccountDeserialize, Discriminator},
7    solana_commitment_config::CommitmentConfig,
8    solana_rpc_client::nonblocking::rpc_client::RpcClient as AsyncRpcClient,
9    solana_rpc_client_api::{config::RpcSendTransactionConfig, filter::RpcFilterType},
10    solana_signature::Signature,
11    solana_signer::Signer,
12    solana_transaction::Transaction,
13    std::{marker::PhantomData, ops::Deref, sync::Arc},
14    tokio::sync::OnceCell,
15};
16
17impl<'a> EventUnsubscriber<'a> {
18    /// Unsubscribe gracefully.
19    pub async fn unsubscribe(self) {
20        self.unsubscribe_internal().await
21    }
22}
23
24pub trait ThreadSafeSigner: Signer + Send + Sync + 'static {
25    fn to_signer(&self) -> &dyn Signer;
26}
27
28impl<T: Signer + Send + Sync + 'static> ThreadSafeSigner for T {
29    fn to_signer(&self) -> &dyn Signer {
30        self
31    }
32}
33
34impl AsSigner for Arc<dyn ThreadSafeSigner> {
35    fn as_signer(&self) -> &dyn Signer {
36        self.to_signer()
37    }
38}
39
40impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
41    pub fn new(
42        program_id: Pubkey,
43        cfg: Config<C>,
44        #[cfg(feature = "mock")] rpc_client: AsyncRpcClient,
45    ) -> Result<Self, ClientError> {
46        #[cfg(not(feature = "mock"))]
47        let rpc_client = {
48            let comm_config = cfg.options.unwrap_or_default();
49            let cluster_url = cfg.cluster.url().to_string();
50            AsyncRpcClient::new_with_commitment(cluster_url.clone(), comm_config)
51        };
52
53        Ok(Self {
54            program_id,
55            cfg,
56            sub_client: OnceCell::new(),
57            internal_rpc_client: rpc_client,
58        })
59    }
60
61    // We disable the `rpc` method for `mock` feature because otherwise we'd either have to
62    // return a new `RpcClient` instance (which is different to the one used internally)
63    // or require the user to pass another one in for blocking (since we use the non-blocking one under the hood).
64    // The former of these would be confusing and the latter would be very annoying, especially since a user
65    // using the mock feature likely already has a `RpcClient` instance at hand anyway.
66    #[cfg(not(feature = "mock"))]
67    pub fn rpc(&self) -> AsyncRpcClient {
68        AsyncRpcClient::new_with_commitment(
69            self.cfg.cluster.url().to_string(),
70            self.cfg.options.unwrap_or_default(),
71        )
72    }
73
74    /// Returns a threadsafe request builder
75    pub fn request(&self) -> RequestBuilder<'_, C, Arc<dyn ThreadSafeSigner>> {
76        RequestBuilder::from(
77            self.program_id,
78            self.cfg.cluster.url(),
79            self.cfg.payer.clone(),
80            self.cfg.options,
81            &self.internal_rpc_client,
82        )
83    }
84
85    /// Returns the account at the given address.
86    pub async fn account<T: AccountDeserialize>(&self, address: Pubkey) -> Result<T, ClientError> {
87        self.account_internal(address).await
88    }
89
90    /// Returns all program accounts of the given type matching the given filters
91    pub async fn accounts<T: AccountDeserialize + Discriminator>(
92        &self,
93        filters: Vec<RpcFilterType>,
94    ) -> Result<Vec<(Pubkey, T)>, ClientError> {
95        self.accounts_lazy(filters).await?.collect()
96    }
97
98    /// Returns all program accounts of the given type matching the given filters as an iterator
99    /// Deserialization is executed lazily
100    pub async fn accounts_lazy<T: AccountDeserialize + Discriminator>(
101        &self,
102        filters: Vec<RpcFilterType>,
103    ) -> Result<ProgramAccountsIterator<T>, ClientError> {
104        self.accounts_lazy_internal(filters).await
105    }
106
107    /// Subscribe to program logs.
108    ///
109    /// Returns an [`EventUnsubscriber`] to unsubscribe and close connection gracefully.
110    pub async fn on<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
111        &self,
112        f: impl FnMut(&EventContext, T) + Send + 'static,
113    ) -> Result<EventUnsubscriber<'_>, ClientError> {
114        let (handle, rx) = self.on_internal(f).await?;
115
116        Ok(EventUnsubscriber {
117            handle,
118            rx,
119            _lifetime_marker: PhantomData,
120        })
121    }
122}
123
124impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C, Arc<dyn ThreadSafeSigner>> {
125    pub fn from(
126        program_id: Pubkey,
127        cluster: &str,
128        payer: C,
129        options: Option<CommitmentConfig>,
130        rpc_client: &'a AsyncRpcClient,
131    ) -> Self {
132        Self {
133            program_id,
134            payer,
135            cluster: cluster.to_string(),
136            accounts: Vec::new(),
137            options: options.unwrap_or_default(),
138            instructions: Vec::new(),
139            instruction_data: None,
140            signers: Vec::new(),
141            internal_rpc_client: rpc_client,
142            _phantom: PhantomData,
143        }
144    }
145
146    #[must_use]
147    pub fn signer<T: ThreadSafeSigner>(mut self, signer: T) -> Self {
148        self.signers.push(Arc::new(signer));
149        self
150    }
151
152    pub async fn signed_transaction(&self) -> Result<Transaction, ClientError> {
153        self.signed_transaction_internal().await
154    }
155
156    pub async fn send(self) -> Result<Signature, ClientError> {
157        self.send_internal().await
158    }
159
160    pub async fn send_with_spinner_and_config(
161        self,
162        config: RpcSendTransactionConfig,
163    ) -> Result<Signature, ClientError> {
164        self.send_with_spinner_and_config_internal(config).await
165    }
166}