1#![cfg_attr(docsrs, feature(doc_cfg))]
2
3#[cfg(feature = "async")]
68pub use nonblocking::ThreadSafeSigner;
69pub use {
70 anchor_lang,
71 cluster::Cluster,
72 solana_commitment_config::CommitmentConfig,
73 solana_instruction::Instruction,
74 solana_program::hash::Hash,
75 solana_pubsub_client::nonblocking::pubsub_client::PubsubClientError,
76 solana_rpc_client_api::{
77 client_error::Error as SolanaClientError, config::RpcSendTransactionConfig,
78 filter::RpcFilterType,
79 },
80 solana_signer::{Signer, SignerError},
81 solana_transaction::Transaction,
82};
83use {
84 anchor_lang::{
85 solana_program::{program_error::ProgramError, pubkey::Pubkey},
86 AccountDeserialize, Discriminator, InstructionData, ToAccountMetas,
87 },
88 futures::{Future, StreamExt},
89 regex::Regex,
90 solana_account_decoder::{UiAccount, UiAccountEncoding},
91 solana_instruction::AccountMeta,
92 solana_pubsub_client::nonblocking::pubsub_client::PubsubClient,
93 solana_rpc_client::nonblocking::rpc_client::RpcClient as AsyncRpcClient,
94 solana_rpc_client_api::{
95 config::{
96 RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
97 RpcTransactionLogsFilter,
98 },
99 filter::Memcmp,
100 response::{Response as RpcResponse, RpcLogsResponse},
101 },
102 solana_signature::Signature,
103 std::{iter::Map, marker::PhantomData, ops::Deref, pin::Pin, sync::Arc, vec::IntoIter},
104 thiserror::Error,
105 tokio::{
106 runtime::Handle,
107 sync::{
108 mpsc::{unbounded_channel, UnboundedReceiver},
109 OnceCell,
110 },
111 task::JoinHandle,
112 },
113};
114
115mod cluster;
116
117#[cfg(not(feature = "async"))]
118mod blocking;
119#[cfg(feature = "async")]
120mod nonblocking;
121
122const PROGRAM_LOG: &str = "Program log: ";
123const PROGRAM_DATA: &str = "Program data: ";
124
125type UnsubscribeFn = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
126pub struct Client<C> {
130 cfg: Config<C>,
131}
132
133impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
134 pub fn new(cluster: Cluster, payer: C) -> Self {
135 Self {
136 cfg: Config {
137 cluster,
138 payer,
139 options: None,
140 },
141 }
142 }
143
144 pub fn new_with_options(cluster: Cluster, payer: C, options: CommitmentConfig) -> Self {
145 Self {
146 cfg: Config {
147 cluster,
148 payer,
149 options: Some(options),
150 },
151 }
152 }
153
154 pub fn program(
155 &self,
156 program_id: Pubkey,
157 #[cfg(feature = "mock")] rpc_client: AsyncRpcClient,
158 ) -> Result<Program<C>, ClientError> {
159 let cfg = Config {
160 cluster: self.cfg.cluster.clone(),
161 options: self.cfg.options,
162 payer: self.cfg.payer.clone(),
163 };
164
165 Program::new(
166 program_id,
167 cfg,
168 #[cfg(feature = "mock")]
169 rpc_client,
170 )
171 }
172}
173
174pub struct DynSigner(pub Arc<dyn Signer>);
178
179impl Signer for DynSigner {
180 fn pubkey(&self) -> Pubkey {
181 self.0.pubkey()
182 }
183
184 fn try_pubkey(&self) -> Result<Pubkey, SignerError> {
185 self.0.try_pubkey()
186 }
187
188 fn sign_message(&self, message: &[u8]) -> Signature {
189 self.0.sign_message(message)
190 }
191
192 fn try_sign_message(&self, message: &[u8]) -> Result<Signature, SignerError> {
193 self.0.try_sign_message(message)
194 }
195
196 fn is_interactive(&self) -> bool {
197 self.0.is_interactive()
198 }
199}
200
201#[derive(Debug)]
203pub struct Config<C> {
204 cluster: Cluster,
205 payer: C,
206 options: Option<CommitmentConfig>,
207}
208
209pub struct EventUnsubscriber<'a> {
210 handle: JoinHandle<Result<(), ClientError>>,
211 rx: UnboundedReceiver<UnsubscribeFn>,
212 #[cfg(not(feature = "async"))]
213 runtime_handle: &'a Handle,
214 _lifetime_marker: PhantomData<&'a Handle>,
215}
216
217impl EventUnsubscriber<'_> {
218 async fn unsubscribe_internal(mut self) {
219 if let Some(unsubscribe) = self.rx.recv().await {
220 unsubscribe().await;
221 }
222
223 let _ = self.handle.await;
224 }
225}
226
227pub struct Program<C> {
229 program_id: Pubkey,
230 cfg: Config<C>,
231 sub_client: OnceCell<Arc<PubsubClient>>,
232 #[cfg(not(feature = "async"))]
233 rt: tokio::runtime::Runtime,
234 internal_rpc_client: AsyncRpcClient,
235}
236
237impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
238 pub fn payer(&self) -> Pubkey {
239 self.cfg.payer.pubkey()
240 }
241
242 pub fn id(&self) -> Pubkey {
243 self.program_id
244 }
245
246 #[cfg(feature = "mock")]
247 pub fn internal_rpc(&self) -> &AsyncRpcClient {
248 &self.internal_rpc_client
249 }
250
251 async fn account_internal<T: AccountDeserialize>(
252 &self,
253 address: Pubkey,
254 ) -> Result<T, ClientError> {
255 let account = self
256 .internal_rpc_client
257 .get_account_with_commitment(&address, CommitmentConfig::processed())
258 .await
259 .map_err(Box::new)?
260 .value
261 .ok_or(ClientError::AccountNotFound)?;
262 let mut data: &[u8] = &account.data;
263 T::try_deserialize(&mut data).map_err(Into::into)
264 }
265
266 async fn accounts_lazy_internal<T: AccountDeserialize + Discriminator>(
267 &self,
268 filters: Vec<RpcFilterType>,
269 ) -> Result<ProgramAccountsIterator<T>, ClientError> {
270 let account_type_filter =
271 RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, T::DISCRIMINATOR));
272 let config = RpcProgramAccountsConfig {
273 filters: Some([vec![account_type_filter], filters].concat()),
274 account_config: RpcAccountInfoConfig {
275 encoding: Some(UiAccountEncoding::Base64),
276 ..RpcAccountInfoConfig::default()
277 },
278 ..RpcProgramAccountsConfig::default()
279 };
280
281 Ok(ProgramAccountsIterator {
282 inner: self
283 .internal_rpc_client
284 .get_program_ui_accounts_with_config(&self.id(), config)
285 .await
286 .map_err(Box::new)?
287 .into_iter()
288 .map(|(key, account)| {
289 let data = account
290 .data
291 .decode()
292 .expect("account was fetched with binary encoding");
293 Ok((key, T::try_deserialize(&mut data.as_slice())?))
294 }),
295 })
296 }
297
298 async fn on_internal<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
299 &self,
300 mut f: impl FnMut(&EventContext, T) + Send + 'static,
301 ) -> Result<
302 (
303 JoinHandle<Result<(), ClientError>>,
304 UnboundedReceiver<UnsubscribeFn>,
305 ),
306 ClientError,
307 > {
308 let client = self
309 .sub_client
310 .get_or_try_init(|| async {
311 PubsubClient::new(self.cfg.cluster.ws_url())
312 .await
313 .map(Arc::new)
314 .map_err(|e| ClientError::SolanaClientPubsubError(Box::new(e)))
315 })
316 .await?
317 .clone();
318
319 let (tx, rx) = unbounded_channel::<_>();
320 let config = RpcTransactionLogsConfig {
321 commitment: self.cfg.options,
322 };
323 let program_id_str = self.program_id.to_string();
324 let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);
325
326 let handle = tokio::spawn(async move {
327 let (mut notifications, unsubscribe) = client
328 .logs_subscribe(filter, config)
329 .await
330 .map_err(Box::new)?;
331
332 tx.send(unsubscribe).map_err(|e| {
333 ClientError::SolanaClientPubsubError(Box::new(PubsubClientError::RequestFailed {
334 message: "Unsubscribe failed".to_string(),
335 reason: e.to_string(),
336 }))
337 })?;
338
339 while let Some(logs) = notifications.next().await {
340 let ctx = EventContext {
341 signature: logs.value.signature.parse().unwrap(),
342 slot: logs.context.slot,
343 };
344 let events = parse_logs_response(logs, &program_id_str)?;
345 for e in events {
346 f(&ctx, e);
347 }
348 }
349 Ok::<(), ClientError>(())
350 });
351
352 Ok((handle, rx))
353 }
354}
355
356pub struct ProgramAccountsIterator<T> {
359 inner: Map<IntoIter<(Pubkey, UiAccount)>, AccountConverterFunction<T>>,
360}
361
362type AccountConverterFunction<T> = fn((Pubkey, UiAccount)) -> Result<(Pubkey, T), ClientError>;
364
365impl<T> Iterator for ProgramAccountsIterator<T> {
366 type Item = Result<(Pubkey, T), ClientError>;
367
368 fn next(&mut self) -> Option<Self::Item> {
369 self.inner.next()
370 }
371}
372
373pub fn handle_program_log<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
374 self_program_str: &str,
375 l: &str,
376) -> Result<(Option<T>, Option<String>, bool), ClientError> {
377 use {
378 anchor_lang::__private::base64,
379 base64::{engine::general_purpose::STANDARD, Engine},
380 };
381
382 if let Some(log) = l
384 .strip_prefix(PROGRAM_LOG)
385 .or_else(|| l.strip_prefix(PROGRAM_DATA))
386 {
387 let log_bytes = match STANDARD.decode(log) {
388 Ok(log_bytes) => log_bytes,
389 _ => {
390 #[cfg(feature = "debug")]
391 println!("Could not base64 decode log: {}", log);
392 return Ok((None, None, false));
393 }
394 };
395
396 let event = log_bytes
397 .starts_with(T::DISCRIMINATOR)
398 .then(|| {
399 let mut data = &log_bytes[T::DISCRIMINATOR.len()..];
400 T::deserialize(&mut data).map_err(|e| ClientError::LogParseError(e.to_string()))
401 })
402 .transpose()?;
403
404 Ok((event, None, false))
405 }
406 else {
408 let (program, did_pop) = handle_system_log(self_program_str, l);
409 Ok((None, program, did_pop))
410 }
411}
412
413pub fn handle_system_log(this_program_str: &str, log: &str) -> (Option<String>, bool) {
414 if log.starts_with(&format!("Program {this_program_str} log:")) {
415 (Some(this_program_str.to_string()), false)
416
417 } else if log.contains("invoke") && !log.ends_with("[1]") {
420 (Some("cpi".to_string()), false) } else {
422 let re = Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) success$").unwrap();
423 if re.is_match(log) {
424 (None, true)
425 } else {
426 (None, false)
427 }
428 }
429}
430
431pub struct Execution {
432 stack: Vec<String>,
433}
434
435impl Execution {
436 pub fn new(logs: &mut &[String]) -> Result<Self, ClientError> {
437 let l = &logs[0];
438 *logs = &logs[1..];
439
440 let re = Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[[\d]+\]$").unwrap();
441 let c = re
442 .captures(l)
443 .ok_or_else(|| ClientError::LogParseError(l.to_string()))?;
444 let program = c
445 .get(1)
446 .ok_or_else(|| ClientError::LogParseError(l.to_string()))?
447 .as_str()
448 .to_string();
449 Ok(Self {
450 stack: vec![program],
451 })
452 }
453
454 pub fn program(&self) -> String {
455 assert!(!self.stack.is_empty());
456 self.stack[self.stack.len() - 1].clone()
457 }
458
459 pub fn push(&mut self, new_program: String) {
460 self.stack.push(new_program);
461 }
462
463 pub fn pop(&mut self) {
464 assert!(!self.stack.is_empty());
465 self.stack.pop().unwrap();
466 }
467}
468
469#[derive(Debug)]
470pub struct EventContext {
471 pub signature: Signature,
472 pub slot: u64,
473}
474
475#[derive(Debug, Error)]
476pub enum ClientError {
477 #[error("Account not found")]
478 AccountNotFound,
479 #[error("{0}")]
480 AnchorError(#[from] anchor_lang::error::Error),
481 #[error("{0}")]
482 ProgramError(#[from] ProgramError),
483 #[error("{0}")]
484 SolanaClientError(#[from] Box<SolanaClientError>),
485 #[error("{0}")]
486 SolanaClientPubsubError(#[from] Box<PubsubClientError>),
487 #[error("Unable to parse log: {0}")]
488 LogParseError(String),
489 #[error(transparent)]
490 IOError(#[from] std::io::Error),
491 #[error("{0}")]
492 SignerError(#[from] SignerError),
493}
494
495pub trait AsSigner {
496 fn as_signer(&self) -> &dyn Signer;
497}
498
499impl AsSigner for Box<dyn Signer + '_> {
500 fn as_signer(&self) -> &dyn Signer {
501 self.as_ref()
502 }
503}
504
505pub struct RequestBuilder<'a, C, S: 'a> {
508 cluster: String,
509 program_id: Pubkey,
510 accounts: Vec<AccountMeta>,
511 options: CommitmentConfig,
512 instructions: Vec<Instruction>,
513 payer: C,
514 instruction_data: Option<Vec<u8>>,
515 signers: Vec<S>,
516 #[cfg(not(feature = "async"))]
517 handle: &'a Handle,
518 internal_rpc_client: &'a AsyncRpcClient,
519 _phantom: PhantomData<&'a ()>,
520}
521
522impl<C: Deref<Target = impl Signer> + Clone, S: AsSigner> RequestBuilder<'_, C, S> {
524 #[must_use]
525 pub fn payer(mut self, payer: C) -> Self {
526 self.payer = payer;
527 self
528 }
529
530 #[must_use]
531 pub fn cluster(mut self, url: &str) -> Self {
532 self.cluster = url.to_string();
533 self
534 }
535
536 #[must_use]
537 pub fn instruction(mut self, ix: Instruction) -> Self {
538 self.instructions.push(ix);
539 self
540 }
541
542 #[must_use]
543 pub fn program(mut self, program_id: Pubkey) -> Self {
544 self.program_id = program_id;
545 self
546 }
547
548 #[must_use]
579 pub fn accounts(mut self, accounts: impl ToAccountMetas) -> Self {
580 let mut metas = accounts.to_account_metas(None);
581 self.accounts.append(&mut metas);
582 self
583 }
584
585 #[must_use]
586 pub fn options(mut self, options: CommitmentConfig) -> Self {
587 self.options = options;
588 self
589 }
590
591 #[must_use]
592 pub fn args(mut self, args: impl InstructionData) -> Self {
593 self.instruction_data = Some(args.data());
594 self
595 }
596
597 pub fn instructions(&self) -> Vec<Instruction> {
598 let mut instructions = self.instructions.clone();
599 if let Some(ix_data) = &self.instruction_data {
600 instructions.push(Instruction {
601 program_id: self.program_id,
602 data: ix_data.clone(),
603 accounts: self.accounts.clone(),
604 });
605 }
606
607 instructions
608 }
609
610 fn signed_transaction_with_blockhash(
611 &self,
612 latest_hash: Hash,
613 ) -> Result<Transaction, ClientError> {
614 let signers: Vec<&dyn Signer> = self.signers.iter().map(|s| s.as_signer()).collect();
615 let mut all_signers = signers;
616 all_signers.push(&*self.payer);
617
618 let mut tx = self.transaction();
619 tx.try_sign(&all_signers, latest_hash)?;
620
621 Ok(tx)
622 }
623
624 pub fn transaction(&self) -> Transaction {
625 let instructions = &self.instructions();
626 Transaction::new_with_payer(instructions, Some(&self.payer.pubkey()))
627 }
628
629 async fn signed_transaction_internal(&self) -> Result<Transaction, ClientError> {
630 let latest_hash = self
631 .internal_rpc_client
632 .get_latest_blockhash()
633 .await
634 .map_err(Box::new)?;
635
636 let tx = self.signed_transaction_with_blockhash(latest_hash)?;
637 Ok(tx)
638 }
639
640 async fn send_internal(&self) -> Result<Signature, ClientError> {
641 let latest_hash = self
642 .internal_rpc_client
643 .get_latest_blockhash()
644 .await
645 .map_err(Box::new)?;
646 let tx = self.signed_transaction_with_blockhash(latest_hash)?;
647
648 self.internal_rpc_client
649 .send_and_confirm_transaction(&tx)
650 .await
651 .map_err(|e| Box::new(e).into())
652 }
653
654 async fn send_with_spinner_and_config_internal(
655 &self,
656 config: RpcSendTransactionConfig,
657 ) -> Result<Signature, ClientError> {
658 let latest_hash = self
659 .internal_rpc_client
660 .get_latest_blockhash()
661 .await
662 .map_err(Box::new)?;
663 let tx = self.signed_transaction_with_blockhash(latest_hash)?;
664
665 self.internal_rpc_client
666 .send_and_confirm_transaction_with_spinner_and_config(
667 &tx,
668 self.internal_rpc_client.commitment(),
669 config,
670 )
671 .await
672 .map_err(|e| Box::new(e).into())
673 }
674}
675
676fn parse_logs_response<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
677 logs: RpcResponse<RpcLogsResponse>,
678 program_id_str: &str,
679) -> Result<Vec<T>, ClientError> {
680 let mut logs = &logs.value.logs[..];
681 let mut events: Vec<T> = Vec::new();
682 if !logs.is_empty() {
683 if let Ok(mut execution) = Execution::new(&mut logs) {
684 let mut logs_iter = logs.iter().peekable();
686 let regex = Regex::new(r"^Program ([1-9A-HJ-NP-Za-km-z]+) invoke \[[\d]+\]$").unwrap();
687
688 while let Some(l) = logs_iter.next() {
689 let (event, new_program, did_pop) = {
691 if program_id_str == execution.program() {
692 handle_program_log(program_id_str, l)?
693 } else {
694 let (program, did_pop) = handle_system_log(program_id_str, l);
695 (None, program, did_pop)
696 }
697 };
698 if let Some(e) = event {
700 events.push(e);
701 }
702 if let Some(new_program) = new_program {
704 execution.push(new_program);
705 }
706 if did_pop {
708 execution.pop();
709
710 if let Some(&next_log) = logs_iter.peek() {
718 if next_log.ends_with("invoke [1]") {
719 let next_instruction =
720 regex.captures(next_log).unwrap().get(1).unwrap().as_str();
721 execution.push(next_instruction.to_string());
725 }
726 };
727 }
728 }
729 }
730 }
731 Ok(events)
732}
733
734#[cfg(test)]
735mod tests {
736 use {
739 anchor_lang::prelude::*,
740 futures::{SinkExt, StreamExt},
741 solana_rpc_client_api::response::RpcResponseContext,
742 std::sync::atomic::{AtomicU64, Ordering},
743 tokio_tungstenite::tungstenite::Message,
744 };
745 #[derive(Debug, Clone, Copy)]
746 #[event]
747 pub struct MockEvent {}
748
749 use super::*;
750 #[test]
751 fn new_execution() {
752 let mut logs: &[String] =
753 &["Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw invoke [1]".to_string()];
754 let exe = Execution::new(&mut logs).unwrap();
755 assert_eq!(
756 exe.stack[0],
757 "7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw".to_string()
758 );
759 }
760
761 #[test]
762 fn handle_system_log_pop() {
763 let log = "Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw success";
764 let (program, did_pop) = handle_system_log("asdf", log);
765 assert_eq!(program, None);
766 assert!(did_pop);
767 }
768
769 #[test]
770 fn handle_system_log_no_pop() {
771 let log = "Program 7swsTUiQ6KUK4uFYquQKg4epFRsBnvbrTf2fZQCa2sTJ qwer";
772 let (program, did_pop) = handle_system_log("asdf", log);
773 assert_eq!(program, None);
774 assert!(!did_pop);
775 }
776
777 #[test]
778 fn test_parse_logs_response() -> Result<()> {
779 let logs = vec![
781 "Program VeryCoolProgram invoke [1]", "Program log: Instruction: VeryCoolEvent",
783 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
784 "Program log: Instruction: Transfer",
785 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 664387 compute \
786 units",
787 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
788 "Program VeryCoolProgram consumed 42417 of 700000 compute units",
789 "Program VeryCoolProgram success", "Program EvenCoolerProgram invoke [1]", "Program log: Instruction: EvenCoolerEvent",
792 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
793 "Program log: Instruction: TransferChecked",
794 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6200 of 630919 compute \
795 units",
796 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
797 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
798 "Program log: Instruction: Swap",
799 "Program log: INVARIANT: SWAP",
800 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
801 "Program log: Instruction: Transfer",
802 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 539321 compute \
803 units",
804 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
805 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
806 "Program log: Instruction: Transfer",
807 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 531933 compute \
808 units",
809 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
810 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 84670 of 610768 \
811 compute units",
812 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
813 "Program EvenCoolerProgram invoke [2]",
814 "Program EvenCoolerProgram consumed 2021 of 523272 compute units",
815 "Program EvenCoolerProgram success",
816 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt invoke [2]",
817 "Program log: Instruction: Swap",
818 "Program log: INVARIANT: SWAP",
819 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
820 "Program log: Instruction: Transfer",
821 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 418618 compute \
822 units",
823 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
824 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
825 "Program log: Instruction: Transfer",
826 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 411230 compute \
827 units",
828 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
829 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt consumed 102212 of 507607 \
830 compute units",
831 "Program HyaB3W9q6XdA5xwpU4XnSZV94htfmbmqJXZcEbRaJutt success",
832 "Program EvenCoolerProgram invoke [2]",
833 "Program EvenCoolerProgram consumed 2021 of 402569 compute units",
834 "Program EvenCoolerProgram success",
835 "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP invoke [2]",
836 "Program log: Instruction: Swap",
837 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
838 "Program log: Instruction: Transfer",
839 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4736 of 371140 compute \
840 units",
841 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
842 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
843 "Program log: Instruction: MintTo",
844 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4492 of 341800 compute \
845 units",
846 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
847 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [3]",
848 "Program log: Instruction: Transfer",
849 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 4645 of 334370 compute \
850 units",
851 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
852 "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP consumed 57610 of 386812 \
853 compute units",
854 "Program 9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP success",
855 "Program EvenCoolerProgram invoke [2]",
856 "Program EvenCoolerProgram consumed 2021 of 326438 compute units",
857 "Program EvenCoolerProgram success",
858 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA invoke [2]",
859 "Program log: Instruction: TransferChecked",
860 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA consumed 6173 of 319725 compute \
861 units",
862 "Program TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA success",
863 "Program EvenCoolerProgram consumed 345969 of 657583 compute units",
864 "Program EvenCoolerProgram success", "Program ComputeBudget111111111111111111111111111111 invoke [1]",
866 "Program ComputeBudget111111111111111111111111111111 success",
867 "Program ComputeBudget111111111111111111111111111111 invoke [1]",
868 "Program ComputeBudget111111111111111111111111111111 success",
869 ];
870
871 let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
873
874 let program_id_str = "VeryCoolProgram";
875
876 parse_logs_response::<MockEvent>(
879 RpcResponse {
880 context: RpcResponseContext::new(0),
881 value: RpcLogsResponse {
882 signature: "".to_string(),
883 err: None,
884 logs: logs.to_vec(),
885 },
886 },
887 program_id_str,
888 )
889 .unwrap();
890
891 Ok(())
892 }
893
894 #[test]
895 fn test_parse_logs_response_fake_pop() -> Result<()> {
896 let logs = [
897 "Program fake111111111111111111111111111111111111112 invoke [1]",
898 "Program log: i logged success",
899 "Program log: i logged success",
900 "Program fake111111111111111111111111111111111111112 consumed 1411 of 200000 compute \
901 units",
902 "Program fake111111111111111111111111111111111111112 success",
903 ];
904
905 let logs: Vec<String> = logs.iter().map(|&l| l.to_string()).collect();
907
908 let program_id_str = "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb";
909
910 parse_logs_response::<MockEvent>(
913 RpcResponse {
914 context: RpcResponseContext::new(0),
915 value: RpcLogsResponse {
916 signature: "".to_string(),
917 err: None,
918 logs: logs.to_vec(),
919 },
920 },
921 program_id_str,
922 )
923 .unwrap();
924
925 Ok(())
926 }
927
928 #[test]
930 fn multiple_listeners_no_deadlock() {
931 let rt = tokio::runtime::Builder::new_multi_thread()
934 .enable_all()
935 .build()
936 .unwrap();
937
938 let (addr_tx, addr_rx) = std::sync::mpsc::channel();
939
940 rt.spawn(async move {
941 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
942 let addr = listener.local_addr().unwrap();
943 addr_tx.send(addr).unwrap();
944
945 static SUB_ID: AtomicU64 = AtomicU64::new(0);
946
947 loop {
948 let (stream, _) = listener.accept().await.unwrap();
949 tokio::spawn(async move {
950 let mut ws = tokio_tungstenite::accept_async(stream).await.unwrap();
951 while let Some(Ok(Message::Text(_))) = ws.next().await {
952 let sub_id = SUB_ID.fetch_add(1, Ordering::Relaxed);
953 let resp =
955 format!(r#"{{"jsonrpc":"2.0","result":{sub_id},"id":{sub_id}}}"#);
956 ws.send(Message::Text(resp.into())).await.unwrap();
957 }
958 });
959 }
960 });
961
962 let addr = addr_rx.recv().unwrap();
963 let ws_url = format!("ws://{}", addr);
964
965 let client = super::Client::new(
966 super::Cluster::Custom(ws_url.clone(), ws_url),
967 std::sync::Arc::new(solana_keypair::Keypair::new()),
968 );
969 let program = client.program(Pubkey::new_unique()).unwrap();
970
971 let (done_tx, done_rx) = std::sync::mpsc::channel();
974 let handle = std::thread::spawn(move || {
975 #[cfg(not(feature = "async"))]
976 {
977 let _listener1 = program
978 .on::<MockEvent>(|_ctx, _event| {})
979 .expect("first listener");
980
981 let _listener2 = program
982 .on::<MockEvent>(|_ctx, _event| {})
983 .expect("second listener");
984 }
985
986 #[cfg(feature = "async")]
987 {
988 let rt = tokio::runtime::Builder::new_current_thread()
989 .enable_all()
990 .build()
991 .unwrap();
992 rt.block_on(async {
993 let _listener1 = program
994 .on::<MockEvent>(|_ctx, _event| {})
995 .await
996 .expect("first listener");
997
998 let _listener2 = program
999 .on::<MockEvent>(|_ctx, _event| {})
1000 .await
1001 .expect("second listener");
1002 });
1003 }
1004
1005 let _ = done_tx.send(());
1006 });
1007
1008 done_rx
1010 .recv_timeout(std::time::Duration::from_secs(5))
1011 .expect("registering two listeners should not deadlock");
1012
1013 handle.join().unwrap();
1014 }
1015}