Skip to main content

anchor_client/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! An RPC client to interact with Solana programs written in [`anchor_lang`].
4//!
5//! # Examples
6//!
7//! A simple example that creates a client, sends a transaction and fetches an account:
8//!
9//! ```ignore
10//! use std::rc::Rc;
11//!
12//! use anchor_client::{Client, Cluster, Signer};
13//! use my_program::{accounts, instruction, MyAccount};
14//! use solana_keypair::{read_keypair_file, Keypair};
15//! use solana_system_interface::program as system_program;
16//!
17//! fn main() -> Result<(), Box<dyn std::error::Error>> {
18//!     // Create client
19//!     let payer = read_keypair_file("keypair.json")?;
20//!     let client = Client::new(Cluster::Localnet, Rc::new(payer));
21//!
22//!     // Create program
23//!     let program = client.program(my_program::ID)?;
24//!
25//!     // Send transaction
26//!     let my_account_kp = Keypair::new();
27//!     program
28//!         .request()
29//!         .accounts(accounts::Initialize {
30//!             my_account: my_account_kp.pubkey(),
31//!             payer: program.payer(),
32//!             system_program: system_program::ID,
33//!         })
34//!         .args(instruction::Initialize { field: 42 })
35//!         .signer(&my_account_kp)
36//!         .send()?;
37//!
38//!     // Fetch account
39//!     let my_account: MyAccount = program.account(my_account_kp.pubkey())?;
40//!     assert_eq!(my_account.field, 42);
41//!
42//!     Ok(())
43//! }
44//! ```
45//!
46//! More examples can be found in [here].
47//!
48//! [here]: https://github.com/otter-sec/anchor/tree/v1.1.2/client/example/src
49//!
50//! # Features
51//!
52//! ## `async`
53//!
54//! The client is blocking by default. To enable asynchronous client, add `async` feature:
55//!
56//! ```toml
57//! anchor-client = { version = "1.1.2 ", features = ["async"] }
58//! ````
59//!
60//! ## `mock`
61//!
62//! This feature allows passing in a custom RPC client when creating program instances, which is
63//! useful for mocking RPC responses, e.g. via [`RpcClient::new_mock`].
64//!
65//! [`RpcClient::new_mock`]: https://docs.rs/solana-rpc-client/3.0.0/solana_rpc_client/rpc_client/struct.RpcClient.html#method.new_mock
66
67#[cfg(feature = "async")]
68pub use nonblocking::ThreadSafeSigner;
69pub use {
70    anchor_lang,
71    cluster::Cluster,
72    solana_commitment_config::CommitmentConfig,
73    solana_hash::Hash,
74    solana_instruction::Instruction,
75    solana_message::AddressLookupTableAccount,
76    solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError,
77    solana_rpc_client_api::{
78        client_error::{Error as SolanaClientError, ErrorKind as SolanaClientErrorKind},
79        config::RpcSendTransactionConfig,
80        filter::RpcFilterType,
81    },
82    solana_signer::{Signer, SignerError},
83    solana_transaction::{versioned::VersionedTransaction, Transaction},
84};
85use {
86    anchor_lang::{
87        solana_program::{program_error::ProgramError, pubkey::Pubkey},
88        AccountDeserialize, Discriminator, InstructionData, ToAccountMetas,
89    },
90    futures::{Future, StreamExt},
91    regex::Regex,
92    solana_account_decoder::{UiAccount, UiAccountEncoding},
93    solana_instruction::AccountMeta,
94    solana_message::v0,
95    solana_pubsub_client::nonblocking::pubsub_client::PubsubClient,
96    solana_rpc_client::nonblocking::rpc_client::RpcClient as AsyncRpcClient,
97    solana_rpc_client_api::{
98        config::{
99            RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
100            RpcTransactionLogsFilter,
101        },
102        filter::Memcmp,
103        response::{Response as RpcResponse, RpcLogsResponse},
104    },
105    solana_signature::Signature,
106    std::{
107        iter::Map,
108        marker::PhantomData,
109        ops::Deref,
110        pin::Pin,
111        sync::{Arc, LazyLock},
112        vec::IntoIter,
113    },
114    thiserror::Error,
115    tokio::{
116        runtime::Handle,
117        sync::{
118            mpsc::{unbounded_channel, UnboundedReceiver},
119            OnceCell,
120        },
121        task::JoinHandle,
122    },
123};
124
125mod cluster;
126
127/// Specifies which transaction version to use when building transactions.
128#[derive(Debug, Clone, Default)]
129pub enum TxVersion<'a> {
130    /// Legacy transaction format.
131    #[default]
132    Legacy,
133    /// Versioned transaction format (v0) with optional address lookup tables.
134    V0(&'a [AddressLookupTableAccount]),
135}
136
137#[cfg(not(feature = "async"))]
138mod blocking;
139#[cfg(feature = "async")]
140mod nonblocking;
141
142const PROGRAM_LOG: &str = "Program log: ";
143const PROGRAM_DATA: &str = "Program data: ";
144
145type UnsubscribeFn = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
146/// Client defines the base configuration for building RPC clients to
147/// communicate with Anchor programs running on a Solana cluster. It's
148/// primary use is to build a `Program` client via the `program` method.
149pub struct Client<C> {
150    cfg: Config<C>,
151}
152
153impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
154    pub fn new(cluster: Cluster, payer: C) -> Self {
155        Self {
156            cfg: Config {
157                cluster,
158                payer,
159                options: None,
160            },
161        }
162    }
163
164    pub fn new_with_options(cluster: Cluster, payer: C, options: CommitmentConfig) -> Self {
165        Self {
166            cfg: Config {
167                cluster,
168                payer,
169                options: Some(options),
170            },
171        }
172    }
173
174    pub fn program(
175        &self,
176        program_id: Pubkey,
177        #[cfg(feature = "mock")] rpc_client: AsyncRpcClient,
178    ) -> Result<Program<C>, ClientError> {
179        let cfg = Config {
180            cluster: self.cfg.cluster.clone(),
181            options: self.cfg.options,
182            payer: self.cfg.payer.clone(),
183        };
184
185        Program::new(
186            program_id,
187            cfg,
188            #[cfg(feature = "mock")]
189            rpc_client,
190        )
191    }
192}
193
194/// Auxiliary data structure to align the types of the Solana CLI utils with Anchor client.
195/// Client<C> implementation requires <C: Clone + Deref<Target = impl Signer>> which does not comply with Box<dyn Signer>
196/// that's used when loaded Signer from keypair file. This struct is used to wrap the usage.
197pub struct DynSigner(pub Arc<dyn Signer>);
198
199impl Signer for DynSigner {
200    fn pubkey(&self) -> Pubkey {
201        self.0.pubkey()
202    }
203
204    fn try_pubkey(&self) -> Result<Pubkey, SignerError> {
205        self.0.try_pubkey()
206    }
207
208    fn sign_message(&self, message: &[u8]) -> Signature {
209        self.0.sign_message(message)
210    }
211
212    fn try_sign_message(&self, message: &[u8]) -> Result<Signature, SignerError> {
213        self.0.try_sign_message(message)
214    }
215
216    fn is_interactive(&self) -> bool {
217        self.0.is_interactive()
218    }
219}
220
221// Internal configuration for a client.
222#[derive(Debug)]
223pub struct Config<C> {
224    cluster: Cluster,
225    payer: C,
226    options: Option<CommitmentConfig>,
227}
228
229pub struct EventUnsubscriber<'a> {
230    handle: JoinHandle<Result<(), ClientError>>,
231    rx: UnboundedReceiver<UnsubscribeFn>,
232    #[cfg(not(feature = "async"))]
233    runtime_handle: &'a Handle,
234    _lifetime_marker: PhantomData<&'a Handle>,
235}
236
237impl EventUnsubscriber<'_> {
238    async fn unsubscribe_internal(mut self) {
239        if let Some(unsubscribe) = self.rx.recv().await {
240            unsubscribe().await;
241        }
242
243        let _ = self.handle.await;
244    }
245}
246
247/// Program is the primary client handle to be used to build and send requests.
248pub struct Program<C> {
249    program_id: Pubkey,
250    cfg: Config<C>,
251    sub_client: OnceCell<Arc<PubsubClient>>,
252    #[cfg(not(feature = "async"))]
253    rt: tokio::runtime::Runtime,
254    internal_rpc_client: AsyncRpcClient,
255}
256
257impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
258    pub fn payer(&self) -> Pubkey {
259        self.cfg.payer.pubkey()
260    }
261
262    pub fn id(&self) -> Pubkey {
263        self.program_id
264    }
265
266    #[cfg(feature = "mock")]
267    pub fn internal_rpc(&self) -> &AsyncRpcClient {
268        &self.internal_rpc_client
269    }
270
271    async fn account_internal<T: AccountDeserialize>(
272        &self,
273        address: Pubkey,
274    ) -> Result<T, ClientError> {
275        let account = self
276            .internal_rpc_client
277            .get_account_with_commitment(&address, CommitmentConfig::processed())
278            .await
279            .map_err(Box::new)?
280            .value
281            .ok_or(ClientError::AccountNotFound)?;
282        let mut data: &[u8] = &account.data;
283        T::try_deserialize(&mut data).map_err(Into::into)
284    }
285
286    async fn accounts_lazy_internal<T: AccountDeserialize + Discriminator>(
287        &self,
288        filters: Vec<RpcFilterType>,
289    ) -> Result<ProgramAccountsIterator<T>, ClientError> {
290        let account_type_filter =
291            RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, T::DISCRIMINATOR));
292        let config = RpcProgramAccountsConfig {
293            filters: Some([vec![account_type_filter], filters].concat()),
294            account_config: RpcAccountInfoConfig {
295                encoding: Some(UiAccountEncoding::Base64),
296                ..RpcAccountInfoConfig::default()
297            },
298            ..RpcProgramAccountsConfig::default()
299        };
300
301        Ok(ProgramAccountsIterator {
302            inner: self
303                .internal_rpc_client
304                .get_program_ui_accounts_with_config(&self.id(), config)
305                .await
306                .map_err(Box::new)?
307                .into_iter()
308                .map(|(key, account)| {
309                    let data = account.data.decode().ok_or_else(|| {
310                        ClientError::SolanaClientError(Box::new(
311                            SolanaClientError::new_with_request(
312                                SolanaClientErrorKind::Custom(
313                                    "Failed to decode account data".to_string(),
314                                ),
315                                solana_rpc_client_api::request::RpcRequest::GetProgramAccounts,
316                            ),
317                        ))
318                    })?;
319                    Ok((key, T::try_deserialize(&mut data.as_slice())?))
320                }),
321        })
322    }
323
324    async fn on_internal<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
325        &self,
326        mut f: impl FnMut(&EventContext, T) + Send + 'static,
327    ) -> Result<
328        (
329            JoinHandle<Result<(), ClientError>>,
330            UnboundedReceiver<UnsubscribeFn>,
331        ),
332        ClientError,
333    > {
334        let client = self
335            .sub_client
336            .get_or_try_init(|| async {
337                PubsubClient::new(self.cfg.cluster.ws_url())
338                    .await
339                    .map(Arc::new)
340                    .map_err(|e| ClientError::SolanaClientPubsubError(Box::new(e)))
341            })
342            .await?
343            .clone();
344
345        let (tx, rx) = unbounded_channel::<_>();
346        let config = RpcTransactionLogsConfig {
347            commitment: self.cfg.options,
348        };
349        let program_id_str = self.program_id.to_string();
350        let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);
351
352        let handle = tokio::spawn(async move {
353            let (mut notifications, unsubscribe) = client
354                .logs_subscribe(filter, config)
355                .await
356                .map_err(Box::new)?;
357
358            tx.send(unsubscribe).map_err(|e| {
359                ClientError::SolanaClientPubsubError(Box::new(PubsubClientError::RequestFailed {
360                    message: "Unsubscribe failed".to_string(),
361                    reason: e.to_string(),
362                }))
363            })?;
364
365            while let Some(logs) = notifications.next().await {
366                let signature: Signature = logs.value.signature.parse().map_err(|e| {
367                    ClientError::LogParseError(format!(
368                        "Invalid signature '{}': {e}",
369                        logs.value.signature
370                    ))
371                })?;
372                let ctx = EventContext {
373                    signature,
374                    slot: logs.context.slot,
375                };
376                let events = parse_logs_response(logs, &program_id_str)?;
377                for e in events {
378                    f(&ctx, e);
379                }
380            }
381            Ok::<(), ClientError>(())
382        });
383
384        Ok((handle, rx))
385    }
386}
387
388/// Iterator with items of type (Pubkey, T). Used to lazily deserialize account structs.
389/// Wrapper type hides the inner type from usages so the implementation can be changed.
390pub struct ProgramAccountsIterator<T> {
391    inner: Map<IntoIter<(Pubkey, UiAccount)>, AccountConverterFunction<T>>,
392}
393
394/// Function type that accepts solana accounts and returns deserialized anchor accounts
395type AccountConverterFunction<T> = fn((Pubkey, UiAccount)) -> Result<(Pubkey, T), ClientError>;
396
397impl<T> Iterator for ProgramAccountsIterator<T> {
398    type Item = Result<(Pubkey, T), ClientError>;
399
400    fn next(&mut self) -> Option<Self::Item> {
401        self.inner.next()
402    }
403}
404
405pub fn handle_program_log<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
406    self_program_str: &str,
407    l: &str,
408) -> Result<(Option<T>, Option<String>, bool), ClientError> {
409    use {
410        anchor_lang::__private::base64,
411        base64::{engine::general_purpose::STANDARD, Engine},
412    };
413
414    // Log emitted from the current program.
415    if let Some(log) = l
416        .strip_prefix(PROGRAM_LOG)
417        .or_else(|| l.strip_prefix(PROGRAM_DATA))
418    {
419        let log_bytes = match STANDARD.decode(log) {
420            Ok(log_bytes) => log_bytes,
421            _ => {
422                #[cfg(feature = "debug")]
423                println!("Could not base64 decode log: {}", log);
424                return Ok((None, None, false));
425            }
426        };
427
428        let event = log_bytes
429            .starts_with(T::DISCRIMINATOR)
430            .then(|| {
431                let mut data = &log_bytes[T::DISCRIMINATOR.len()..];
432                T::deserialize(&mut data).map_err(|e| ClientError::LogParseError(e.to_string()))
433            })
434            .transpose()?;
435
436        Ok((event, None, false))
437    }
438    // System log.
439    else {
440        let (program, did_pop) = handle_system_log(self_program_str, l);
441        Ok((None, program, did_pop))
442    }
443}
444
445pub fn handle_system_log(this_program_str: &str, log: &str) -> (Option<String>, bool) {
446    static INVOKE_RE: LazyLock<Regex> = LazyLock::new(|| {
447        Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[([\d]+)\]$").unwrap()
448    });
449    if let Some(invoke_match) = INVOKE_RE.captures(log) {
450        if invoke_match.get(1).unwrap().as_str() == this_program_str {
451            return (Some(this_program_str.to_string()), false);
452
453            // `Invoke [1]` instructions are pushed to the stack in `parse_logs_response`,
454            // so this ensures we only push CPIs to the stack at this stage
455        } else if invoke_match.get(2).unwrap().as_str() != "1" {
456            return (Some("cpi".to_string()), false); // Any string will do.
457        }
458    }
459
460    if log.starts_with(&format!("Program {this_program_str} log:")) {
461        (Some(this_program_str.to_string()), false)
462    } else {
463        static SUCESS_RE: LazyLock<Regex> =
464            LazyLock::new(|| Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) success$").unwrap());
465        if SUCESS_RE.is_match(log) {
466            (None, true)
467        } else {
468            (None, false)
469        }
470    }
471}
472
473pub struct Execution {
474    stack: Vec<String>,
475}
476
477impl Execution {
478    pub fn new(logs: &mut &[String]) -> Result<Self, ClientError> {
479        let l = &logs[0];
480        *logs = &logs[1..];
481        static RE: LazyLock<Regex> = LazyLock::new(|| {
482            Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[[\d]+\]$").unwrap()
483        });
484        let c = RE
485            .captures(l)
486            .ok_or_else(|| ClientError::LogParseError(l.to_string()))?;
487        let program = c
488            .get(1)
489            .ok_or_else(|| ClientError::LogParseError(l.to_string()))?
490            .as_str()
491            .to_string();
492        Ok(Self {
493            stack: vec![program],
494        })
495    }
496
497    pub fn program(&self) -> String {
498        assert!(!self.stack.is_empty());
499        self.stack[self.stack.len() - 1].clone()
500    }
501
502    pub fn push(&mut self, new_program: String) {
503        self.stack.push(new_program);
504    }
505
506    pub fn pop(&mut self) {
507        assert!(!self.stack.is_empty());
508        self.stack.pop().unwrap();
509    }
510}
511
512#[derive(Debug)]
513pub struct EventContext {
514    pub signature: Signature,
515    pub slot: u64,
516}
517
518#[derive(Debug, Error)]
519pub enum ClientError {
520    #[error("Account not found")]
521    AccountNotFound,
522    #[error("{0}")]
523    AnchorError(#[from] anchor_lang::error::Error),
524    #[error("{0}")]
525    ProgramError(#[from] ProgramError),
526    #[error("{0}")]
527    SolanaClientError(#[from] Box<SolanaClientError>),
528    #[error("{0}")]
529    SolanaClientPubsubError(#[from] Box<PubsubClientError>),
530    #[error("Unable to parse log: {0}")]
531    LogParseError(String),
532    #[error(transparent)]
533    IOError(#[from] std::io::Error),
534    #[error("{0}")]
535    SignerError(#[from] SignerError),
536}
537
538impl ClientError {
539    /// Adding a new variant to [`ClientError`] is a breaking change in v1. To mitigate this issue,
540    /// use this helper method for all errors that cannot be precisely described by [`ClientError`].
541    fn other<E>(e: E) -> Self
542    where
543        E: Into<Box<dyn std::error::Error + Send + Sync>>,
544    {
545        Self::IOError(std::io::Error::other(e))
546    }
547}
548
549pub trait AsSigner {
550    fn as_signer(&self) -> &dyn Signer;
551}
552
553impl AsSigner for Box<dyn Signer + '_> {
554    fn as_signer(&self) -> &dyn Signer {
555        self.as_ref()
556    }
557}
558
559/// `RequestBuilder` provides a builder interface to create and send
560/// transactions to a cluster.
561pub struct RequestBuilder<'a, C, S: 'a> {
562    cluster: String,
563    program_id: Pubkey,
564    accounts: Vec<AccountMeta>,
565    options: CommitmentConfig,
566    instructions: Vec<Instruction>,
567    payer: C,
568    instruction_data: Option<Vec<u8>>,
569    signers: Vec<S>,
570    #[cfg(not(feature = "async"))]
571    handle: &'a Handle,
572    internal_rpc_client: &'a AsyncRpcClient,
573    _phantom: PhantomData<&'a ()>,
574}
575
576// Shared implementation for all RequestBuilders
577impl<C: Deref<Target = impl Signer> + Clone, S: AsSigner> RequestBuilder<'_, C, S> {
578    #[must_use]
579    pub fn payer(mut self, payer: C) -> Self {
580        self.payer = payer;
581        self
582    }
583
584    #[must_use]
585    pub fn cluster(mut self, url: &str) -> Self {
586        self.cluster = url.to_string();
587        self
588    }
589
590    #[must_use]
591    pub fn instruction(mut self, ix: Instruction) -> Self {
592        self.instructions.push(ix);
593        self
594    }
595
596    #[must_use]
597    pub fn program(mut self, program_id: Pubkey) -> Self {
598        self.program_id = program_id;
599        self
600    }
601
602    /// Set the accounts to pass to the instruction.
603    ///
604    /// `accounts` argument can be:
605    ///
606    /// - Any type that implements [`ToAccountMetas`] trait
607    /// - A vector of [`AccountMeta`]s (for remaining accounts)
608    ///
609    /// Note that the given accounts are appended to the previous list of accounts instead of
610    /// overriding the existing ones (if any).
611    ///
612    /// # Example
613    ///
614    /// ```ignore
615    /// program
616    ///     .request()
617    ///     // Regular accounts
618    ///     .accounts(accounts::Initialize {
619    ///         my_account: my_account_kp.pubkey(),
620    ///         payer: program.payer(),
621    ///         system_program: system_program::ID,
622    ///     })
623    ///     // Remaining accounts
624    ///     .accounts(vec![AccountMeta {
625    ///         pubkey: remaining,
626    ///         is_signer: true,
627    ///         is_writable: true,
628    ///     }])
629    ///     .args(instruction::Initialize { field: 42 })
630    ///     .send()?;
631    /// ```
632    #[must_use]
633    pub fn accounts(mut self, accounts: impl ToAccountMetas) -> Self {
634        let mut metas = accounts.to_account_metas(None);
635        self.accounts.append(&mut metas);
636        self
637    }
638
639    #[must_use]
640    pub fn options(mut self, options: CommitmentConfig) -> Self {
641        self.options = options;
642        self
643    }
644
645    #[must_use]
646    pub fn args(mut self, args: impl InstructionData) -> Self {
647        self.instruction_data = Some(args.data());
648        self
649    }
650
651    pub fn instructions(&self) -> Vec<Instruction> {
652        let mut instructions = self.instructions.clone();
653        if let Some(ix_data) = &self.instruction_data {
654            instructions.push(Instruction {
655                program_id: self.program_id,
656                data: ix_data.clone(),
657                accounts: self.accounts.clone(),
658            });
659        }
660
661        instructions
662    }
663
664    /// Build the request into a transaction.
665    ///
666    /// Note: This will build a transaction with the legacy transaction format. If you'd like to use
667    /// a different transaction format, use [`transaction_versioned`].
668    pub fn transaction(&self) -> Transaction {
669        let instructions = &self.instructions();
670        Transaction::new_with_payer(instructions, Some(&self.payer.pubkey()))
671    }
672
673    /// Build an unsigned transaction.
674    ///
675    /// # Arguments
676    ///
677    /// * `version` - The transaction version to use ([`TxVersion::Legacy`] or [`TxVersion::V0`]).
678    /// * `recent_blockhash` - A recent blockhash to include in the transaction message.
679    ///
680    /// # Example
681    ///
682    /// ```no_run
683    /// use anchor_client::{Client, Cluster, TxVersion};
684    /// use anchor_lang::prelude::Pubkey;
685    /// use solana_signer::null_signer::NullSigner;
686    /// use solana_message::AddressLookupTableAccount;
687    /// use solana_message::Hash;
688    ///
689    /// let payer = NullSigner::new(&Pubkey::default());
690    /// let client = Client::new(Cluster::Localnet, std::rc::Rc::new(payer));
691    ///
692    /// let program = client.program(Pubkey::default()).unwrap();
693    /// // Dummy blockhash
694    /// let blockhash = Hash::from([0; 32]);
695    /// let lookup_table = AddressLookupTableAccount { key: Pubkey::default(), addresses: vec![] };
696    ///
697    /// let request = program.request();
698    /// // Legacy transaction
699    /// let tx = request.transaction_versioned(TxVersion::Legacy, blockhash).unwrap();
700    ///
701    /// // V0 transaction with address lookup tables
702    /// let tx = request.transaction_versioned(TxVersion::V0(&[lookup_table]), blockhash).unwrap();
703    ///
704    /// // V0 transaction without lookup tables
705    /// let tx = request.transaction_versioned(TxVersion::V0(&[]), blockhash).unwrap();
706    //// ```
707    pub fn transaction_versioned(
708        &self,
709        version: TxVersion<'_>,
710        recent_blockhash: Hash,
711    ) -> Result<solana_transaction::versioned::VersionedTransaction, ClientError> {
712        let instructions = self.instructions();
713        let payer = self.payer.pubkey();
714
715        match version {
716            TxVersion::Legacy => {
717                let message = solana_message::legacy::Message::new_with_blockhash(
718                    &instructions,
719                    Some(&payer),
720                    &recent_blockhash,
721                );
722                Ok(solana_transaction::versioned::VersionedTransaction {
723                    signatures: vec![
724                        solana_signature::Signature::default();
725                        message.header.num_required_signatures as usize
726                    ],
727                    message: solana_message::VersionedMessage::Legacy(message),
728                })
729            }
730            TxVersion::V0(address_lookup_table_accounts) => {
731                let message = v0::Message::try_compile(
732                    &payer,
733                    &instructions,
734                    address_lookup_table_accounts,
735                    recent_blockhash,
736                )
737                .map_err(ClientError::other)?;
738                Ok(solana_transaction::versioned::VersionedTransaction {
739                    signatures: vec![
740                        solana_signature::Signature::default();
741                        message.header.num_required_signatures as usize
742                    ],
743                    message: solana_message::VersionedMessage::V0(message),
744                })
745            }
746        }
747    }
748
749    fn signed_transaction_with_blockhash_versioned(
750        &self,
751        version: TxVersion<'_>,
752        latest_hash: Hash,
753    ) -> Result<solana_transaction::versioned::VersionedTransaction, ClientError> {
754        let signers: Vec<&dyn Signer> = self.signers.iter().map(|s| s.as_signer()).collect();
755        let mut all_signers = signers;
756        all_signers.push(&*self.payer);
757
758        let instructions = self.instructions();
759        let payer = self.payer.pubkey();
760
761        let message = match version {
762            TxVersion::Legacy => {
763                let msg = solana_message::legacy::Message::new_with_blockhash(
764                    &instructions,
765                    Some(&payer),
766                    &latest_hash,
767                );
768                solana_message::VersionedMessage::Legacy(msg)
769            }
770            TxVersion::V0(address_lookup_table_accounts) => {
771                let msg = v0::Message::try_compile(
772                    &payer,
773                    &instructions,
774                    address_lookup_table_accounts,
775                    latest_hash,
776                )
777                .map_err(ClientError::other)?;
778                solana_message::VersionedMessage::V0(msg)
779            }
780        };
781
782        let tx =
783            solana_transaction::versioned::VersionedTransaction::try_new(message, &all_signers)?;
784
785        Ok(tx)
786    }
787
788    async fn signed_transaction_internal(
789        &self,
790        version: TxVersion<'_>,
791    ) -> Result<solana_transaction::versioned::VersionedTransaction, ClientError> {
792        let latest_hash = self
793            .internal_rpc_client
794            .get_latest_blockhash()
795            .await
796            .map_err(Box::new)?;
797
798        self.signed_transaction_with_blockhash_versioned(version, latest_hash)
799    }
800
801    async fn send_internal(&self, version: TxVersion<'_>) -> Result<Signature, ClientError> {
802        let latest_hash = self
803            .internal_rpc_client
804            .get_latest_blockhash()
805            .await
806            .map_err(Box::new)?;
807        let tx = self.signed_transaction_with_blockhash_versioned(version, latest_hash)?;
808
809        self.internal_rpc_client
810            .send_and_confirm_transaction(&tx)
811            .await
812            .map_err(|e| Box::new(e).into())
813    }
814
815    async fn send_with_spinner_and_config_internal(
816        &self,
817        version: TxVersion<'_>,
818        config: RpcSendTransactionConfig,
819    ) -> Result<Signature, ClientError> {
820        let latest_hash = self
821            .internal_rpc_client
822            .get_latest_blockhash()
823            .await
824            .map_err(Box::new)?;
825        let tx = self.signed_transaction_with_blockhash_versioned(version, latest_hash)?;
826
827        self.internal_rpc_client
828            .send_and_confirm_transaction_with_spinner_and_config(
829                &tx,
830                self.internal_rpc_client.commitment(),
831                config,
832            )
833            .await
834            .map_err(|e| Box::new(e).into())
835    }
836}
837
838fn parse_logs_response<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
839    logs: RpcResponse<RpcLogsResponse>,
840    program_id_str: &str,
841) -> Result<Vec<T>, ClientError> {
842    let mut logs = &logs.value.logs[..];
843    let mut events: Vec<T> = Vec::new();
844    if !logs.is_empty() {
845        if let Ok(mut execution) = Execution::new(&mut logs) {
846            // Create a new peekable iterator so that we can peek at the next log whilst iterating
847            let mut logs_iter = logs.iter().peekable();
848            static RE: LazyLock<Regex> = LazyLock::new(|| {
849                Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[(\d+)\]$").unwrap()
850            });
851
852            while let Some(l) = logs_iter.next() {
853                // Parse the log.
854                let (event, new_program, did_pop) = {
855                    if program_id_str == execution.program() {
856                        handle_program_log(program_id_str, l)?
857                    } else {
858                        let (program, did_pop) = handle_system_log(program_id_str, l);
859                        (None, program, did_pop)
860                    }
861                };
862                // Emit the event.
863                if let Some(e) = event {
864                    events.push(e);
865                }
866                // Switch program context on CPI.
867                if let Some(new_program) = new_program {
868                    execution.push(new_program);
869                }
870                // Program returned.
871                if did_pop {
872                    execution.pop();
873
874                    // If the current iteration popped then it means there was a
875                    //`Program x success` log. If the next log in the iteration is
876                    // of depth [1] then we're not within a CPI and this is a new instruction.
877                    //
878                    // We need to ensure that the `Execution` instance is updated with
879                    // the next program ID, or else `execution.program()` will cause
880                    // a panic during the next iteration.
881                    //
882                    // Use the full regex match to gate this branch. A loose
883                    // `ends_with("invoke [1]")` check would also accept program-emitted
884                    // log lines that happen to end in that suffix (e.g.
885                    // `"Program log: ...invoke [1]"`), which then fail the strict
886                    // `^Program <pubkey> invoke [N]$` regex and panic on unwrap.
887                    if let Some(&next_log) = logs_iter.peek() {
888                        if let Some(caps) = RE.captures(next_log) {
889                            if &caps[2] == "1" {
890                                execution.push(caps[1].to_string());
891                            }
892                        }
893                    };
894                }
895            }
896        }
897    }
898    Ok(events)
899}
900
901#[cfg(test)]
902mod tests {
903    // Creating a mock struct that implements `anchor_lang::events`
904    // for type inference in `test_logs`
905    use {
906        anchor_lang::{prelude::*, Event},
907        futures::{SinkExt, StreamExt},
908        solana_rpc_client_api::response::RpcResponseContext,
909        std::sync::atomic::{AtomicU64, Ordering},
910        tokio_tungstenite::tungstenite::Message,
911    };
912    #[derive(Debug, Clone, Copy)]
913    #[event]
914    pub struct MockEvent {}
915
916    use super::*;
917    #[test]
918    fn new_execution() {
919        let mut logs: &[String] =
920            &["Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw invoke [1]".to_string()];
921        let exe = Execution::new(&mut logs).unwrap();
922        assert_eq!(
923            exe.stack[0],
924            "7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw".to_string()
925        );
926    }
927
928    #[test]
929    fn handle_system_log_pop() {
930        let log = "Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw success";
931        let (program, did_pop) = handle_system_log("asdf", log);
932        assert_eq!(program, None);
933        assert!(did_pop);
934    }
935
936    #[test]
937    fn handle_system_log_no_pop() {
938        let log = "Program 7swsTUiQ6KUK4uFYquQKg4epFRsBnvbrTf2fZQCa2sTJ qwer";
939        let (program, did_pop) = handle_system_log("asdf", log);
940        assert_eq!(program, None);
941        assert!(!did_pop);
942    }
943
944    #[test]
945    fn test_parse_logs_response() -> Result<()> {
946        // Mock logs received within an `RpcResponse`. These are based on a Jupiter transaction.
947        let logs = vec![
948            "Program VeryCoolProgram invoke [1]", // Outer instruction #1 starts
949            "Program log: Instruction: VeryCoolEvent",
950            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
951            "Program log: Instruction: Transfer",
952            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 664387 compute \
953             units",
954            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
955            "Program VeryCoolProgram consumed 42417 of 700000 compute units",
956            "Program VeryCoolProgram success", // Outer instruction #1 ends
957            "Program EvenCoolerProgram invoke [1]", // Outer instruction #2 starts
958            "Program log: Instruction: EvenCoolerEvent",
959            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
960            "Program log: Instruction: TransferChecked",
961            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6200 of 630919 compute \
962             units",
963            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
964            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
965            "Program log: Instruction: Swap",
966            "Program log: INVARIANT: SWAP",
967            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
968            "Program log: Instruction: Transfer",
969            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 539321 compute \
970             units",
971            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
972            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
973            "Program log: Instruction: Transfer",
974            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 531933 compute \
975             units",
976            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
977            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 84670 of 610768 \
978             compute units",
979            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
980            "Program EvenCoolerProgram invoke [2]",
981            "Program EvenCoolerProgram consumed 2021 of 523272 compute units",
982            "Program EvenCoolerProgram success",
983            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
984            "Program log: Instruction: Swap",
985            "Program log: INVARIANT: SWAP",
986            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
987            "Program log: Instruction: Transfer",
988            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 418618 compute \
989             units",
990            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
991            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
992            "Program log: Instruction: Transfer",
993            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 411230 compute \
994             units",
995            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
996            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 102212 of 507607 \
997             compute units",
998            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
999            "Program EvenCoolerProgram invoke [2]",
1000            "Program EvenCoolerProgram consumed 2021 of 402569 compute units",
1001            "Program EvenCoolerProgram success",
1002            "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP invoke [2]",
1003            "Program log: Instruction: Swap",
1004            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
1005            "Program log: Instruction: Transfer",
1006            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 371140 compute \
1007             units",
1008            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1009            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
1010            "Program log: Instruction: MintTo",
1011            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4492 of 341800 compute \
1012             units",
1013            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1014            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
1015            "Program log: Instruction: Transfer",
1016            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 334370 compute \
1017             units",
1018            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1019            "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP consumed 57610 of 386812 \
1020             compute units",
1021            "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP success",
1022            "Program EvenCoolerProgram invoke [2]",
1023            "Program EvenCoolerProgram consumed 2021 of 326438 compute units",
1024            "Program EvenCoolerProgram success",
1025            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
1026            "Program log: Instruction: TransferChecked",
1027            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6173 of 319725 compute \
1028             units",
1029            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1030            "Program EvenCoolerProgram consumed 345969 of 657583 compute units",
1031            "Program EvenCoolerProgram success", // Outer instruction #2 ends
1032            "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1033            "Program ComputeBudget111111111111111111111111111111 success",
1034            "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1035            "Program ComputeBudget111111111111111111111111111111 success",
1036        ];
1037
1038        // Converting to Vec<String> as expected in `RpcLogsResponse`
1039        let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1040
1041        let program_id_str = "VeryCoolProgram";
1042
1043        // No events returned here. Just ensuring that the function doesn't panic
1044        // due an incorrectly emptied stack.
1045        parse_logs_response::<MockEvent>(
1046            RpcResponse {
1047                context: RpcResponseContext::new(0),
1048                value: RpcLogsResponse {
1049                    signature: "".to_string(),
1050                    err: None,
1051                    logs: logs.to_vec(),
1052                },
1053            },
1054            program_id_str,
1055        )
1056        .unwrap();
1057
1058        Ok(())
1059    }
1060
1061    #[test]
1062    fn test_parse_logs_response_fake_pop() -> Result<()> {
1063        let logs = [
1064            "Program fake111111111111111111111111111111111111112 invoke [1]",
1065            "Program log: i logged success",
1066            "Program log: i logged success",
1067            "Program fake111111111111111111111111111111111111112 consumed 1411 of 200000 compute \
1068             units",
1069            "Program fake111111111111111111111111111111111111112 success",
1070        ];
1071
1072        // Converting to Vec<String> as expected in `RpcLogsResponse`
1073        let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1074
1075        let program_id_str = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb";
1076
1077        // No events returned here. Just ensuring that the function doesn't panic
1078        // due an incorrectly emptied stack.
1079        parse_logs_response::<MockEvent>(
1080            RpcResponse {
1081                context: RpcResponseContext::new(0),
1082                value: RpcLogsResponse {
1083                    signature: "".to_string(),
1084                    err: None,
1085                    logs: logs.to_vec(),
1086                },
1087            },
1088            program_id_str,
1089        )
1090        .unwrap();
1091
1092        Ok(())
1093    }
1094
1095    /// Regression for #4461: a program-emitted `Program log:` line that ends
1096    /// with the literal `"invoke [1]"` (e.g. log content that happens to
1097    /// describe a CPI) used to satisfy the `ends_with` gate but fail the
1098    /// strict `^Program <pubkey> invoke [N]$` regex, panicking on
1099    /// `.captures(...).unwrap()` whenever it appeared right after a CPI pop.
1100    #[test]
1101    fn test_parse_logs_response_log_line_ends_with_invoke_1() -> Result<()> {
1102        let logs = [
1103            "Program VeryCoolProgram invoke [1]",
1104            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
1105            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1106            // Program-emitted log that happens to end with "invoke [1]"
1107            // immediately after a CPI returns. Pre-fix this would panic.
1108            "Program log: forwarded inner instruction invoke [1]",
1109            "Program VeryCoolProgram success",
1110        ];
1111        let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1112
1113        parse_logs_response::<MockEvent>(
1114            RpcResponse {
1115                context: RpcResponseContext::new(0),
1116                value: RpcLogsResponse {
1117                    signature: "".to_string(),
1118                    err: None,
1119                    logs,
1120                },
1121            },
1122            "VeryCoolProgram",
1123        )
1124        .unwrap();
1125
1126        Ok(())
1127    }
1128
1129    #[test]
1130    fn test_parse_log_response_inner_events() -> Result<()> {
1131        use {
1132            anchor_lang::__private::base64,
1133            base64::{engine::general_purpose::STANDARD, Engine},
1134        };
1135
1136        let mock_event = MockEvent {};
1137        let program_data_log = format!("Program data: {}", STANDARD.encode(mock_event.data()));
1138
1139        let logs = vec![
1140            "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1141            "Program ComputeBudget111111111111111111111111111111 success",
1142            "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1143            "Program ComputeBudget111111111111111111111111111111 success",
1144            "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 invoke [1]",
1145            "Program log: Instruction: ValidateNonce",
1146            "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 consumed 4839 of 239700 compute \
1147             units",
1148            "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 success",
1149            "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 invoke [1]",
1150            "Program log: Instruction: SellExactInPumpFunV3",
1151            "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P invoke [2]",
1152            "Program log: Instruction: Sell",
1153            "Program pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ invoke [3]",
1154            "Program log: Instruction: GetFees",
1155            "Program pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ consumed 3136 of 187774 compute \
1156             units",
1157            "Program return: pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ \
1158             AAAAAAAAAABfAAAAAAAAAB4AAAAAAAAA",
1159            "Program pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ success",
1160            "Program TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb invoke [3]",
1161            "Program log: Instruction: TransferChecked",
1162            "Program TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb consumed 2475 of 180928 compute \
1163             units",
1164            "Program TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb success",
1165            &program_data_log,
1166            "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P invoke [3]",
1167            "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P consumed 2060 of 166037 compute \
1168             units",
1169            "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P success",
1170            "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P consumed 60634 of 223605 compute \
1171             units",
1172            "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P success",
1173            "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 consumed 72662 of 234861 compute \
1174             units",
1175            "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 success",
1176            "Program 11111111111111111111111111111111 invoke [1]",
1177            "Program 11111111111111111111111111111111 success",
1178            "Program 11111111111111111111111111111111 invoke [1]",
1179            "Program 11111111111111111111111111111111 success",
1180        ];
1181
1182        // Converting to Vec<String> as expected in `RpcLogsResponse`
1183        let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1184
1185        let program_id_str = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";
1186
1187        let events = parse_logs_response::<MockEvent>(
1188            RpcResponse {
1189                context: RpcResponseContext::new(0),
1190                value: RpcLogsResponse {
1191                    signature: "".to_string(),
1192                    err: None,
1193                    logs: logs.to_vec(),
1194                },
1195            },
1196            program_id_str,
1197        )
1198        .unwrap();
1199
1200        assert_eq!(events.len(), 1);
1201
1202        Ok(())
1203    }
1204
1205    /// Regression test that registering multiple event listeners does not deadlock.
1206    #[test]
1207    fn multiple_listeners_no_deadlock() {
1208        // Spin up a tiny mock websocket server that responds to `logsSubscribe`
1209        // JSON-RPC requests with a valid subscription id.
1210        let rt = tokio::runtime::Builder::new_multi_thread()
1211            .enable_all()
1212            .build()
1213            .unwrap();
1214
1215        let (addr_tx, addr_rx) = std::sync::mpsc::channel();
1216
1217        rt.spawn(async move {
1218            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1219            let addr = listener.local_addr().unwrap();
1220            addr_tx.send(addr).unwrap();
1221
1222            static SUB_ID: AtomicU64 = AtomicU64::new(0);
1223
1224            loop {
1225                let (stream, _) = listener.accept().await.unwrap();
1226                tokio::spawn(async move {
1227                    let mut ws = tokio_tungstenite::accept_async(stream).await.unwrap();
1228                    while let Some(Ok(Message::Text(_))) = ws.next().await {
1229                        let sub_id = SUB_ID.fetch_add(1, Ordering::Relaxed);
1230                        // The PubsubClient sends sequential integer ids starting at 0.
1231                        let resp =
1232                            format!(r#"{{"jsonrpc":"2.0","result":{sub_id},"id":{sub_id}}}"#);
1233                        ws.send(Message::Text(resp.into())).await.unwrap();
1234                    }
1235                });
1236            }
1237        });
1238
1239        let addr = addr_rx.recv().unwrap();
1240        let ws_url = format!("ws://{}", addr);
1241
1242        let client = super::Client::new(
1243            super::Cluster::Custom(ws_url.clone(), ws_url),
1244            std::sync::Arc::new(solana_keypair::Keypair::new()),
1245        );
1246        let program = client.program(Pubkey::new_unique()).unwrap();
1247
1248        // With the old RwLock-based code, the second call would deadlock.
1249        // Use a timeout to ensure the test fails instead of hanging forever.
1250        let (done_tx, done_rx) = std::sync::mpsc::channel();
1251        let handle = std::thread::spawn(move || {
1252            #[cfg(not(feature = "async"))]
1253            {
1254                let _listener1 = program
1255                    .on::<MockEvent>(|_ctx, _event| {})
1256                    .expect("first listener");
1257
1258                let _listener2 = program
1259                    .on::<MockEvent>(|_ctx, _event| {})
1260                    .expect("second listener");
1261            }
1262
1263            #[cfg(feature = "async")]
1264            {
1265                let rt = tokio::runtime::Builder::new_current_thread()
1266                    .enable_all()
1267                    .build()
1268                    .unwrap();
1269                rt.block_on(async {
1270                    let _listener1 = program
1271                        .on::<MockEvent>(|_ctx, _event| {})
1272                        .await
1273                        .expect("first listener");
1274
1275                    let _listener2 = program
1276                        .on::<MockEvent>(|_ctx, _event| {})
1277                        .await
1278                        .expect("second listener");
1279                });
1280            }
1281
1282            let _ = done_tx.send(());
1283        });
1284
1285        // If this times out, the deadlock is still present.
1286        done_rx
1287            .recv_timeout(std::time::Duration::from_secs(5))
1288            .expect("registering two listeners should not deadlock");
1289
1290        handle.join().unwrap();
1291    }
1292}