Skip to main content

anchor_client/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3//! An RPC client to interact with Solana programs written in [`anchor_lang`].
4//!
5//! # Examples
6//!
7//! A simple example that creates a client, sends a transaction and fetches an account:
8//!
9//! ```ignore
10//! use std::rc::Rc;
11//!
12//! use anchor_client::{Client, Cluster, Signer};
13//! use my_program::{accounts, instruction, MyAccount};
14//! use solana_keypair::{read_keypair_file, Keypair};
15//! use solana_system_interface::program as system_program;
16//!
17//! fn main() -> Result<(), Box<dyn std::error::Error>> {
18//!     // Create client
19//!     let payer = read_keypair_file("keypair.json")?;
20//!     let client = Client::new(Cluster::Localnet, Rc::new(payer));
21//!
22//!     // Create program
23//!     let program = client.program(my_program::ID)?;
24//!
25//!     // Send transaction
26//!     let my_account_kp = Keypair::new();
27//!     program
28//!         .request()
29//!         .accounts(accounts::Initialize {
30//!             my_account: my_account_kp.pubkey(),
31//!             payer: program.payer(),
32//!             system_program: system_program::ID,
33//!         })
34//!         .args(instruction::Initialize { field: 42 })
35//!         .signer(&my_account_kp)
36//!         .send()?;
37//!
38//!     // Fetch account
39//!     let my_account: MyAccount = program.account(my_account_kp.pubkey())?;
40//!     assert_eq!(my_account.field, 42);
41//!
42//!     Ok(())
43//! }
44//! ```
45//!
//! More examples can be found [here].
47//!
48//! [here]: https://github.com/solana-foundation/anchor/tree/v1.0.1/client/example/src
49//!
50//! # Features
51//!
52//! ## `async`
53//!
//! The client is blocking by default. To enable the asynchronous client, add the `async` feature:
55//!
56//! ```toml
//! anchor-client = { version = "1.0.1", features = ["async"] }
//! ```
59//!
60//! ## `mock`
61//!
62//! This feature allows passing in a custom RPC client when creating program instances, which is
63//! useful for mocking RPC responses, e.g. via [`RpcClient::new_mock`].
64//!
65//! [`RpcClient::new_mock`]: https://docs.rs/solana-rpc-client/3.0.0/solana_rpc_client/rpc_client/struct.RpcClient.html#method.new_mock
66
67#[cfg(feature = "async")]
68pub use nonblocking::ThreadSafeSigner;
69pub use {
70    anchor_lang,
71    cluster::Cluster,
72    solana_commitment_config::CommitmentConfig,
73    solana_instruction::Instruction,
74    solana_program::hash::Hash,
75    solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError,
76    solana_rpc_client_api::{
77        client_error::Error as SolanaClientError, config::RpcSendTransactionConfig,
78        filter::RpcFilterType,
79    },
80    solana_signer::{Signer, SignerError},
81    solana_transaction::Transaction,
82};
83use {
84    anchor_lang::{
85        solana_program::{program_error::ProgramError, pubkey::Pubkey},
86        AccountDeserialize, Discriminator, InstructionData, ToAccountMetas,
87    },
88    futures::{Future, StreamExt},
89    regex::Regex,
90    solana_account_decoder::{UiAccount, UiAccountEncoding},
91    solana_instruction::AccountMeta,
92    solana_pubsub_client::nonblocking::pubsub_client::PubsubClient,
93    solana_rpc_client::nonblocking::rpc_client::RpcClient as AsyncRpcClient,
94    solana_rpc_client_api::{
95        config::{
96            RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
97            RpcTransactionLogsFilter,
98        },
99        filter::Memcmp,
100        response::{Response as RpcResponse, RpcLogsResponse},
101    },
102    solana_signature::Signature,
103    std::{iter::Map, marker::PhantomData, ops::Deref, pin::Pin, sync::Arc, vec::IntoIter},
104    thiserror::Error,
105    tokio::{
106        runtime::Handle,
107        sync::{
108            mpsc::{unbounded_channel, UnboundedReceiver},
109            OnceCell,
110        },
111        task::JoinHandle,
112    },
113};
114
115mod cluster;
116
117#[cfg(not(feature = "async"))]
118mod blocking;
119#[cfg(feature = "async")]
120mod nonblocking;
121
/// Log-line prefix that `handle_program_log` strips before base64-decoding the remainder.
const PROGRAM_LOG: &str = "Program log: ";
/// Alternative log-line prefix tried by `handle_program_log` when `PROGRAM_LOG` does not match.
const PROGRAM_DATA: &str = "Program data: ";

/// Boxed one-shot callback that returns a boxed, pinned, `Send` future.
///
/// `Program::on_internal` forwards the unsubscribe callback obtained from `logs_subscribe`
/// through a channel as this type, and `EventUnsubscriber::unsubscribe_internal` invokes it.
type UnsubscribeFn = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
126/// Client defines the base configuration for building RPC clients to
127/// communicate with Anchor programs running on a Solana cluster. It's
128/// primary use is to build a `Program` client via the `program` method.
129pub struct Client<C> {
130    cfg: Config<C>,
131}
132
133impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
134    pub fn new(cluster: Cluster, payer: C) -> Self {
135        Self {
136            cfg: Config {
137                cluster,
138                payer,
139                options: None,
140            },
141        }
142    }
143
144    pub fn new_with_options(cluster: Cluster, payer: C, options: CommitmentConfig) -> Self {
145        Self {
146            cfg: Config {
147                cluster,
148                payer,
149                options: Some(options),
150            },
151        }
152    }
153
154    pub fn program(
155        &self,
156        program_id: Pubkey,
157        #[cfg(feature = "mock")] rpc_client: AsyncRpcClient,
158    ) -> Result<Program<C>, ClientError> {
159        let cfg = Config {
160            cluster: self.cfg.cluster.clone(),
161            options: self.cfg.options,
162            payer: self.cfg.payer.clone(),
163        };
164
165        Program::new(
166            program_id,
167            cfg,
168            #[cfg(feature = "mock")]
169            rpc_client,
170        )
171    }
172}
173
174/// Auxiliary data structure to align the types of the Solana CLI utils with Anchor client.
175/// Client<C> implementation requires <C: Clone + Deref<Target = impl Signer>> which does not comply with Box<dyn Signer>
176/// that's used when loaded Signer from keypair file. This struct is used to wrap the usage.
177pub struct DynSigner(pub Arc<dyn Signer>);
178
179impl Signer for DynSigner {
180    fn pubkey(&self) -> Pubkey {
181        self.0.pubkey()
182    }
183
184    fn try_pubkey(&self) -> Result<Pubkey, SignerError> {
185        self.0.try_pubkey()
186    }
187
188    fn sign_message(&self, message: &[u8]) -> Signature {
189        self.0.sign_message(message)
190    }
191
192    fn try_sign_message(&self, message: &[u8]) -> Result<Signature, SignerError> {
193        self.0.try_sign_message(message)
194    }
195
196    fn is_interactive(&self) -> bool {
197        self.0.is_interactive()
198    }
199}
200
/// Internal configuration for a client.
#[derive(Debug)]
pub struct Config<C> {
    /// Cluster to talk to; its `ws_url()` is used when the pubsub connection is opened.
    cluster: Cluster,
    /// Payer handle; `Program::payer` reads its public key.
    payer: C,
    /// Optional commitment configuration; `None` when built through `Client::new`.
    options: Option<CommitmentConfig>,
}
208
209pub struct EventUnsubscriber<'a> {
210    handle: JoinHandle<Result<(), ClientError>>,
211    rx: UnboundedReceiver<UnsubscribeFn>,
212    #[cfg(not(feature = "async"))]
213    runtime_handle: &'a Handle,
214    _lifetime_marker: PhantomData<&'a Handle>,
215}
216
217impl EventUnsubscriber<'_> {
218    async fn unsubscribe_internal(mut self) {
219        if let Some(unsubscribe) = self.rx.recv().await {
220            unsubscribe().await;
221        }
222
223        let _ = self.handle.await;
224    }
225}
226
227/// Program is the primary client handle to be used to build and send requests.
228pub struct Program<C> {
229    program_id: Pubkey,
230    cfg: Config<C>,
231    sub_client: OnceCell<Arc<PubsubClient>>,
232    #[cfg(not(feature = "async"))]
233    rt: tokio::runtime::Runtime,
234    internal_rpc_client: AsyncRpcClient,
235}
236
237impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
238    pub fn payer(&self) -> Pubkey {
239        self.cfg.payer.pubkey()
240    }
241
242    pub fn id(&self) -> Pubkey {
243        self.program_id
244    }
245
246    #[cfg(feature = "mock")]
247    pub fn internal_rpc(&self) -> &AsyncRpcClient {
248        &self.internal_rpc_client
249    }
250
251    async fn account_internal<T: AccountDeserialize>(
252        &self,
253        address: Pubkey,
254    ) -> Result<T, ClientError> {
255        let account = self
256            .internal_rpc_client
257            .get_account_with_commitment(&address, CommitmentConfig::processed())
258            .await
259            .map_err(Box::new)?
260            .value
261            .ok_or(ClientError::AccountNotFound)?;
262        let mut data: &[u8] = &account.data;
263        T::try_deserialize(&mut data).map_err(Into::into)
264    }
265
266    async fn accounts_lazy_internal<T: AccountDeserialize + Discriminator>(
267        &self,
268        filters: Vec<RpcFilterType>,
269    ) -> Result<ProgramAccountsIterator<T>, ClientError> {
270        let account_type_filter =
271            RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, T::DISCRIMINATOR));
272        let config = RpcProgramAccountsConfig {
273            filters: Some([vec![account_type_filter], filters].concat()),
274            account_config: RpcAccountInfoConfig {
275                encoding: Some(UiAccountEncoding::Base64),
276                ..RpcAccountInfoConfig::default()
277            },
278            ..RpcProgramAccountsConfig::default()
279        };
280
281        Ok(ProgramAccountsIterator {
282            inner: self
283                .internal_rpc_client
284                .get_program_ui_accounts_with_config(&self.id(), config)
285                .await
286                .map_err(Box::new)?
287                .into_iter()
288                .map(|(key, account)| {
289                    let data = account
290                        .data
291                        .decode()
292                        .expect("account was fetched with binary encoding");
293                    Ok((key, T::try_deserialize(&mut data.as_slice())?))
294                }),
295        })
296    }
297
298    async fn on_internal<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
299        &self,
300        mut f: impl FnMut(&EventContext, T) + Send + 'static,
301    ) -> Result<
302        (
303            JoinHandle<Result<(), ClientError>>,
304            UnboundedReceiver<UnsubscribeFn>,
305        ),
306        ClientError,
307    > {
308        let client = self
309            .sub_client
310            .get_or_try_init(|| async {
311                PubsubClient::new(self.cfg.cluster.ws_url())
312                    .await
313                    .map(Arc::new)
314                    .map_err(|e| ClientError::SolanaClientPubsubError(Box::new(e)))
315            })
316            .await?
317            .clone();
318
319        let (tx, rx) = unbounded_channel::<_>();
320        let config = RpcTransactionLogsConfig {
321            commitment: self.cfg.options,
322        };
323        let program_id_str = self.program_id.to_string();
324        let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);
325
326        let handle = tokio::spawn(async move {
327            let (mut notifications, unsubscribe) = client
328                .logs_subscribe(filter, config)
329                .await
330                .map_err(Box::new)?;
331
332            tx.send(unsubscribe).map_err(|e| {
333                ClientError::SolanaClientPubsubError(Box::new(PubsubClientError::RequestFailed {
334                    message: "Unsubscribe failed".to_string(),
335                    reason: e.to_string(),
336                }))
337            })?;
338
339            while let Some(logs) = notifications.next().await {
340                let ctx = EventContext {
341                    signature: logs.value.signature.parse().unwrap(),
342                    slot: logs.context.slot,
343                };
344                let events = parse_logs_response(logs, &program_id_str)?;
345                for e in events {
346                    f(&ctx, e);
347                }
348            }
349            Ok::<(), ClientError>(())
350        });
351
352        Ok((handle, rx))
353    }
354}
355
356/// Iterator with items of type (Pubkey, T). Used to lazily deserialize account structs.
357/// Wrapper type hides the inner type from usages so the implementation can be changed.
358pub struct ProgramAccountsIterator<T> {
359    inner: Map<IntoIter<(Pubkey, UiAccount)>, AccountConverterFunction<T>>,
360}
361
362/// Function type that accepts solana accounts and returns deserialized anchor accounts
363type AccountConverterFunction<T> = fn((Pubkey, UiAccount)) -> Result<(Pubkey, T), ClientError>;
364
365impl<T> Iterator for ProgramAccountsIterator<T> {
366    type Item = Result<(Pubkey, T), ClientError>;
367
368    fn next(&mut self) -> Option<Self::Item> {
369        self.inner.next()
370    }
371}
372
373pub fn handle_program_log<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
374    self_program_str: &str,
375    l: &str,
376) -> Result<(Option<T>, Option<String>, bool), ClientError> {
377    use {
378        anchor_lang::__private::base64,
379        base64::{engine::general_purpose::STANDARD, Engine},
380    };
381
382    // Log emitted from the current program.
383    if let Some(log) = l
384        .strip_prefix(PROGRAM_LOG)
385        .or_else(|| l.strip_prefix(PROGRAM_DATA))
386    {
387        let log_bytes = match STANDARD.decode(log) {
388            Ok(log_bytes) => log_bytes,
389            _ => {
390                #[cfg(feature = "debug")]
391                println!("Could not base64 decode log: {}", log);
392                return Ok((None, None, false));
393            }
394        };
395
396        let event = log_bytes
397            .starts_with(T::DISCRIMINATOR)
398            .then(|| {
399                let mut data = &log_bytes[T::DISCRIMINATOR.len()..];
400                T::deserialize(&mut data).map_err(|e| ClientError::LogParseError(e.to_string()))
401            })
402            .transpose()?;
403
404        Ok((event, None, false))
405    }
406    // System log.
407    else {
408        let (program, did_pop) = handle_system_log(self_program_str, l);
409        Ok((None, program, did_pop))
410    }
411}
412
413pub fn handle_system_log(this_program_str: &str, log: &str) -> (Option<String>, bool) {
414    if log.starts_with(&format!("Program {this_program_str} log:")) {
415        (Some(this_program_str.to_string()), false)
416
417        // `Invoke [1]` instructions are pushed to the stack in `parse_logs_response`,
418        // so this ensures we only push CPIs to the stack at this stage
419    } else if log.contains("invoke") && !log.ends_with("[1]") {
420        (Some("cpi".to_string()), false) // Any string will do.
421    } else {
422        let re = Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) success$").unwrap();
423        if re.is_match(log) {
424            (None, true)
425        } else {
426            (None, false)
427        }
428    }
429}
430
431pub struct Execution {
432    stack: Vec<String>,
433}
434
435impl Execution {
436    pub fn new(logs: &mut &[String]) -> Result<Self, ClientError> {
437        let l = &logs[0];
438        *logs = &logs[1..];
439
440        let re = Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[[\d]+\]$").unwrap();
441        let c = re
442            .captures(l)
443            .ok_or_else(|| ClientError::LogParseError(l.to_string()))?;
444        let program = c
445            .get(1)
446            .ok_or_else(|| ClientError::LogParseError(l.to_string()))?
447            .as_str()
448            .to_string();
449        Ok(Self {
450            stack: vec![program],
451        })
452    }
453
454    pub fn program(&self) -> String {
455        assert!(!self.stack.is_empty());
456        self.stack[self.stack.len() - 1].clone()
457    }
458
459    pub fn push(&mut self, new_program: String) {
460        self.stack.push(new_program);
461    }
462
463    pub fn pop(&mut self) {
464        assert!(!self.stack.is_empty());
465        self.stack.pop().unwrap();
466    }
467}
468
/// Metadata passed to an event callback alongside each parsed event.
#[derive(Debug)]
pub struct EventContext {
    /// Signature parsed from the log notification the event was found in.
    pub signature: Signature,
    /// Slot taken from the context of that log notification.
    pub slot: u64,
}
474
/// Errors returned by the client.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The RPC response for an account lookup carried no account.
    #[error("Account not found")]
    AccountNotFound,
    /// An error produced by `anchor_lang`, such as a failed account deserialization.
    #[error("{0}")]
    AnchorError(#[from] anchor_lang::error::Error),
    /// A Solana `ProgramError`.
    #[error("{0}")]
    ProgramError(#[from] ProgramError),
    /// A failed RPC request (boxed).
    #[error("{0}")]
    SolanaClientError(#[from] Box<SolanaClientError>),
    /// A failed pubsub operation (boxed).
    #[error("{0}")]
    SolanaClientPubsubError(#[from] Box<PubsubClientError>),
    /// A log line could not be parsed; the payload describes the offending input.
    #[error("Unable to parse log: {0}")]
    LogParseError(String),
    /// An I/O error, displayed transparently.
    #[error(transparent)]
    IOError(#[from] std::io::Error),
    /// A signer failed, e.g. while signing a transaction.
    #[error("{0}")]
    SignerError(#[from] SignerError),
}
494
495pub trait AsSigner {
496    fn as_signer(&self) -> &dyn Signer;
497}
498
499impl AsSigner for Box<dyn Signer + '_> {
500    fn as_signer(&self) -> &dyn Signer {
501        self.as_ref()
502    }
503}
504
505/// `RequestBuilder` provides a builder interface to create and send
506/// transactions to a cluster.
507pub struct RequestBuilder<'a, C, S: 'a> {
508    cluster: String,
509    program_id: Pubkey,
510    accounts: Vec<AccountMeta>,
511    options: CommitmentConfig,
512    instructions: Vec<Instruction>,
513    payer: C,
514    instruction_data: Option<Vec<u8>>,
515    signers: Vec<S>,
516    #[cfg(not(feature = "async"))]
517    handle: &'a Handle,
518    internal_rpc_client: &'a AsyncRpcClient,
519    _phantom: PhantomData<&'a ()>,
520}
521
522// Shared implementation for all RequestBuilders
523impl<C: Deref<Target = impl Signer> + Clone, S: AsSigner> RequestBuilder<'_, C, S> {
524    #[must_use]
525    pub fn payer(mut self, payer: C) -> Self {
526        self.payer = payer;
527        self
528    }
529
530    #[must_use]
531    pub fn cluster(mut self, url: &str) -> Self {
532        self.cluster = url.to_string();
533        self
534    }
535
536    #[must_use]
537    pub fn instruction(mut self, ix: Instruction) -> Self {
538        self.instructions.push(ix);
539        self
540    }
541
542    #[must_use]
543    pub fn program(mut self, program_id: Pubkey) -> Self {
544        self.program_id = program_id;
545        self
546    }
547
548    /// Set the accounts to pass to the instruction.
549    ///
550    /// `accounts` argument can be:
551    ///
552    /// - Any type that implements [`ToAccountMetas`] trait
553    /// - A vector of [`AccountMeta`]s (for remaining accounts)
554    ///
555    /// Note that the given accounts are appended to the previous list of accounts instead of
556    /// overriding the existing ones (if any).
557    ///
558    /// # Example
559    ///
560    /// ```ignore
561    /// program
562    ///     .request()
563    ///     // Regular accounts
564    ///     .accounts(accounts::Initialize {
565    ///         my_account: my_account_kp.pubkey(),
566    ///         payer: program.payer(),
567    ///         system_program: system_program::ID,
568    ///     })
569    ///     // Remaining accounts
570    ///     .accounts(vec![AccountMeta {
571    ///         pubkey: remaining,
572    ///         is_signer: true,
573    ///         is_writable: true,
574    ///     }])
575    ///     .args(instruction::Initialize { field: 42 })
576    ///     .send()?;
577    /// ```
578    #[must_use]
579    pub fn accounts(mut self, accounts: impl ToAccountMetas) -> Self {
580        let mut metas = accounts.to_account_metas(None);
581        self.accounts.append(&mut metas);
582        self
583    }
584
585    #[must_use]
586    pub fn options(mut self, options: CommitmentConfig) -> Self {
587        self.options = options;
588        self
589    }
590
591    #[must_use]
592    pub fn args(mut self, args: impl InstructionData) -> Self {
593        self.instruction_data = Some(args.data());
594        self
595    }
596
597    pub fn instructions(&self) -> Vec<Instruction> {
598        let mut instructions = self.instructions.clone();
599        if let Some(ix_data) = &self.instruction_data {
600            instructions.push(Instruction {
601                program_id: self.program_id,
602                data: ix_data.clone(),
603                accounts: self.accounts.clone(),
604            });
605        }
606
607        instructions
608    }
609
610    fn signed_transaction_with_blockhash(
611        &self,
612        latest_hash: Hash,
613    ) -> Result<Transaction, ClientError> {
614        let signers: Vec<&dyn Signer> = self.signers.iter().map(|s| s.as_signer()).collect();
615        let mut all_signers = signers;
616        all_signers.push(&*self.payer);
617
618        let mut tx = self.transaction();
619        tx.try_sign(&all_signers, latest_hash)?;
620
621        Ok(tx)
622    }
623
624    pub fn transaction(&self) -> Transaction {
625        let instructions = &self.instructions();
626        Transaction::new_with_payer(instructions, Some(&self.payer.pubkey()))
627    }
628
629    async fn signed_transaction_internal(&self) -> Result<Transaction, ClientError> {
630        let latest_hash = self
631            .internal_rpc_client
632            .get_latest_blockhash()
633            .await
634            .map_err(Box::new)?;
635
636        let tx = self.signed_transaction_with_blockhash(latest_hash)?;
637        Ok(tx)
638    }
639
640    async fn send_internal(&self) -> Result<Signature, ClientError> {
641        let latest_hash = self
642            .internal_rpc_client
643            .get_latest_blockhash()
644            .await
645            .map_err(Box::new)?;
646        let tx = self.signed_transaction_with_blockhash(latest_hash)?;
647
648        self.internal_rpc_client
649            .send_and_confirm_transaction(&tx)
650            .await
651            .map_err(|e| Box::new(e).into())
652    }
653
654    async fn send_with_spinner_and_config_internal(
655        &self,
656        config: RpcSendTransactionConfig,
657    ) -> Result<Signature, ClientError> {
658        let latest_hash = self
659            .internal_rpc_client
660            .get_latest_blockhash()
661            .await
662            .map_err(Box::new)?;
663        let tx = self.signed_transaction_with_blockhash(latest_hash)?;
664
665        self.internal_rpc_client
666            .send_and_confirm_transaction_with_spinner_and_config(
667                &tx,
668                self.internal_rpc_client.commitment(),
669                config,
670            )
671            .await
672            .map_err(|e| Box::new(e).into())
673    }
674}
675
676fn parse_logs_response<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
677    logs: RpcResponse<RpcLogsResponse>,
678    program_id_str: &str,
679) -> Result<Vec<T>, ClientError> {
680    let mut logs = &logs.value.logs[..];
681    let mut events: Vec<T> = Vec::new();
682    if !logs.is_empty() {
683        if let Ok(mut execution) = Execution::new(&mut logs) {
684            // Create a new peekable iterator so that we can peek at the next log whilst iterating
685            let mut logs_iter = logs.iter().peekable();
686            let regex = Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[[\d]+\]$").unwrap();
687
688            while let Some(l) = logs_iter.next() {
689                // Parse the log.
690                let (event, new_program, did_pop) = {
691                    if program_id_str == execution.program() {
692                        handle_program_log(program_id_str, l)?
693                    } else {
694                        let (program, did_pop) = handle_system_log(program_id_str, l);
695                        (None, program, did_pop)
696                    }
697                };
698                // Emit the event.
699                if let Some(e) = event {
700                    events.push(e);
701                }
702                // Switch program context on CPI.
703                if let Some(new_program) = new_program {
704                    execution.push(new_program);
705                }
706                // Program returned.
707                if did_pop {
708                    execution.pop();
709
710                    // If the current iteration popped then it means there was a
711                    //`Program x success` log. If the next log in the iteration is
712                    // of depth [1] then we're not within a CPI and this is a new instruction.
713                    //
714                    // We need to ensure that the `Execution` instance is updated with
715                    // the next program ID, or else `execution.program()` will cause
716                    // a panic during the next iteration.
717                    if let Some(&next_log) = logs_iter.peek() {
718                        if next_log.ends_with("invoke [1]") {
719                            let next_instruction =
720                                regex.captures(next_log).unwrap().get(1).unwrap().as_str();
721                            // Within this if block, there will always be a regex match.
722                            // Therefore it's safe to unwrap and the captured program ID
723                            // at index 1 can also be safely unwrapped.
724                            execution.push(next_instruction.to_string());
725                        }
726                    };
727                }
728            }
729        }
730    }
731    Ok(events)
732}
733
734#[cfg(test)]
735mod tests {
736    // Creating a mock struct that implements `anchor_lang::events`
737    // for type inference in `test_logs`
738    use {
739        anchor_lang::prelude::*,
740        futures::{SinkExt, StreamExt},
741        solana_rpc_client_api::response::RpcResponseContext,
742        std::sync::atomic::{AtomicU64, Ordering},
743        tokio_tungstenite::tungstenite::Message,
744    };
745    #[derive(Debug, Clone, Copy)]
746    #[event]
747    pub struct MockEvent {}
748
749    use super::*;
750    #[test]
751    fn new_execution() {
752        let mut logs: &[String] =
753            &["Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw invoke [1]".to_string()];
754        let exe = Execution::new(&mut logs).unwrap();
755        assert_eq!(
756            exe.stack[0],
757            "7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw".to_string()
758        );
759    }
760
761    #[test]
762    fn handle_system_log_pop() {
763        let log = "Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw success";
764        let (program, did_pop) = handle_system_log("asdf", log);
765        assert_eq!(program, None);
766        assert!(did_pop);
767    }
768
769    #[test]
770    fn handle_system_log_no_pop() {
771        let log = "Program 7swsTUiQ6KUK4uFYquQKg4epFRsBnvbrTf2fZQCa2sTJ qwer";
772        let (program, did_pop) = handle_system_log("asdf", log);
773        assert_eq!(program, None);
774        assert!(!did_pop);
775    }
776
777    #[test]
778    fn test_parse_logs_response() -> Result<()> {
779        // Mock logs received within an `RpcResponse`. These are based on a Jupiter transaction.
780        let logs = vec![
781            "Program VeryCoolProgram invoke [1]", // Outer instruction #1 starts
782            "Program log: Instruction: VeryCoolEvent",
783            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
784            "Program log: Instruction: Transfer",
785            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 664387 compute \
786             units",
787            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
788            "Program VeryCoolProgram consumed 42417 of 700000 compute units",
789            "Program VeryCoolProgram success", // Outer instruction #1 ends
790            "Program EvenCoolerProgram invoke [1]", // Outer instruction #2 starts
791            "Program log: Instruction: EvenCoolerEvent",
792            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
793            "Program log: Instruction: TransferChecked",
794            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6200 of 630919 compute \
795             units",
796            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
797            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
798            "Program log: Instruction: Swap",
799            "Program log: INVARIANT: SWAP",
800            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
801            "Program log: Instruction: Transfer",
802            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 539321 compute \
803             units",
804            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
805            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
806            "Program log: Instruction: Transfer",
807            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 531933 compute \
808             units",
809            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
810            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 84670 of 610768 \
811             compute units",
812            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
813            "Program EvenCoolerProgram invoke [2]",
814            "Program EvenCoolerProgram consumed 2021 of 523272 compute units",
815            "Program EvenCoolerProgram success",
816            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
817            "Program log: Instruction: Swap",
818            "Program log: INVARIANT: SWAP",
819            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
820            "Program log: Instruction: Transfer",
821            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 418618 compute \
822             units",
823            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
824            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
825            "Program log: Instruction: Transfer",
826            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 411230 compute \
827             units",
828            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
829            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 102212 of 507607 \
830             compute units",
831            "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
832            "Program EvenCoolerProgram invoke [2]",
833            "Program EvenCoolerProgram consumed 2021 of 402569 compute units",
834            "Program EvenCoolerProgram success",
835            "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP invoke [2]",
836            "Program log: Instruction: Swap",
837            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
838            "Program log: Instruction: Transfer",
839            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 371140 compute \
840             units",
841            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
842            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
843            "Program log: Instruction: MintTo",
844            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4492 of 341800 compute \
845             units",
846            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
847            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
848            "Program log: Instruction: Transfer",
849            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 334370 compute \
850             units",
851            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
852            "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP consumed 57610 of 386812 \
853             compute units",
854            "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP success",
855            "Program EvenCoolerProgram invoke [2]",
856            "Program EvenCoolerProgram consumed 2021 of 326438 compute units",
857            "Program EvenCoolerProgram success",
858            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
859            "Program log: Instruction: TransferChecked",
860            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6173 of 319725 compute \
861             units",
862            "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
863            "Program EvenCoolerProgram consumed 345969 of 657583 compute units",
864            "Program EvenCoolerProgram success", // Outer instruction #2 ends
865            "Program ComputeBudget111111111111111111111111111111 invoke [1]",
866            "Program ComputeBudget111111111111111111111111111111 success",
867            "Program ComputeBudget111111111111111111111111111111 invoke [1]",
868            "Program ComputeBudget111111111111111111111111111111 success",
869        ];
870
871        // Converting to Vec<String> as expected in `RpcLogsResponse`
872        let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
873
874        let program_id_str = "VeryCoolProgram";
875
876        // No events returned here. Just ensuring that the function doesn't panic
877        // due an incorrectly emptied stack.
878        parse_logs_response::<MockEvent>(
879            RpcResponse {
880                context: RpcResponseContext::new(0),
881                value: RpcLogsResponse {
882                    signature: "".to_string(),
883                    err: None,
884                    logs: logs.to_vec(),
885                },
886            },
887            program_id_str,
888        )
889        .unwrap();
890
891        Ok(())
892    }
893
894    #[test]
895    fn test_parse_logs_response_fake_pop() -> Result<()> {
896        let logs = [
897            "Program fake111111111111111111111111111111111111112 invoke [1]",
898            "Program log: i logged success",
899            "Program log: i logged success",
900            "Program fake111111111111111111111111111111111111112 consumed 1411 of 200000 compute \
901             units",
902            "Program fake111111111111111111111111111111111111112 success",
903        ];
904
905        // Converting to Vec<String> as expected in `RpcLogsResponse`
906        let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
907
908        let program_id_str = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb";
909
910        // No events returned here. Just ensuring that the function doesn't panic
911        // due an incorrectly emptied stack.
912        parse_logs_response::<MockEvent>(
913            RpcResponse {
914                context: RpcResponseContext::new(0),
915                value: RpcLogsResponse {
916                    signature: "".to_string(),
917                    err: None,
918                    logs: logs.to_vec(),
919                },
920            },
921            program_id_str,
922        )
923        .unwrap();
924
925        Ok(())
926    }
927
928    /// Regression test that registering multiple event listeners does not deadlock.
929    #[test]
930    fn multiple_listeners_no_deadlock() {
931        // Spin up a tiny mock websocket server that responds to `logsSubscribe`
932        // JSON-RPC requests with a valid subscription id.
933        let rt = tokio::runtime::Builder::new_multi_thread()
934            .enable_all()
935            .build()
936            .unwrap();
937
938        let (addr_tx, addr_rx) = std::sync::mpsc::channel();
939
940        rt.spawn(async move {
941            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
942            let addr = listener.local_addr().unwrap();
943            addr_tx.send(addr).unwrap();
944
945            static SUB_ID: AtomicU64 = AtomicU64::new(0);
946
947            loop {
948                let (stream, _) = listener.accept().await.unwrap();
949                tokio::spawn(async move {
950                    let mut ws = tokio_tungstenite::accept_async(stream).await.unwrap();
951                    while let Some(Ok(Message::Text(_))) = ws.next().await {
952                        let sub_id = SUB_ID.fetch_add(1, Ordering::Relaxed);
953                        // The PubsubClient sends sequential integer ids starting at 0.
954                        let resp =
955                            format!(r#"{{"jsonrpc":"2.0","result":{sub_id},"id":{sub_id}}}"#);
956                        ws.send(Message::Text(resp.into())).await.unwrap();
957                    }
958                });
959            }
960        });
961
962        let addr = addr_rx.recv().unwrap();
963        let ws_url = format!("ws://{}", addr);
964
965        let client = super::Client::new(
966            super::Cluster::Custom(ws_url.clone(), ws_url),
967            std::sync::Arc::new(solana_keypair::Keypair::new()),
968        );
969        let program = client.program(Pubkey::new_unique()).unwrap();
970
971        // With the old RwLock-based code, the second call would deadlock.
972        // Use a timeout to ensure the test fails instead of hanging forever.
973        let (done_tx, done_rx) = std::sync::mpsc::channel();
974        let handle = std::thread::spawn(move || {
975            #[cfg(not(feature = "async"))]
976            {
977                let _listener1 = program
978                    .on::<MockEvent>(|_ctx, _event| {})
979                    .expect("first listener");
980
981                let _listener2 = program
982                    .on::<MockEvent>(|_ctx, _event| {})
983                    .expect("second listener");
984            }
985
986            #[cfg(feature = "async")]
987            {
988                let rt = tokio::runtime::Builder::new_current_thread()
989                    .enable_all()
990                    .build()
991                    .unwrap();
992                rt.block_on(async {
993                    let _listener1 = program
994                        .on::<MockEvent>(|_ctx, _event| {})
995                        .await
996                        .expect("first listener");
997
998                    let _listener2 = program
999                        .on::<MockEvent>(|_ctx, _event| {})
1000                        .await
1001                        .expect("second listener");
1002                });
1003            }
1004
1005            let _ = done_tx.send(());
1006        });
1007
1008        // If this times out, the deadlock is still present.
1009        done_rx
1010            .recv_timeout(std::time::Duration::from_secs(5))
1011            .expect("registering two listeners should not deadlock");
1012
1013        handle.join().unwrap();
1014    }
1015}