1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3#[cfg(feature = "async")]
68pub use nonblocking::ThreadSafeSigner;
69pub use {
70 anchor_lang,
71 cluster::Cluster,
72 solana_commitment_config::CommitmentConfig,
73 solana_hash::Hash,
74 solana_instruction::Instruction,
75 solana_message::AddressLookupTableAccount,
76 solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError,
77 solana_rpc_client_api::{
78 client_error::{Error as SolanaClientError, ErrorKind as SolanaClientErrorKind},
79 config::RpcSendTransactionConfig,
80 filter::RpcFilterType,
81 },
82 solana_signer::{Signer, SignerError},
83 solana_transaction::{versioned::VersionedTransaction, Transaction},
84};
85use {
86 anchor_lang::{
87 solana_program::{program_error::ProgramError, pubkey::Pubkey},
88 AccountDeserialize, Discriminator, InstructionData, ToAccountMetas,
89 },
90 futures::{Future, StreamExt},
91 regex::Regex,
92 solana_account_decoder::{UiAccount, UiAccountEncoding},
93 solana_instruction::AccountMeta,
94 solana_message::v0,
95 solana_pubsub_client::nonblocking::pubsub_client::PubsubClient,
96 solana_rpc_client::nonblocking::rpc_client::RpcClient as AsyncRpcClient,
97 solana_rpc_client_api::{
98 config::{
99 RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
100 RpcTransactionLogsFilter,
101 },
102 filter::Memcmp,
103 response::{Response as RpcResponse, RpcLogsResponse},
104 },
105 solana_signature::Signature,
106 std::{
107 iter::Map,
108 marker::PhantomData,
109 ops::Deref,
110 pin::Pin,
111 sync::{Arc, LazyLock},
112 vec::IntoIter,
113 },
114 thiserror::Error,
115 tokio::{
116 runtime::Handle,
117 sync::{
118 mpsc::{unbounded_channel, UnboundedReceiver},
119 OnceCell,
120 },
121 task::JoinHandle,
122 },
123};
124
125mod cluster;
126
127#[derive(Debug, Clone, Default)]
129pub enum TxVersion<'a> {
130 #[default]
132 Legacy,
133 V0(&'a [AddressLookupTableAccount]),
135}
136
137#[cfg(not(feature = "async"))]
138mod blocking;
139#[cfg(feature = "async")]
140mod nonblocking;
141
/// Prefix stripped by `handle_program_log` before it base64-decodes a line.
const PROGRAM_LOG: &str = "Program log: ";
/// Alternative prefix accepted by `handle_program_log` for the same purpose.
const PROGRAM_DATA: &str = "Program data: ";

/// Boxed, sendable, one-shot callback that yields a boxed future resolving to
/// `()`. The log-subscription task sends one of these over a channel so the
/// subscription can be cancelled later.
type UnsubscribeFn = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
146pub struct Client<C> {
150 cfg: Config<C>,
151}
152
153impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
154 pub fn new(cluster: Cluster, payer: C) -> Self {
155 Self {
156 cfg: Config {
157 cluster,
158 payer,
159 options: None,
160 },
161 }
162 }
163
164 pub fn new_with_options(cluster: Cluster, payer: C, options: CommitmentConfig) -> Self {
165 Self {
166 cfg: Config {
167 cluster,
168 payer,
169 options: Some(options),
170 },
171 }
172 }
173
174 pub fn program(
175 &self,
176 program_id: Pubkey,
177 #[cfg(feature = "mock")] rpc_client: AsyncRpcClient,
178 ) -> Result<Program<C>, ClientError> {
179 let cfg = Config {
180 cluster: self.cfg.cluster.clone(),
181 options: self.cfg.options,
182 payer: self.cfg.payer.clone(),
183 };
184
185 Program::new(
186 program_id,
187 cfg,
188 #[cfg(feature = "mock")]
189 rpc_client,
190 )
191 }
192}
193
194pub struct DynSigner(pub Arc<dyn Signer>);
198
199impl Signer for DynSigner {
200 fn pubkey(&self) -> Pubkey {
201 self.0.pubkey()
202 }
203
204 fn try_pubkey(&self) -> Result<Pubkey, SignerError> {
205 self.0.try_pubkey()
206 }
207
208 fn sign_message(&self, message: &[u8]) -> Signature {
209 self.0.sign_message(message)
210 }
211
212 fn try_sign_message(&self, message: &[u8]) -> Result<Signature, SignerError> {
213 self.0.try_sign_message(message)
214 }
215
216 fn is_interactive(&self) -> bool {
217 self.0.is_interactive()
218 }
219}
220
221#[derive(Debug)]
223pub struct Config<C> {
224 cluster: Cluster,
225 payer: C,
226 options: Option<CommitmentConfig>,
227}
228
229pub struct EventUnsubscriber<'a> {
230 handle: JoinHandle<Result<(), ClientError>>,
231 rx: UnboundedReceiver<UnsubscribeFn>,
232 #[cfg(not(feature = "async"))]
233 runtime_handle: &'a Handle,
234 _lifetime_marker: PhantomData<&'a Handle>,
235}
236
237impl EventUnsubscriber<'_> {
238 async fn unsubscribe_internal(mut self) {
239 if let Some(unsubscribe) = self.rx.recv().await {
240 unsubscribe().await;
241 }
242
243 let _ = self.handle.await;
244 }
245}
246
247pub struct Program<C> {
249 program_id: Pubkey,
250 cfg: Config<C>,
251 sub_client: OnceCell<Arc<PubsubClient>>,
252 #[cfg(not(feature = "async"))]
253 rt: tokio::runtime::Runtime,
254 internal_rpc_client: AsyncRpcClient,
255}
256
257impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
258 pub fn payer(&self) -> Pubkey {
259 self.cfg.payer.pubkey()
260 }
261
262 pub fn id(&self) -> Pubkey {
263 self.program_id
264 }
265
266 #[cfg(feature = "mock")]
267 pub fn internal_rpc(&self) -> &AsyncRpcClient {
268 &self.internal_rpc_client
269 }
270
271 async fn account_internal<T: AccountDeserialize>(
272 &self,
273 address: Pubkey,
274 ) -> Result<T, ClientError> {
275 let account = self
276 .internal_rpc_client
277 .get_account_with_commitment(&address, CommitmentConfig::processed())
278 .await
279 .map_err(Box::new)?
280 .value
281 .ok_or(ClientError::AccountNotFound)?;
282 let mut data: &[u8] = &account.data;
283 T::try_deserialize(&mut data).map_err(Into::into)
284 }
285
286 async fn accounts_lazy_internal<T: AccountDeserialize + Discriminator>(
287 &self,
288 filters: Vec<RpcFilterType>,
289 ) -> Result<ProgramAccountsIterator<T>, ClientError> {
290 let account_type_filter =
291 RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, T::DISCRIMINATOR));
292 let config = RpcProgramAccountsConfig {
293 filters: Some([vec![account_type_filter], filters].concat()),
294 account_config: RpcAccountInfoConfig {
295 encoding: Some(UiAccountEncoding::Base64),
296 ..RpcAccountInfoConfig::default()
297 },
298 ..RpcProgramAccountsConfig::default()
299 };
300
301 Ok(ProgramAccountsIterator {
302 inner: self
303 .internal_rpc_client
304 .get_program_ui_accounts_with_config(&self.id(), config)
305 .await
306 .map_err(Box::new)?
307 .into_iter()
308 .map(|(key, account)| {
309 let data = account.data.decode().ok_or_else(|| {
310 ClientError::SolanaClientError(Box::new(
311 SolanaClientError::new_with_request(
312 SolanaClientErrorKind::Custom(
313 "Failed to decode account data".to_string(),
314 ),
315 solana_rpc_client_api::request::RpcRequest::GetProgramAccounts,
316 ),
317 ))
318 })?;
319 Ok((key, T::try_deserialize(&mut data.as_slice())?))
320 }),
321 })
322 }
323
324 async fn on_internal<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
325 &self,
326 mut f: impl FnMut(&EventContext, T) + Send + 'static,
327 ) -> Result<
328 (
329 JoinHandle<Result<(), ClientError>>,
330 UnboundedReceiver<UnsubscribeFn>,
331 ),
332 ClientError,
333 > {
334 let client = self
335 .sub_client
336 .get_or_try_init(|| async {
337 PubsubClient::new(self.cfg.cluster.ws_url())
338 .await
339 .map(Arc::new)
340 .map_err(|e| ClientError::SolanaClientPubsubError(Box::new(e)))
341 })
342 .await?
343 .clone();
344
345 let (tx, rx) = unbounded_channel::<_>();
346 let config = RpcTransactionLogsConfig {
347 commitment: self.cfg.options,
348 };
349 let program_id_str = self.program_id.to_string();
350 let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);
351
352 let handle = tokio::spawn(async move {
353 let (mut notifications, unsubscribe) = client
354 .logs_subscribe(filter, config)
355 .await
356 .map_err(Box::new)?;
357
358 tx.send(unsubscribe).map_err(|e| {
359 ClientError::SolanaClientPubsubError(Box::new(PubsubClientError::RequestFailed {
360 message: "Unsubscribe failed".to_string(),
361 reason: e.to_string(),
362 }))
363 })?;
364
365 while let Some(logs) = notifications.next().await {
366 let signature: Signature = logs.value.signature.parse().map_err(|e| {
367 ClientError::LogParseError(format!(
368 "Invalid signature '{}': {e}",
369 logs.value.signature
370 ))
371 })?;
372 let ctx = EventContext {
373 signature,
374 slot: logs.context.slot,
375 };
376 let events = parse_logs_response(logs, &program_id_str)?;
377 for e in events {
378 f(&ctx, e);
379 }
380 }
381 Ok::<(), ClientError>(())
382 });
383
384 Ok((handle, rx))
385 }
386}
387
388pub struct ProgramAccountsIterator<T> {
391 inner: Map<IntoIter<(Pubkey, UiAccount)>, AccountConverterFunction<T>>,
392}
393
394type AccountConverterFunction<T> = fn((Pubkey, UiAccount)) -> Result<(Pubkey, T), ClientError>;
396
397impl<T> Iterator for ProgramAccountsIterator<T> {
398 type Item = Result<(Pubkey, T), ClientError>;
399
400 fn next(&mut self) -> Option<Self::Item> {
401 self.inner.next()
402 }
403}
404
405pub fn handle_program_log<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
406 self_program_str: &str,
407 l: &str,
408) -> Result<(Option<T>, Option<String>, bool), ClientError> {
409 use {
410 anchor_lang::__private::base64,
411 base64::{engine::general_purpose::STANDARD, Engine},
412 };
413
414 if let Some(log) = l
416 .strip_prefix(PROGRAM_LOG)
417 .or_else(|| l.strip_prefix(PROGRAM_DATA))
418 {
419 let log_bytes = match STANDARD.decode(log) {
420 Ok(log_bytes) => log_bytes,
421 _ => {
422 #[cfg(feature = "debug")]
423 println!("Could not base64 decode log: {}", log);
424 return Ok((None, None, false));
425 }
426 };
427
428 let event = log_bytes
429 .starts_with(T::DISCRIMINATOR)
430 .then(|| {
431 let mut data = &log_bytes[T::DISCRIMINATOR.len()..];
432 T::deserialize(&mut data).map_err(|e| ClientError::LogParseError(e.to_string()))
433 })
434 .transpose()?;
435
436 Ok((event, None, false))
437 }
438 else {
440 let (program, did_pop) = handle_system_log(self_program_str, l);
441 Ok((None, program, did_pop))
442 }
443}
444
445pub fn handle_system_log(this_program_str: &str, log: &str) -> (Option<String>, bool) {
446 static INVOKE_RE: LazyLock<Regex> = LazyLock::new(|| {
447 Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[([\d]+)\]$").unwrap()
448 });
449 if let Some(invoke_match) = INVOKE_RE.captures(log) {
450 if invoke_match.get(1).unwrap().as_str() == this_program_str {
451 return (Some(this_program_str.to_string()), false);
452
453 } else if invoke_match.get(2).unwrap().as_str() != "1" {
456 return (Some("cpi".to_string()), false); }
458 }
459
460 if log.starts_with(&format!("Program {this_program_str} log:")) {
461 (Some(this_program_str.to_string()), false)
462 } else {
463 static SUCESS_RE: LazyLock<Regex> =
464 LazyLock::new(|| Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) success$").unwrap());
465 if SUCESS_RE.is_match(log) {
466 (None, true)
467 } else {
468 (None, false)
469 }
470 }
471}
472
473pub struct Execution {
474 stack: Vec<String>,
475}
476
477impl Execution {
478 pub fn new(logs: &mut &[String]) -> Result<Self, ClientError> {
479 let l = &logs[0];
480 *logs = &logs[1..];
481 static RE: LazyLock<Regex> = LazyLock::new(|| {
482 Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[[\d]+\]$").unwrap()
483 });
484 let c = RE
485 .captures(l)
486 .ok_or_else(|| ClientError::LogParseError(l.to_string()))?;
487 let program = c
488 .get(1)
489 .ok_or_else(|| ClientError::LogParseError(l.to_string()))?
490 .as_str()
491 .to_string();
492 Ok(Self {
493 stack: vec![program],
494 })
495 }
496
497 pub fn program(&self) -> String {
498 assert!(!self.stack.is_empty());
499 self.stack[self.stack.len() - 1].clone()
500 }
501
502 pub fn push(&mut self, new_program: String) {
503 self.stack.push(new_program);
504 }
505
506 pub fn pop(&mut self) {
507 assert!(!self.stack.is_empty());
508 self.stack.pop().unwrap();
509 }
510}
511
512#[derive(Debug)]
513pub struct EventContext {
514 pub signature: Signature,
515 pub slot: u64,
516}
517
518#[derive(Debug, Error)]
519pub enum ClientError {
520 #[error("Account not found")]
521 AccountNotFound,
522 #[error("{0}")]
523 AnchorError(#[from] anchor_lang::error::Error),
524 #[error("{0}")]
525 ProgramError(#[from] ProgramError),
526 #[error("{0}")]
527 SolanaClientError(#[from] Box<SolanaClientError>),
528 #[error("{0}")]
529 SolanaClientPubsubError(#[from] Box<PubsubClientError>),
530 #[error("Unable to parse log: {0}")]
531 LogParseError(String),
532 #[error(transparent)]
533 IOError(#[from] std::io::Error),
534 #[error("{0}")]
535 SignerError(#[from] SignerError),
536}
537
538impl ClientError {
539 fn other<E>(e: E) -> Self
542 where
543 E: Into<Box<dyn std::error::Error + Send + Sync>>,
544 {
545 Self::IOError(std::io::Error::other(e))
546 }
547}
548
549pub trait AsSigner {
550 fn as_signer(&self) -> &dyn Signer;
551}
552
553impl AsSigner for Box<dyn Signer + '_> {
554 fn as_signer(&self) -> &dyn Signer {
555 self.as_ref()
556 }
557}
558
559pub struct RequestBuilder<'a, C, S: 'a> {
562 cluster: String,
563 program_id: Pubkey,
564 accounts: Vec<AccountMeta>,
565 options: CommitmentConfig,
566 instructions: Vec<Instruction>,
567 payer: C,
568 instruction_data: Option<Vec<u8>>,
569 signers: Vec<S>,
570 #[cfg(not(feature = "async"))]
571 handle: &'a Handle,
572 internal_rpc_client: &'a AsyncRpcClient,
573 _phantom: PhantomData<&'a ()>,
574}
575
576impl<C: Deref<Target = impl Signer> + Clone, S: AsSigner> RequestBuilder<'_, C, S> {
578 #[must_use]
579 pub fn payer(mut self, payer: C) -> Self {
580 self.payer = payer;
581 self
582 }
583
584 #[must_use]
585 pub fn cluster(mut self, url: &str) -> Self {
586 self.cluster = url.to_string();
587 self
588 }
589
590 #[must_use]
591 pub fn instruction(mut self, ix: Instruction) -> Self {
592 self.instructions.push(ix);
593 self
594 }
595
596 #[must_use]
597 pub fn program(mut self, program_id: Pubkey) -> Self {
598 self.program_id = program_id;
599 self
600 }
601
602 #[must_use]
633 pub fn accounts(mut self, accounts: impl ToAccountMetas) -> Self {
634 let mut metas = accounts.to_account_metas(None);
635 self.accounts.append(&mut metas);
636 self
637 }
638
639 #[must_use]
640 pub fn options(mut self, options: CommitmentConfig) -> Self {
641 self.options = options;
642 self
643 }
644
645 #[must_use]
646 pub fn args(mut self, args: impl InstructionData) -> Self {
647 self.instruction_data = Some(args.data());
648 self
649 }
650
651 pub fn instructions(&self) -> Vec<Instruction> {
652 let mut instructions = self.instructions.clone();
653 if let Some(ix_data) = &self.instruction_data {
654 instructions.push(Instruction {
655 program_id: self.program_id,
656 data: ix_data.clone(),
657 accounts: self.accounts.clone(),
658 });
659 }
660
661 instructions
662 }
663
664 pub fn transaction(&self) -> Transaction {
669 let instructions = &self.instructions();
670 Transaction::new_with_payer(instructions, Some(&self.payer.pubkey()))
671 }
672
673 pub fn transaction_versioned(
708 &self,
709 version: TxVersion<'_>,
710 recent_blockhash: Hash,
711 ) -> Result<solana_transaction::versioned::VersionedTransaction, ClientError> {
712 let instructions = self.instructions();
713 let payer = self.payer.pubkey();
714
715 match version {
716 TxVersion::Legacy => {
717 let message = solana_message::legacy::Message::new_with_blockhash(
718 &instructions,
719 Some(&payer),
720 &recent_blockhash,
721 );
722 Ok(solana_transaction::versioned::VersionedTransaction {
723 signatures: vec![
724 solana_signature::Signature::default();
725 message.header.num_required_signatures as usize
726 ],
727 message: solana_message::VersionedMessage::Legacy(message),
728 })
729 }
730 TxVersion::V0(address_lookup_table_accounts) => {
731 let message = v0::Message::try_compile(
732 &payer,
733 &instructions,
734 address_lookup_table_accounts,
735 recent_blockhash,
736 )
737 .map_err(ClientError::other)?;
738 Ok(solana_transaction::versioned::VersionedTransaction {
739 signatures: vec![
740 solana_signature::Signature::default();
741 message.header.num_required_signatures as usize
742 ],
743 message: solana_message::VersionedMessage::V0(message),
744 })
745 }
746 }
747 }
748
749 fn signed_transaction_with_blockhash_versioned(
750 &self,
751 version: TxVersion<'_>,
752 latest_hash: Hash,
753 ) -> Result<solana_transaction::versioned::VersionedTransaction, ClientError> {
754 let signers: Vec<&dyn Signer> = self.signers.iter().map(|s| s.as_signer()).collect();
755 let mut all_signers = signers;
756 all_signers.push(&*self.payer);
757
758 let instructions = self.instructions();
759 let payer = self.payer.pubkey();
760
761 let message = match version {
762 TxVersion::Legacy => {
763 let msg = solana_message::legacy::Message::new_with_blockhash(
764 &instructions,
765 Some(&payer),
766 &latest_hash,
767 );
768 solana_message::VersionedMessage::Legacy(msg)
769 }
770 TxVersion::V0(address_lookup_table_accounts) => {
771 let msg = v0::Message::try_compile(
772 &payer,
773 &instructions,
774 address_lookup_table_accounts,
775 latest_hash,
776 )
777 .map_err(ClientError::other)?;
778 solana_message::VersionedMessage::V0(msg)
779 }
780 };
781
782 let tx =
783 solana_transaction::versioned::VersionedTransaction::try_new(message, &all_signers)?;
784
785 Ok(tx)
786 }
787
788 async fn signed_transaction_internal(
789 &self,
790 version: TxVersion<'_>,
791 ) -> Result<solana_transaction::versioned::VersionedTransaction, ClientError> {
792 let latest_hash = self
793 .internal_rpc_client
794 .get_latest_blockhash()
795 .await
796 .map_err(Box::new)?;
797
798 self.signed_transaction_with_blockhash_versioned(version, latest_hash)
799 }
800
801 async fn send_internal(&self, version: TxVersion<'_>) -> Result<Signature, ClientError> {
802 let latest_hash = self
803 .internal_rpc_client
804 .get_latest_blockhash()
805 .await
806 .map_err(Box::new)?;
807 let tx = self.signed_transaction_with_blockhash_versioned(version, latest_hash)?;
808
809 self.internal_rpc_client
810 .send_and_confirm_transaction(&tx)
811 .await
812 .map_err(|e| Box::new(e).into())
813 }
814
815 async fn send_with_spinner_and_config_internal(
816 &self,
817 version: TxVersion<'_>,
818 config: RpcSendTransactionConfig,
819 ) -> Result<Signature, ClientError> {
820 let latest_hash = self
821 .internal_rpc_client
822 .get_latest_blockhash()
823 .await
824 .map_err(Box::new)?;
825 let tx = self.signed_transaction_with_blockhash_versioned(version, latest_hash)?;
826
827 self.internal_rpc_client
828 .send_and_confirm_transaction_with_spinner_and_config(
829 &tx,
830 self.internal_rpc_client.commitment(),
831 config,
832 )
833 .await
834 .map_err(|e| Box::new(e).into())
835 }
836}
837
838fn parse_logs_response<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
839 logs: RpcResponse<RpcLogsResponse>,
840 program_id_str: &str,
841) -> Result<Vec<T>, ClientError> {
842 let mut logs = &logs.value.logs[..];
843 let mut events: Vec<T> = Vec::new();
844 if !logs.is_empty() {
845 if let Ok(mut execution) = Execution::new(&mut logs) {
846 let mut logs_iter = logs.iter().peekable();
848 static RE: LazyLock<Regex> = LazyLock::new(|| {
849 Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[(\d+)\]$").unwrap()
850 });
851
852 while let Some(l) = logs_iter.next() {
853 let (event, new_program, did_pop) = {
855 if program_id_str == execution.program() {
856 handle_program_log(program_id_str, l)?
857 } else {
858 let (program, did_pop) = handle_system_log(program_id_str, l);
859 (None, program, did_pop)
860 }
861 };
862 if let Some(e) = event {
864 events.push(e);
865 }
866 if let Some(new_program) = new_program {
868 execution.push(new_program);
869 }
870 if did_pop {
872 execution.pop();
873
874 if let Some(&next_log) = logs_iter.peek() {
888 if let Some(caps) = RE.captures(next_log) {
889 if &caps[2] == "1" {
890 execution.push(caps[1].to_string());
891 }
892 }
893 };
894 }
895 }
896 }
897 }
898 Ok(events)
899}
900
901#[cfg(test)]
902mod tests {
903 use {
906 anchor_lang::{prelude::*, Event},
907 futures::{SinkExt, StreamExt},
908 solana_rpc_client_api::response::RpcResponseContext,
909 std::sync::atomic::{AtomicU64, Ordering},
910 tokio_tungstenite::tungstenite::Message,
911 };
912 #[derive(Debug, Clone, Copy)]
913 #[event]
914 pub struct MockEvent {}
915
916 use super::*;
917 #[test]
918 fn new_execution() {
919 let mut logs: &[String] =
920 &["Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw invoke [1]".to_string()];
921 let exe = Execution::new(&mut logs).unwrap();
922 assert_eq!(
923 exe.stack[0],
924 "7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw".to_string()
925 );
926 }
927
928 #[test]
929 fn handle_system_log_pop() {
930 let log = "Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw success";
931 let (program, did_pop) = handle_system_log("asdf", log);
932 assert_eq!(program, None);
933 assert!(did_pop);
934 }
935
936 #[test]
937 fn handle_system_log_no_pop() {
938 let log = "Program 7swsTUiQ6KUK4uFYquQKg4epFRsBnvbrTf2fZQCa2sTJ qwer";
939 let (program, did_pop) = handle_system_log("asdf", log);
940 assert_eq!(program, None);
941 assert!(!did_pop);
942 }
943
944 #[test]
945 fn test_parse_logs_response() -> Result<()> {
946 let logs = vec![
948 "Program VeryCoolProgram invoke [1]", "Program log: Instruction: VeryCoolEvent",
950 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
951 "Program log: Instruction: Transfer",
952 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 664387 compute \
953 units",
954 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
955 "Program VeryCoolProgram consumed 42417 of 700000 compute units",
956 "Program VeryCoolProgram success", "Program EvenCoolerProgram invoke [1]", "Program log: Instruction: EvenCoolerEvent",
959 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
960 "Program log: Instruction: TransferChecked",
961 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6200 of 630919 compute \
962 units",
963 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
964 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
965 "Program log: Instruction: Swap",
966 "Program log: INVARIANT: SWAP",
967 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
968 "Program log: Instruction: Transfer",
969 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 539321 compute \
970 units",
971 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
972 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
973 "Program log: Instruction: Transfer",
974 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 531933 compute \
975 units",
976 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
977 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 84670 of 610768 \
978 compute units",
979 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
980 "Program EvenCoolerProgram invoke [2]",
981 "Program EvenCoolerProgram consumed 2021 of 523272 compute units",
982 "Program EvenCoolerProgram success",
983 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
984 "Program log: Instruction: Swap",
985 "Program log: INVARIANT: SWAP",
986 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
987 "Program log: Instruction: Transfer",
988 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 418618 compute \
989 units",
990 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
991 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
992 "Program log: Instruction: Transfer",
993 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 411230 compute \
994 units",
995 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
996 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 102212 of 507607 \
997 compute units",
998 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
999 "Program EvenCoolerProgram invoke [2]",
1000 "Program EvenCoolerProgram consumed 2021 of 402569 compute units",
1001 "Program EvenCoolerProgram success",
1002 "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP invoke [2]",
1003 "Program log: Instruction: Swap",
1004 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
1005 "Program log: Instruction: Transfer",
1006 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 371140 compute \
1007 units",
1008 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1009 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
1010 "Program log: Instruction: MintTo",
1011 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4492 of 341800 compute \
1012 units",
1013 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1014 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
1015 "Program log: Instruction: Transfer",
1016 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 334370 compute \
1017 units",
1018 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1019 "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP consumed 57610 of 386812 \
1020 compute units",
1021 "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP success",
1022 "Program EvenCoolerProgram invoke [2]",
1023 "Program EvenCoolerProgram consumed 2021 of 326438 compute units",
1024 "Program EvenCoolerProgram success",
1025 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
1026 "Program log: Instruction: TransferChecked",
1027 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6173 of 319725 compute \
1028 units",
1029 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1030 "Program EvenCoolerProgram consumed 345969 of 657583 compute units",
1031 "Program EvenCoolerProgram success", "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1033 "Program ComputeBudget111111111111111111111111111111 success",
1034 "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1035 "Program ComputeBudget111111111111111111111111111111 success",
1036 ];
1037
1038 let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1040
1041 let program_id_str = "VeryCoolProgram";
1042
1043 parse_logs_response::<MockEvent>(
1046 RpcResponse {
1047 context: RpcResponseContext::new(0),
1048 value: RpcLogsResponse {
1049 signature: "".to_string(),
1050 err: None,
1051 logs: logs.to_vec(),
1052 },
1053 },
1054 program_id_str,
1055 )
1056 .unwrap();
1057
1058 Ok(())
1059 }
1060
1061 #[test]
1062 fn test_parse_logs_response_fake_pop() -> Result<()> {
1063 let logs = [
1064 "Program fake111111111111111111111111111111111111112 invoke [1]",
1065 "Program log: i logged success",
1066 "Program log: i logged success",
1067 "Program fake111111111111111111111111111111111111112 consumed 1411 of 200000 compute \
1068 units",
1069 "Program fake111111111111111111111111111111111111112 success",
1070 ];
1071
1072 let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1074
1075 let program_id_str = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb";
1076
1077 parse_logs_response::<MockEvent>(
1080 RpcResponse {
1081 context: RpcResponseContext::new(0),
1082 value: RpcLogsResponse {
1083 signature: "".to_string(),
1084 err: None,
1085 logs: logs.to_vec(),
1086 },
1087 },
1088 program_id_str,
1089 )
1090 .unwrap();
1091
1092 Ok(())
1093 }
1094
1095 #[test]
1101 fn test_parse_logs_response_log_line_ends_with_invoke_1() -> Result<()> {
1102 let logs = [
1103 "Program VeryCoolProgram invoke [1]",
1104 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
1105 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
1106 "Program log: forwarded inner instruction invoke [1]",
1109 "Program VeryCoolProgram success",
1110 ];
1111 let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1112
1113 parse_logs_response::<MockEvent>(
1114 RpcResponse {
1115 context: RpcResponseContext::new(0),
1116 value: RpcLogsResponse {
1117 signature: "".to_string(),
1118 err: None,
1119 logs,
1120 },
1121 },
1122 "VeryCoolProgram",
1123 )
1124 .unwrap();
1125
1126 Ok(())
1127 }
1128
1129 #[test]
1130 fn test_parse_log_response_inner_events() -> Result<()> {
1131 use {
1132 anchor_lang::__private::base64,
1133 base64::{engine::general_purpose::STANDARD, Engine},
1134 };
1135
1136 let mock_event = MockEvent {};
1137 let program_data_log = format!("Program data: {}", STANDARD.encode(mock_event.data()));
1138
1139 let logs = vec![
1140 "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1141 "Program ComputeBudget111111111111111111111111111111 success",
1142 "Program ComputeBudget111111111111111111111111111111 invoke [1]",
1143 "Program ComputeBudget111111111111111111111111111111 success",
1144 "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 invoke [1]",
1145 "Program log: Instruction: ValidateNonce",
1146 "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 consumed 4839 of 239700 compute \
1147 units",
1148 "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 success",
1149 "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 invoke [1]",
1150 "Program log: Instruction: SellExactInPumpFunV3",
1151 "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P invoke [2]",
1152 "Program log: Instruction: Sell",
1153 "Program pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ invoke [3]",
1154 "Program log: Instruction: GetFees",
1155 "Program pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ consumed 3136 of 187774 compute \
1156 units",
1157 "Program return: pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ \
1158 AAAAAAAAAABfAAAAAAAAAB4AAAAAAAAA",
1159 "Program pfeeUxB6jkeY1Hxd7CsFCAjcbHA9rWtchMGdZ6VojVZ success",
1160 "Program TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb invoke [3]",
1161 "Program log: Instruction: TransferChecked",
1162 "Program TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb consumed 2475 of 180928 compute \
1163 units",
1164 "Program TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb success",
1165 &program_data_log,
1166 "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P invoke [3]",
1167 "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P consumed 2060 of 166037 compute \
1168 units",
1169 "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P success",
1170 "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P consumed 60634 of 223605 compute \
1171 units",
1172 "Program 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P success",
1173 "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 consumed 72662 of 234861 compute \
1174 units",
1175 "Program term9YPb9mzAsABaqN71A4xdbxHmpBNZavpBiQKZzN3 success",
1176 "Program 11111111111111111111111111111111 invoke [1]",
1177 "Program 11111111111111111111111111111111 success",
1178 "Program 11111111111111111111111111111111 invoke [1]",
1179 "Program 11111111111111111111111111111111 success",
1180 ];
1181
1182 let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
1184
1185 let program_id_str = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";
1186
1187 let events = parse_logs_response::<MockEvent>(
1188 RpcResponse {
1189 context: RpcResponseContext::new(0),
1190 value: RpcLogsResponse {
1191 signature: "".to_string(),
1192 err: None,
1193 logs: logs.to_vec(),
1194 },
1195 },
1196 program_id_str,
1197 )
1198 .unwrap();
1199
1200 assert_eq!(events.len(), 1);
1201
1202 Ok(())
1203 }
1204
1205 #[test]
1207 fn multiple_listeners_no_deadlock() {
1208 let rt = tokio::runtime::Builder::new_multi_thread()
1211 .enable_all()
1212 .build()
1213 .unwrap();
1214
1215 let (addr_tx, addr_rx) = std::sync::mpsc::channel();
1216
1217 rt.spawn(async move {
1218 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1219 let addr = listener.local_addr().unwrap();
1220 addr_tx.send(addr).unwrap();
1221
1222 static SUB_ID: AtomicU64 = AtomicU64::new(0);
1223
1224 loop {
1225 let (stream, _) = listener.accept().await.unwrap();
1226 tokio::spawn(async move {
1227 let mut ws = tokio_tungstenite::accept_async(stream).await.unwrap();
1228 while let Some(Ok(Message::Text(_))) = ws.next().await {
1229 let sub_id = SUB_ID.fetch_add(1, Ordering::Relaxed);
1230 let resp =
1232 format!(r#"{{"jsonrpc":"2.0","result":{sub_id},"id":{sub_id}}}"#);
1233 ws.send(Message::Text(resp.into())).await.unwrap();
1234 }
1235 });
1236 }
1237 });
1238
1239 let addr = addr_rx.recv().unwrap();
1240 let ws_url = format!("ws://{}", addr);
1241
1242 let client = super::Client::new(
1243 super::Cluster::Custom(ws_url.clone(), ws_url),
1244 std::sync::Arc::new(solana_keypair::Keypair::new()),
1245 );
1246 let program = client.program(Pubkey::new_unique()).unwrap();
1247
1248 let (done_tx, done_rx) = std::sync::mpsc::channel();
1251 let handle = std::thread::spawn(move || {
1252 #[cfg(not(feature = "async"))]
1253 {
1254 let _listener1 = program
1255 .on::<MockEvent>(|_ctx, _event| {})
1256 .expect("first listener");
1257
1258 let _listener2 = program
1259 .on::<MockEvent>(|_ctx, _event| {})
1260 .expect("second listener");
1261 }
1262
1263 #[cfg(feature = "async")]
1264 {
1265 let rt = tokio::runtime::Builder::new_current_thread()
1266 .enable_all()
1267 .build()
1268 .unwrap();
1269 rt.block_on(async {
1270 let _listener1 = program
1271 .on::<MockEvent>(|_ctx, _event| {})
1272 .await
1273 .expect("first listener");
1274
1275 let _listener2 = program
1276 .on::<MockEvent>(|_ctx, _event| {})
1277 .await
1278 .expect("second listener");
1279 });
1280 }
1281
1282 let _ = done_tx.send(());
1283 });
1284
1285 done_rx
1287 .recv_timeout(std::time::Duration::from_secs(5))
1288 .expect("registering two listeners should not deadlock");
1289
1290 handle.join().unwrap();
1291 }
1292}