1use {
4 crate::{
5 filter::filter_allows,
6 optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
7 parsed_token_accounts::{get_parsed_token_account, get_parsed_token_accounts},
8 rpc_pubsub_service::PubSubConfig,
9 rpc_subscription_tracker::{
10 AccountSubscriptionParams, BlockSubscriptionKind, BlockSubscriptionParams,
11 LogsSubscriptionKind, LogsSubscriptionParams, ProgramSubscriptionParams,
12 SignatureSubscriptionParams, SubscriptionControl, SubscriptionId, SubscriptionInfo,
13 SubscriptionParams, SubscriptionsTracker,
14 },
15 },
16 crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
17 itertools::Either,
18 rayon::prelude::*,
19 serde::Serialize,
20 solana_account::{AccountSharedData, ReadableAccount},
21 solana_account_decoder::{
22 encode_ui_account, parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding,
23 },
24 solana_clock::Slot,
25 solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
26 solana_measure::measure::Measure,
27 solana_pubkey::Pubkey,
28 solana_rpc_client_api::response::{
29 ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate,
30 RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext,
31 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
32 },
33 solana_runtime::{
34 bank::{Bank, TransactionLogInfo},
35 bank_forks::BankForks,
36 commitment::{BlockCommitmentCache, CommitmentSlots},
37 },
38 solana_signature::Signature,
39 solana_time_utils::timestamp,
40 solana_transaction_status::{
41 BlockEncodingOptions, ConfirmedBlock, EncodeError, VersionedConfirmedBlock,
42 },
43 solana_vote::vote_transaction::VoteTransaction,
44 std::{
45 cell::RefCell,
46 collections::{HashMap, VecDeque},
47 io::Cursor,
48 str,
49 sync::{
50 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
51 Arc, Mutex, RwLock, Weak,
52 },
53 thread::{Builder, JoinHandle},
54 time::{Duration, Instant},
55 },
56 tokio::sync::broadcast,
57};
58
/// Local alias module so transaction results can be spelled
/// `transaction::Result<()>` within this file.
mod transaction {
    pub use solana_transaction_error::TransactionResult as Result;
}
62
/// Timeout, in milliseconds, applied to each blocking receive on the
/// notification channel.
const RECEIVE_DELAY_MILLIS: u64 = 100;
64
65fn get_transaction_logs(
66 bank: &Bank,
67 params: &LogsSubscriptionParams,
68) -> Option<Vec<TransactionLogInfo>> {
69 let pubkey = match ¶ms.kind {
70 LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
71 LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
72 };
73 let mut logs = bank.get_transaction_logs(pubkey);
74 if matches!(params.kind, LogsSubscriptionKind::All) {
75 if let Some(logs) = &mut logs {
77 logs.retain(|log| !log.is_vote);
78 }
79 }
80 logs
81}
/// A [`NotificationEntry`] paired with the instant at which it was queued.
#[derive(Debug)]
pub struct TimestampedNotificationEntry {
    /// The notification work item itself.
    pub entry: NotificationEntry,
    /// When the entry was created for queueing; the notification loop uses it
    /// to accumulate per-entry processing time.
    pub queued_at: Instant,
}
87
88impl From<NotificationEntry> for TimestampedNotificationEntry {
89 fn from(entry: NotificationEntry) -> Self {
90 TimestampedNotificationEntry {
91 entry,
92 queued_at: Instant::now(),
93 }
94 }
95}
96
/// Work items consumed by the RPC notification loop.
pub enum NotificationEntry {
    /// Slot info forwarded to the `Slot` subscription.
    Slot(SlotInfo),
    /// Slot update forwarded to the `SlotsUpdates` subscription.
    SlotUpdate(SlotUpdate),
    /// Vote account pubkey, vote and signature, converted to an `RpcVote` for
    /// the `Vote` subscription.
    Vote((Pubkey, VoteTransaction, Signature)),
    /// Root slot forwarded to the `Root` subscription.
    Root(Slot),
    /// Commitment slots used to evaluate the commitment watchers.
    Bank(CommitmentSlots),
    /// Slot used as the highest confirmed slot when evaluating the gossip
    /// watchers.
    Gossip(Slot),
    /// Slot plus the signatures received in it; drives "received signature"
    /// notifications for signature subscriptions that enabled them.
    SignaturesReceived((Slot, Vec<Signature>)),
    /// Registers a subscription with the tracker.
    Subscribed(SubscriptionParams, SubscriptionId),
    /// Removes a subscription from the tracker.
    Unsubscribed(SubscriptionParams, SubscriptionId),
}
108
109impl std::fmt::Debug for NotificationEntry {
110 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
111 match self {
112 NotificationEntry::Root(root) => write!(f, "Root({root})"),
113 NotificationEntry::Vote(vote) => write!(f, "Vote({vote:?})"),
114 NotificationEntry::Slot(slot_info) => write!(f, "Slot({slot_info:?})"),
115 NotificationEntry::SlotUpdate(slot_update) => {
116 write!(f, "SlotUpdate({slot_update:?})")
117 }
118 NotificationEntry::Bank(commitment_slots) => {
119 write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot)
120 }
121 NotificationEntry::SignaturesReceived(slot_signatures) => {
122 write!(f, "SignaturesReceived({slot_signatures:?})")
123 }
124 NotificationEntry::Gossip(slot) => write!(f, "Gossip({slot:?})"),
125 NotificationEntry::Subscribed(params, id) => {
126 write!(f, "Subscribed({params:?}, {id:?})")
127 }
128 NotificationEntry::Unsubscribed(params, id) => {
129 write!(f, "Unsubscribed({params:?}, {id:?})")
130 }
131 }
132 }
133}
134
135#[allow(clippy::type_complexity)]
136fn check_commitment_and_notify<P, S, B, F, X, I>(
137 params: &P,
138 subscription: &SubscriptionInfo,
139 bank_forks: &Arc<RwLock<BankForks>>,
140 slot: Slot,
141 bank_method: B,
142 filter_results: F,
143 notifier: &RpcNotifier,
144 is_final: bool,
145) -> bool
146where
147 S: Clone + Serialize,
148 B: Fn(&Bank, &P) -> X,
149 F: Fn(X, &P, Slot, Arc<Bank>) -> (I, Slot),
150 X: Clone + Default,
151 I: IntoIterator<Item = S>,
152{
153 let mut notified = false;
154 let bank = bank_forks.read().unwrap().get(slot);
155 if let Some(bank) = bank {
156 let results = bank_method(&bank, params);
157 let mut w_last_notified_slot = subscription.last_notified_slot.write().unwrap();
158 let (filter_results, result_slot) =
159 filter_results(results, params, *w_last_notified_slot, bank);
160 for result in filter_results {
161 notifier.notify(
162 RpcResponse::from(RpcNotificationResponse {
163 context: RpcNotificationContext { slot },
164 value: result,
165 }),
166 subscription,
167 is_final,
168 );
169 *w_last_notified_slot = result_slot;
170 notified = true;
171 }
172 }
173
174 notified
175}
176
/// A serialized notification as published on the broadcast channel.
#[derive(Debug, Clone)]
pub struct RpcNotification {
    /// Id of the subscription this notification belongs to.
    pub subscription_id: SubscriptionId,
    /// Flag supplied by the producer; signature watcher notifications set it
    /// to `true`, the other producers in this file pass `false`.
    pub is_final: bool,
    /// Weak handle to the serialized JSON payload; the strong reference is
    /// kept in the notifier's recent-items queue.
    pub json: Weak<String>,
    /// Instant at which the notification was built.
    pub created_at: Instant,
}
184
/// Internal notification payload: a value plus the slot context it was
/// observed at. Converted into an `RpcResponse` before serialization.
#[derive(Debug, Clone, PartialEq)]
struct RpcNotificationResponse<T> {
    context: RpcNotificationContext,
    value: T,
}
190
191impl<T> From<RpcNotificationResponse<T>> for RpcResponse<T> {
192 fn from(notification: RpcNotificationResponse<T>) -> Self {
193 let RpcNotificationResponse {
194 context: RpcNotificationContext { slot },
195 value,
196 } = notification;
197 Self {
198 context: RpcResponseContext {
199 slot,
200 api_version: None,
201 },
202 value,
203 }
204 }
205}
206
/// Slot context attached to an internal notification payload.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RpcNotificationContext {
    slot: Slot,
}
211
/// Minimum interval between metrics submissions for the notification path.
/// Despite the `_MS` suffix this is a `Duration` (2 seconds), not a raw
/// millisecond count.
const RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS: Duration = Duration::from_millis(2_000);
213
/// Bounded FIFO of recently serialized notifications, limited both by item
/// count and by total payload bytes.
struct RecentItems {
    /// Retained payloads, oldest at the front.
    queue: VecDeque<Arc<String>>,
    /// Sum of the byte lengths of all payloads currently in `queue`.
    total_bytes: usize,
    /// Maximum number of payloads retained.
    max_len: usize,
    /// Maximum total payload bytes retained.
    max_total_bytes: usize,
    /// When the queue metrics datapoint was last submitted.
    last_metrics_submission: Instant,
}
221
222impl RecentItems {
223 fn new(max_len: usize, max_total_bytes: usize) -> Self {
224 Self {
225 queue: VecDeque::new(),
226 total_bytes: 0,
227 max_len,
228 max_total_bytes,
229 last_metrics_submission: Instant::now(),
230 }
231 }
232
233 fn push(&mut self, item: Arc<String>) {
234 self.total_bytes = self
235 .total_bytes
236 .checked_add(item.len())
237 .expect("total bytes overflow");
238 self.queue.push_back(item);
239
240 while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len {
241 let item = self.queue.pop_front().expect("can't be empty");
242 self.total_bytes = self
243 .total_bytes
244 .checked_sub(item.len())
245 .expect("total bytes underflow");
246 }
247
248 let now = Instant::now();
249 let last_metrics_ago = now.duration_since(self.last_metrics_submission);
250 if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS {
251 datapoint_info!(
252 "rpc_subscriptions_recent_items",
253 ("num", self.queue.len(), i64),
254 ("total_bytes", self.total_bytes, i64),
255 );
256 self.last_metrics_submission = now;
257 } else {
258 trace!(
259 "rpc_subscriptions_recent_items num={} total_bytes={}",
260 self.queue.len(),
261 self.total_bytes,
262 );
263 }
264 }
265}
266
/// Serializes notifications and publishes them on the broadcast channel.
struct RpcNotifier {
    /// Broadcast channel on which `RpcNotification`s are published.
    sender: broadcast::Sender<RpcNotification>,
    /// Holds the strong references to recently serialized payloads.
    recent_items: Mutex<RecentItems>,
}
271
thread_local! {
    // Per-thread scratch buffer reused across notification serializations.
    static RPC_NOTIFIER_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
}
275
/// `params` object of a serialized notification: the result value and the id
/// of the subscription it is addressed to.
#[derive(Debug, Serialize)]
struct NotificationParams<T> {
    result: T,
    subscription: SubscriptionId,
}
281
/// Top-level envelope serialized to JSON for every notification.
#[derive(Debug, Serialize)]
struct Notification<T> {
    jsonrpc: Option<jsonrpc_core::Version>,
    method: &'static str,
    params: NotificationParams<T>,
}
288
289impl RpcNotifier {
290 fn notify<T>(&self, value: T, subscription: &SubscriptionInfo, is_final: bool)
291 where
292 T: serde::Serialize,
293 {
294 let buf_arc = RPC_NOTIFIER_BUF.with(|buf| {
295 let mut buf = buf.borrow_mut();
296 buf.clear();
297 let notification = Notification {
298 jsonrpc: Some(jsonrpc_core::Version::V2),
299 method: subscription.method(),
300 params: NotificationParams {
301 result: value,
302 subscription: subscription.id(),
303 },
304 };
305 serde_json::to_writer(Cursor::new(&mut *buf), ¬ification)
306 .expect("serialization never fails");
307 let buf_str = str::from_utf8(&buf).expect("json is always utf-8");
308 Arc::new(String::from(buf_str))
309 });
310
311 let notification = RpcNotification {
312 subscription_id: subscription.id(),
313 json: Arc::downgrade(&buf_arc),
314 is_final,
315 created_at: Instant::now(),
316 };
317 let _ = self.sender.send(notification);
320
321 inc_new_counter_info!("rpc-pubsub-messages", 1);
322 inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());
323
324 self.recent_items.lock().unwrap().push(buf_arc);
325 }
326}
327
328fn filter_block_result_txs(
329 mut block: VersionedConfirmedBlock,
330 last_modified_slot: Slot,
331 params: &BlockSubscriptionParams,
332) -> Result<Option<RpcBlockUpdate>, RpcBlockUpdateError> {
333 block.transactions = match params.kind {
334 BlockSubscriptionKind::All => block.transactions,
335 BlockSubscriptionKind::MentionsAccountOrProgram(pk) => block
336 .transactions
337 .into_iter()
338 .filter(|tx| tx.account_keys().iter().any(|key| key == &pk))
339 .collect(),
340 };
341
342 if block.transactions.is_empty() {
343 if let BlockSubscriptionKind::MentionsAccountOrProgram(_) = params.kind {
344 return Ok(None);
345 }
346 }
347
348 let block = ConfirmedBlock::from(block)
349 .encode_with_options(
350 params.encoding,
351 BlockEncodingOptions {
352 transaction_details: params.transaction_details,
353 show_rewards: params.show_rewards,
354 max_supported_transaction_version: params.max_supported_transaction_version,
355 },
356 )
357 .map_err(|err| match err {
358 EncodeError::UnsupportedTransactionVersion(version) => {
359 RpcBlockUpdateError::UnsupportedTransactionVersion(version)
360 }
361 })?;
362
363 Ok(Some(RpcBlockUpdate {
367 slot: last_modified_slot,
368 block: Some(block),
369 err: None,
370 }))
371}
372
373fn filter_account_result(
374 result: Option<(AccountSharedData, Slot)>,
375 params: &AccountSubscriptionParams,
376 last_notified_slot: Slot,
377 bank: Arc<Bank>,
378) -> (Option<UiAccount>, Slot) {
379 let (account, last_modified_slot) = result.unwrap_or_default();
382
383 let account = (last_modified_slot != last_notified_slot).then(|| {
386 if is_known_spl_token_id(account.owner())
387 && params.encoding == UiAccountEncoding::JsonParsed
388 {
389 get_parsed_token_account(&bank, ¶ms.pubkey, account, None)
390 } else {
391 encode_ui_account(¶ms.pubkey, &account, params.encoding, None, None)
392 }
393 });
394 (account, last_modified_slot)
395}
396
397fn filter_signature_result(
398 result: Option<transaction::Result<()>>,
399 _params: &SignatureSubscriptionParams,
400 last_notified_slot: Slot,
401 _bank: Arc<Bank>,
402) -> (Option<RpcSignatureResult>, Slot) {
403 (
404 result.map(|result| {
405 RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult {
406 err: result.err().map(Into::into),
407 })
408 }),
409 last_notified_slot,
410 )
411}
412
413fn filter_program_results(
414 accounts: Vec<(Pubkey, AccountSharedData)>,
415 params: &ProgramSubscriptionParams,
416 last_notified_slot: Slot,
417 bank: Arc<Bank>,
418) -> (impl Iterator<Item = RpcKeyedAccount>, Slot) {
419 let accounts_is_empty = accounts.is_empty();
420 let encoding = params.encoding;
421 let filters = params.filters.clone();
422 let keyed_accounts = accounts.into_iter().filter(move |(_, account)| {
423 filters
424 .iter()
425 .all(|filter_type| filter_allows(filter_type, account))
426 });
427 let accounts = if is_known_spl_token_id(¶ms.pubkey)
428 && params.encoding == UiAccountEncoding::JsonParsed
429 && !accounts_is_empty
430 {
431 let accounts = get_parsed_token_accounts(bank, keyed_accounts);
432 Either::Left(accounts)
433 } else {
434 let accounts = keyed_accounts.map(move |(pubkey, account)| RpcKeyedAccount {
435 pubkey: pubkey.to_string(),
436 account: encode_ui_account(&pubkey, &account, encoding, None, None),
437 });
438 Either::Right(accounts)
439 };
440 (accounts, last_notified_slot)
441}
442
443fn filter_logs_results(
444 logs: Option<Vec<TransactionLogInfo>>,
445 _params: &LogsSubscriptionParams,
446 last_notified_slot: Slot,
447 _bank: Arc<Bank>,
448) -> (impl Iterator<Item = RpcLogsResponse>, Slot) {
449 let responses = logs.into_iter().flatten().map(|log| RpcLogsResponse {
450 signature: log.signature.to_string(),
451 err: log.result.err().map(Into::into),
452 logs: log.log_messages,
453 });
454 (responses, last_notified_slot)
455}
456
457fn initial_last_notified_slot(
458 params: &SubscriptionParams,
459 bank_forks: &RwLock<BankForks>,
460 block_commitment_cache: &RwLock<BlockCommitmentCache>,
461 optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
462) -> Option<Slot> {
463 match params {
464 SubscriptionParams::Account(params) => {
465 let slot = if params.commitment.is_finalized() {
466 block_commitment_cache
467 .read()
468 .unwrap()
469 .highest_super_majority_root()
470 } else if params.commitment.is_confirmed() {
471 optimistically_confirmed_bank.read().unwrap().bank.slot()
472 } else {
473 block_commitment_cache.read().unwrap().slot()
474 };
475
476 let bank = bank_forks.read().unwrap().get(slot)?;
477 Some(bank.get_account_modified_slot(¶ms.pubkey)?.1)
478 }
479 _ => None,
480 }
481}
482
/// Counters for the notification loop, reset after each submission.
#[derive(Default)]
struct PubsubNotificationStats {
    /// When the stats were last submitted; `None` before the first submission.
    since: Option<Instant>,
    /// Number of notification entries processed since the last submission.
    notification_entry_processing_count: u64,
    /// Accumulated time, in microseconds, between each entry being queued and
    /// its processing finishing.
    notification_entry_processing_time_us: u64,
}
489
490impl PubsubNotificationStats {
491 fn maybe_submit(&mut self) {
492 const SUBMIT_CADENCE: Duration = RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS;
493 let elapsed = self.since.as_ref().map(Instant::elapsed);
494 if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
495 return;
496 }
497 datapoint_info!(
498 "pubsub_notification_entries",
499 (
500 "notification_entry_processing_count",
501 self.notification_entry_processing_count,
502 i64
503 ),
504 (
505 "notification_entry_processing_time_us",
506 self.notification_entry_processing_time_us,
507 i64
508 ),
509 );
510 *self = Self {
511 since: Some(Instant::now()),
512 ..Self::default()
513 };
514 }
515}
516
/// Entry point for queueing RPC pubsub notifications; owns the handle of the
/// notification thread when one was spawned.
pub struct RpcSubscriptions {
    /// Queue feeding the notification thread; `None` when no notification
    /// threads are configured.
    notification_sender: Option<Sender<TimestampedNotificationEntry>>,
    /// Join handle of the notification thread, taken on shutdown.
    t_cleanup: Option<JoinHandle<()>>,

    /// Exit flag polled by the notification loop and set on shutdown.
    exit: Arc<AtomicBool>,
    /// Handle returned by `control()`.
    control: SubscriptionControl,
}
524
525impl Drop for RpcSubscriptions {
526 fn drop(&mut self) {
527 self.shutdown().unwrap_or_else(|err| {
528 warn!("RPC Notification - shutdown error: {err:?}");
529 });
530 }
531}
532
533impl RpcSubscriptions {
534 pub fn new(
535 exit: Arc<AtomicBool>,
536 max_complete_transaction_status_slot: Arc<AtomicU64>,
537 blockstore: Arc<Blockstore>,
538 bank_forks: Arc<RwLock<BankForks>>,
539 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
540 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
541 ) -> Self {
542 Self::new_with_config(
543 exit,
544 max_complete_transaction_status_slot,
545 blockstore,
546 bank_forks,
547 block_commitment_cache,
548 optimistically_confirmed_bank,
549 &PubSubConfig::default(),
550 None,
551 )
552 }
553
554 pub fn new_for_tests(
555 exit: Arc<AtomicBool>,
556 max_complete_transaction_status_slot: Arc<AtomicU64>,
557 bank_forks: Arc<RwLock<BankForks>>,
558 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
559 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
560 ) -> Self {
561 let ledger_path = get_tmp_ledger_path!();
562 let blockstore = Blockstore::open(&ledger_path).unwrap();
563 let blockstore = Arc::new(blockstore);
564
565 Self::new_for_tests_with_blockstore(
566 exit,
567 max_complete_transaction_status_slot,
568 blockstore,
569 bank_forks,
570 block_commitment_cache,
571 optimistically_confirmed_bank,
572 )
573 }
574
575 pub fn new_for_tests_with_blockstore(
576 exit: Arc<AtomicBool>,
577 max_complete_transaction_status_slot: Arc<AtomicU64>,
578 blockstore: Arc<Blockstore>,
579 bank_forks: Arc<RwLock<BankForks>>,
580 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
581 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
582 ) -> Self {
583 let rpc_notifier_ready = Arc::new(AtomicBool::new(false));
584
585 let rpc_subscriptions = Self::new_with_config(
586 exit,
587 max_complete_transaction_status_slot,
588 blockstore,
589 bank_forks,
590 block_commitment_cache,
591 optimistically_confirmed_bank,
592 &PubSubConfig::default_for_tests(),
593 Some(rpc_notifier_ready.clone()),
594 );
595
596 let start_time = Instant::now();
598 loop {
599 if rpc_notifier_ready.load(Ordering::Relaxed) {
600 break;
601 } else if (Instant::now() - start_time).as_millis() > 5000 {
602 panic!("RPC notifier thread setup took too long");
603 }
604 }
605
606 rpc_subscriptions
607 }
608
609 pub fn new_with_config(
610 exit: Arc<AtomicBool>,
611 max_complete_transaction_status_slot: Arc<AtomicU64>,
612 blockstore: Arc<Blockstore>,
613 bank_forks: Arc<RwLock<BankForks>>,
614 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
615 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
616 config: &PubSubConfig,
617 rpc_notifier_ready: Option<Arc<AtomicBool>>,
618 ) -> Self {
619 let (notification_sender, notification_receiver) = crossbeam_channel::unbounded();
620
621 let subscriptions = SubscriptionsTracker::new(bank_forks.clone());
622
623 let (broadcast_sender, _) = broadcast::channel(config.queue_capacity_items);
624
625 let notifier = RpcNotifier {
626 sender: broadcast_sender.clone(),
627 recent_items: Mutex::new(RecentItems::new(
628 config.queue_capacity_items,
629 config.queue_capacity_bytes,
630 )),
631 };
632
633 let t_cleanup = config.notification_threads.map(|notification_threads| {
634 let exit = exit.clone();
635 Builder::new()
636 .name("solRpcNotifier".to_string())
637 .spawn(move || {
638 let pool = rayon::ThreadPoolBuilder::new()
639 .num_threads(notification_threads.get())
640 .thread_name(|i| format!("solRpcNotify{i:02}"))
641 .build()
642 .unwrap();
643 pool.install(|| {
644 if let Some(rpc_notifier_ready) = rpc_notifier_ready {
645 rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
646 }
647 Self::process_notifications(
648 exit,
649 max_complete_transaction_status_slot,
650 blockstore,
651 notifier,
652 notification_receiver,
653 subscriptions,
654 bank_forks,
655 block_commitment_cache,
656 optimistically_confirmed_bank,
657 )
658 });
659 })
660 .unwrap()
661 });
662
663 let control = SubscriptionControl::new(
664 config.max_active_subscriptions,
665 notification_sender.clone(),
666 broadcast_sender,
667 );
668
669 Self {
670 notification_sender: config.notification_threads.map(|_| notification_sender),
671 t_cleanup,
672 exit,
673 control,
674 }
675 }
676
677 pub fn default_with_bank_forks(
679 max_complete_transaction_status_slot: Arc<AtomicU64>,
680 bank_forks: Arc<RwLock<BankForks>>,
681 ) -> Self {
682 let ledger_path = get_tmp_ledger_path!();
683 let blockstore = Blockstore::open(&ledger_path).unwrap();
684 let blockstore = Arc::new(blockstore);
685 let optimistically_confirmed_bank =
686 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
687 Self::new(
688 Arc::new(AtomicBool::new(false)),
689 max_complete_transaction_status_slot,
690 blockstore,
691 bank_forks,
692 Arc::new(RwLock::new(BlockCommitmentCache::default())),
693 optimistically_confirmed_bank,
694 )
695 }
696
/// Returns the subscription control handle created alongside this instance.
pub fn control(&self) -> &SubscriptionControl {
    &self.control
}
700
701 pub fn notify_subscribers(&self, commitment_slots: CommitmentSlots) {
704 self.enqueue_notification(NotificationEntry::Bank(commitment_slots));
705 }
706
707 pub fn notify_gossip_subscribers(&self, slot: Slot) {
710 self.enqueue_notification(NotificationEntry::Gossip(slot));
711 }
712
713 pub fn notify_slot_update(&self, slot_update: SlotUpdate) {
714 self.enqueue_notification(NotificationEntry::SlotUpdate(slot_update));
715 }
716
717 pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
718 self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
719 self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::CreatedBank {
720 slot,
721 parent,
722 timestamp: timestamp(),
723 }));
724 }
725
726 pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec<Signature>)) {
727 self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures));
728 }
729
730 pub fn notify_vote(&self, vote_pubkey: Pubkey, vote: VoteTransaction, signature: Signature) {
731 self.enqueue_notification(NotificationEntry::Vote((vote_pubkey, vote, signature)));
732 }
733
734 pub fn notify_roots(&self, mut rooted_slots: Vec<Slot>) {
735 rooted_slots.sort_unstable();
736 rooted_slots.into_iter().for_each(|root| {
737 self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::Root {
738 slot: root,
739 timestamp: timestamp(),
740 }));
741 self.enqueue_notification(NotificationEntry::Root(root));
742 });
743 }
744
745 fn enqueue_notification(&self, notification_entry: NotificationEntry) {
746 if let Some(ref notification_sender) = self.notification_sender {
747 match notification_sender.send(notification_entry.into()) {
748 Ok(()) => (),
749 Err(SendError(notification)) => {
750 warn!("Dropped RPC Notification - receiver disconnected : {notification:?}");
751 }
752 }
753 }
754 }
755
756 #[allow(clippy::too_many_arguments)]
757 fn process_notifications(
758 exit: Arc<AtomicBool>,
759 max_complete_transaction_status_slot: Arc<AtomicU64>,
760 blockstore: Arc<Blockstore>,
761 notifier: RpcNotifier,
762 notification_receiver: Receiver<TimestampedNotificationEntry>,
763 mut subscriptions: SubscriptionsTracker,
764 bank_forks: Arc<RwLock<BankForks>>,
765 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
766 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
767 ) {
768 let mut stats = PubsubNotificationStats::default();
769
770 loop {
771 if exit.load(Ordering::Relaxed) {
772 break;
773 }
774 match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
775 Ok(notification_entry) => {
776 let TimestampedNotificationEntry { entry, queued_at } = notification_entry;
777 match entry {
778 NotificationEntry::Subscribed(params, id) => {
779 subscriptions.subscribe(params.clone(), id, || {
780 initial_last_notified_slot(
781 ¶ms,
782 &bank_forks,
783 &block_commitment_cache,
784 &optimistically_confirmed_bank,
785 )
786 .unwrap_or(0)
787 });
788 }
789 NotificationEntry::Unsubscribed(params, id) => {
790 subscriptions.unsubscribe(params, id);
791 }
792 NotificationEntry::Slot(slot_info) => {
793 if let Some(sub) = subscriptions
794 .node_progress_watchers()
795 .get(&SubscriptionParams::Slot)
796 {
797 debug!("slot notify: {slot_info:?}");
798 inc_new_counter_info!("rpc-subscription-notify-slot", 1);
799 notifier.notify(slot_info, sub, false);
800 }
801 }
802 NotificationEntry::SlotUpdate(slot_update) => {
803 if let Some(sub) = subscriptions
804 .node_progress_watchers()
805 .get(&SubscriptionParams::SlotsUpdates)
806 {
807 inc_new_counter_info!("rpc-subscription-notify-slots-updates", 1);
808 notifier.notify(slot_update, sub, false);
809 }
810 }
811 NotificationEntry::Vote((vote_pubkey, ref vote_info, signature)) => {
815 if let Some(sub) = subscriptions
816 .node_progress_watchers()
817 .get(&SubscriptionParams::Vote)
818 {
819 let rpc_vote = RpcVote {
820 vote_pubkey: vote_pubkey.to_string(),
821 slots: vote_info.slots(),
822 hash: bs58::encode(vote_info.hash()).into_string(),
823 timestamp: vote_info.timestamp(),
824 signature: signature.to_string(),
825 };
826 debug!("vote notify: {vote_info:?}");
827 inc_new_counter_info!("rpc-subscription-notify-vote", 1);
828 notifier.notify(&rpc_vote, sub, false);
829 }
830 }
831 NotificationEntry::Root(root) => {
832 if let Some(sub) = subscriptions
833 .node_progress_watchers()
834 .get(&SubscriptionParams::Root)
835 {
836 debug!("root notify: {root:?}");
837 inc_new_counter_info!("rpc-subscription-notify-root", 1);
838 notifier.notify(root, sub, false);
839 }
840 }
841 NotificationEntry::Bank(commitment_slots) => {
842 const SOURCE: &str = "bank";
843 RpcSubscriptions::notify_watchers(
844 max_complete_transaction_status_slot.clone(),
845 subscriptions.commitment_watchers(),
846 &bank_forks,
847 &blockstore,
848 &commitment_slots,
849 ¬ifier,
850 SOURCE,
851 );
852 }
853 NotificationEntry::Gossip(slot) => {
854 let commitment_slots = CommitmentSlots {
855 highest_confirmed_slot: slot,
856 ..CommitmentSlots::default()
857 };
858 const SOURCE: &str = "gossip";
859 RpcSubscriptions::notify_watchers(
860 max_complete_transaction_status_slot.clone(),
861 subscriptions.gossip_watchers(),
862 &bank_forks,
863 &blockstore,
864 &commitment_slots,
865 ¬ifier,
866 SOURCE,
867 );
868 }
869 NotificationEntry::SignaturesReceived((slot, slot_signatures)) => {
870 for slot_signature in &slot_signatures {
871 if let Some(subs) = subscriptions.by_signature().get(slot_signature)
872 {
873 for subscription in subs.values() {
874 if let SubscriptionParams::Signature(params) =
875 subscription.params()
876 {
877 if params.enable_received_notification {
878 notifier.notify(
879 RpcResponse::from(RpcNotificationResponse {
880 context: RpcNotificationContext { slot },
881 value: RpcSignatureResult::ReceivedSignature(
882 ReceivedSignatureResult::ReceivedSignature,
883 ),
884 }),
885 subscription,
886 false,
887 );
888 }
889 } else {
890 error!("invalid params type in visit_by_signature");
891 }
892 }
893 }
894 }
895 }
896 }
897 stats.notification_entry_processing_time_us +=
898 queued_at.elapsed().as_micros() as u64;
899 stats.notification_entry_processing_count += 1;
900 }
901 Err(RecvTimeoutError::Timeout) => {
902 }
904 Err(RecvTimeoutError::Disconnected) => {
905 warn!("RPC Notification thread - sender disconnected");
906 break;
907 }
908 }
909 stats.maybe_submit();
910 }
911 }
912
913 fn notify_watchers(
914 max_complete_transaction_status_slot: Arc<AtomicU64>,
915 subscriptions: &HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
916 bank_forks: &Arc<RwLock<BankForks>>,
917 blockstore: &Blockstore,
918 commitment_slots: &CommitmentSlots,
919 notifier: &RpcNotifier,
920 source: &'static str,
921 ) {
922 let mut total_time = Measure::start("notify_watchers");
923
924 let num_accounts_found = AtomicUsize::new(0);
925 let num_accounts_notified = AtomicUsize::new(0);
926
927 let num_blocks_found = AtomicUsize::new(0);
928 let num_blocks_notified = AtomicUsize::new(0);
929
930 let num_logs_found = AtomicUsize::new(0);
931 let num_logs_notified = AtomicUsize::new(0);
932
933 let num_programs_found = AtomicUsize::new(0);
934 let num_programs_notified = AtomicUsize::new(0);
935
936 let num_signatures_found = AtomicUsize::new(0);
937 let num_signatures_notified = AtomicUsize::new(0);
938
939 let subscriptions = subscriptions.into_par_iter();
940 subscriptions.for_each(|(_id, subscription)| {
941 let slot = if let Some(commitment) = subscription.commitment() {
942 if commitment.is_finalized() {
943 Some(commitment_slots.highest_super_majority_root)
944 } else if commitment.is_confirmed() {
945 Some(commitment_slots.highest_confirmed_slot)
946 } else {
947 Some(commitment_slots.slot)
948 }
949 } else {
950 error!("missing commitment in notify_watchers");
951 None
952 };
953 match subscription.params() {
954 SubscriptionParams::Account(params) => {
955 num_accounts_found.fetch_add(1, Ordering::Relaxed);
956 if let Some(slot) = slot {
957 let notified = check_commitment_and_notify(
958 params,
959 subscription,
960 bank_forks,
961 slot,
962 |bank, params| bank.get_account_modified_slot(¶ms.pubkey),
963 filter_account_result,
964 notifier,
965 false,
966 );
967
968 if notified {
969 num_accounts_notified.fetch_add(1, Ordering::Relaxed);
970 }
971 }
972 }
973 SubscriptionParams::Block(params) => {
974 num_blocks_found.fetch_add(1, Ordering::Relaxed);
975 if let Some(slot) = slot {
976 let bank = bank_forks.read().unwrap().get(slot);
977 if let Some(bank) = bank {
978 let mut w_last_unnotified_slot =
993 subscription.last_notified_slot.write().unwrap();
994 if *w_last_unnotified_slot == 0 {
996 *w_last_unnotified_slot = slot;
997 }
998 let mut slots_to_notify: Vec<_> =
999 (*w_last_unnotified_slot..slot).collect();
1000 let ancestors = bank.proper_ancestors_set();
1001 slots_to_notify.retain(|slot| ancestors.contains(slot));
1002 slots_to_notify.push(slot);
1003 for s in slots_to_notify {
1004 if s > max_complete_transaction_status_slot.load(Ordering::SeqCst) {
1009 break;
1010 }
1011
1012 let block_update_result = blockstore
1013 .get_complete_block(s, false)
1014 .map_err(|e| {
1015 error!("get_complete_block error: {e}");
1016 RpcBlockUpdateError::BlockStoreError
1017 })
1018 .and_then(|block| filter_block_result_txs(block, s, params));
1019
1020 match block_update_result {
1021 Ok(block_update) => {
1022 if let Some(block_update) = block_update {
1023 notifier.notify(
1024 RpcResponse::from(RpcNotificationResponse {
1025 context: RpcNotificationContext { slot: s },
1026 value: block_update,
1027 }),
1028 subscription,
1029 false,
1030 );
1031 num_blocks_notified.fetch_add(1, Ordering::Relaxed);
1032 *w_last_unnotified_slot = s + 1;
1035 }
1036 }
1037 Err(err) => {
1038 notifier.notify(
1041 RpcResponse::from(RpcNotificationResponse {
1042 context: RpcNotificationContext { slot: s },
1043 value: RpcBlockUpdate {
1044 slot,
1045 block: None,
1046 err: Some(err),
1047 },
1048 }),
1049 subscription,
1050 false,
1051 );
1052 }
1053 }
1054 }
1055 }
1056 }
1057 }
1058 SubscriptionParams::Logs(params) => {
1059 num_logs_found.fetch_add(1, Ordering::Relaxed);
1060 if let Some(slot) = slot {
1061 let notified = check_commitment_and_notify(
1062 params,
1063 subscription,
1064 bank_forks,
1065 slot,
1066 get_transaction_logs,
1067 filter_logs_results,
1068 notifier,
1069 false,
1070 );
1071
1072 if notified {
1073 num_logs_notified.fetch_add(1, Ordering::Relaxed);
1074 }
1075 }
1076 }
1077 SubscriptionParams::Program(params) => {
1078 num_programs_found.fetch_add(1, Ordering::Relaxed);
1079 if let Some(slot) = slot {
1080 let notified = check_commitment_and_notify(
1081 params,
1082 subscription,
1083 bank_forks,
1084 slot,
1085 |bank, params| {
1086 bank.get_program_accounts_modified_since_parent(¶ms.pubkey)
1087 },
1088 filter_program_results,
1089 notifier,
1090 false,
1091 );
1092
1093 if notified {
1094 num_programs_notified.fetch_add(1, Ordering::Relaxed);
1095 }
1096 }
1097 }
1098 SubscriptionParams::Signature(params) => {
1099 num_signatures_found.fetch_add(1, Ordering::Relaxed);
1100 if let Some(slot) = slot {
1101 let notified = check_commitment_and_notify(
1102 params,
1103 subscription,
1104 bank_forks,
1105 slot,
1106 |bank, params| {
1107 bank.get_signature_status_processed_since_parent(¶ms.signature)
1108 },
1109 filter_signature_result,
1110 notifier,
1111 true, );
1113
1114 if notified {
1115 num_signatures_notified.fetch_add(1, Ordering::Relaxed);
1116 }
1117 }
1118 }
1119 _ => error!("wrong subscription type in alps map"),
1120 }
1121 });
1122
1123 total_time.stop();
1124
1125 let total_notified = num_accounts_notified.load(Ordering::Relaxed)
1126 + num_logs_notified.load(Ordering::Relaxed)
1127 + num_programs_notified.load(Ordering::Relaxed)
1128 + num_signatures_notified.load(Ordering::Relaxed);
1129 let total_ms = total_time.as_ms();
1130 if total_notified > 0 || total_ms > 10 {
1131 debug!(
1132 "notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / \
1133 {}",
1134 source,
1135 num_accounts_found.load(Ordering::Relaxed),
1136 num_accounts_notified.load(Ordering::Relaxed),
1137 num_logs_found.load(Ordering::Relaxed),
1138 num_logs_notified.load(Ordering::Relaxed),
1139 num_programs_found.load(Ordering::Relaxed),
1140 num_programs_notified.load(Ordering::Relaxed),
1141 num_signatures_found.load(Ordering::Relaxed),
1142 num_signatures_notified.load(Ordering::Relaxed),
1143 );
1144 datapoint_info!(
1145 "rpc_subscriptions",
1146 ("source", source, String),
1147 (
1148 "num_account_subscriptions",
1149 num_accounts_found.load(Ordering::Relaxed),
1150 i64
1151 ),
1152 (
1153 "num_account_pubkeys_notified",
1154 num_accounts_notified.load(Ordering::Relaxed),
1155 i64
1156 ),
1157 (
1158 "num_logs_subscriptions",
1159 num_logs_found.load(Ordering::Relaxed),
1160 i64
1161 ),
1162 (
1163 "num_logs_notified",
1164 num_logs_notified.load(Ordering::Relaxed),
1165 i64
1166 ),
1167 (
1168 "num_program_subscriptions",
1169 num_programs_found.load(Ordering::Relaxed),
1170 i64
1171 ),
1172 (
1173 "num_programs_notified",
1174 num_programs_notified.load(Ordering::Relaxed),
1175 i64
1176 ),
1177 (
1178 "num_signature_subscriptions",
1179 num_signatures_found.load(Ordering::Relaxed),
1180 i64
1181 ),
1182 (
1183 "num_signatures_notified",
1184 num_signatures_notified.load(Ordering::Relaxed),
1185 i64
1186 ),
1187 ("notifications_time", total_time.as_us() as i64, i64),
1188 );
1189 }
1190 }
1191
1192 fn shutdown(&mut self) -> std::thread::Result<()> {
1193 if self.t_cleanup.is_some() {
1194 info!("RPC Notification thread - shutting down");
1195 self.exit.store(true, Ordering::Relaxed);
1196 let x = self.t_cleanup.take().unwrap().join();
1197 info!("RPC Notification thread - shut down.");
1198 x
1199 } else {
1200 warn!("RPC Notification thread - already shut down.");
1201 Ok(())
1202 }
1203 }
1204
/// Test-only accessor that forwards to `self.control.total()`.
// NOTE(review): presumably this is the number of currently tracked subscriptions — confirm
// against the `total` method on the type of `control`.
#[cfg(test)]
fn total(&self) -> usize {
    self.control.total()
}
1209}
1210
1211#[cfg(test)]
1212pub(crate) mod tests {
1213 use {
1214 super::*,
1215 crate::{
1216 optimistically_confirmed_bank_tracker::{
1217 BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
1218 },
1219 rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
1220 rpc_pubsub::RpcSolPubSubInternal,
1221 rpc_pubsub_service,
1222 },
1223 serial_test::serial,
1224 solana_commitment_config::CommitmentConfig,
1225 solana_keypair::Keypair,
1226 solana_ledger::get_tmp_ledger_path_auto_delete,
1227 solana_message::Message,
1228 solana_rpc_client_api::config::{
1229 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
1230 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
1231 RpcTransactionLogsFilter,
1232 },
1233 solana_runtime::{
1234 commitment::BlockCommitment,
1235 genesis_utils::{create_genesis_config, GenesisConfigInfo},
1236 prioritization_fee_cache::PrioritizationFeeCache,
1237 },
1238 solana_signer::Signer,
1239 solana_stake_interface as stake,
1240 solana_system_interface::{instruction as system_instruction, program as system_program},
1241 solana_system_transaction as system_transaction,
1242 solana_transaction::Transaction,
1243 solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
1244 std::{collections::HashSet, sync::atomic::AtomicU64},
1245 };
1246
/// The values that differ between the expected `accountNotification` payloads produced by
/// `make_account_result`.
struct AccountResult {
    /// Emitted as the `subscription` field of the notification params.
    subscription: u64,
    /// Emitted as the account's `lamports` field.
    lamports: u64,
    /// Emitted as the account's `space` field.
    space: usize,
    /// Emitted verbatim as the account's `data` field.
    data: &'static str,
}
1253
1254 fn make_account_result(
1255 non_default_account: bool,
1256 account_result: AccountResult,
1257 ) -> serde_json::Value {
1258 json!({
1259 "jsonrpc": "2.0",
1260 "method": "accountNotification",
1261 "params": {
1262 "result": {
1263 "context": { "slot": 1 },
1264 "value": {
1265 "data": account_result.data,
1266 "executable": false,
1267 "lamports": account_result.lamports,
1268 "owner": "11111111111111111111111111111111",
1269 "rentEpoch": if non_default_account {u64::MAX} else {0},
1270 "space": account_result.space,
1271 },
1272 },
1273 "subscription": account_result.subscription,
1274 }
1275 })
1276 }
1277
/// Subscribes to Alice's account at `processed` commitment once per case, processes one
/// transaction in the slot-1 bank, and checks the resulting `accountNotification` as well as the
/// subscribe/unsubscribe bookkeeping.
#[test]
#[serial]
fn test_check_account_subscribe() {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let blockhash = genesis_bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(genesis_bank);

    // Register a child bank at slot 1; every case below processes its transaction there.
    let root_bank = bank_forks.read().unwrap().get(0).unwrap();
    let slot1_bank = Bank::new_from_parent(root_bank, &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(slot1_bank);
    let alice = Keypair::new();

    let exit = Arc::new(AtomicBool::new(false));
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit,
        max_complete_transaction_status_slot,
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
            1, 1,
        ))),
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
    ));

    // Case 0: create Alice's account with 1 lamport and no data.
    let create_empty_tx = system_transaction::create_account(
        &mint_keypair,
        &alice,
        blockhash,
        1,
        0,
        &system_program::id(),
    );
    let create_empty_expected = make_account_result(
        true,
        AccountResult {
            lamports: 1,
            subscription: 0,
            space: 0,
            data: "",
        },
    );

    // Case 1: Alice sends her single lamport back to the mint, which pays the fee.
    let drain_tx = {
        let transfer = system_instruction::transfer(&alice.pubkey(), &mint_keypair.pubkey(), 1);
        let message = Message::new(&[transfer], Some(&mint_keypair.pubkey()));
        Transaction::new(&[&alice, &mint_keypair], message, blockhash)
    };
    let drain_expected = make_account_result(
        false,
        AccountResult {
            lamports: 0,
            subscription: 2,
            space: 0,
            data: "",
        },
    );

    // Case 2: create the account again, this time with 1024 bytes of data; the expected `data`
    // field is the encoder's error string rather than the account bytes.
    let create_large_tx = system_transaction::create_account(
        &mint_keypair,
        &alice,
        blockhash,
        1,
        1024,
        &system_program::id(),
    );
    let create_large_expected = make_account_result(
        true,
        AccountResult {
            lamports: 1,
            subscription: 4,
            space: 1024,
            data: "error: data too large for bs58 encoding",
        },
    );

    let pubkey = alice.pubkey();
    let cases = [
        (create_empty_tx, create_empty_expected),
        (drain_tx, drain_expected),
        (create_large_tx, create_large_expected),
    ];

    for (tx, expected) in cases {
        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);

        let account_config = RpcAccountInfoConfig {
            commitment: Some(CommitmentConfig::processed()),
            encoding: None,
            data_slice: None,
            min_context_slot: None,
        };
        let sub_id = rpc
            .account_subscribe(pubkey.to_string(), Some(account_config))
            .unwrap();

        // The parameters the tracker is expected to hold for this subscription.
        let tracked_params = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey,
            commitment: CommitmentConfig::processed(),
            data_slice: None,
            encoding: UiAccountEncoding::Binary,
        });
        subscriptions.control.assert_subscribed(&tracked_params);

        rpc.block_until_processed(&subscriptions);

        bank_forks
            .read()
            .unwrap()
            .get(1)
            .unwrap()
            .process_transaction(&tx)
            .unwrap();
        subscriptions.notify_subscribers(CommitmentSlots {
            slot: 1,
            ..CommitmentSlots::default()
        });

        let response = receiver.recv();
        let actual = serde_json::from_str::<serde_json::Value>(&response).unwrap();
        assert_eq!(expected, actual);

        rpc.account_unsubscribe(sub_id).unwrap();
        subscriptions.control.assert_unsubscribed(&tracked_params);
    }
}
1420
/// Subscribes to all blocks at `confirmed` commitment and checks that
/// `notify_gossip_subscribers` delivers a `blockNotification` matching the block stored in the
/// blockstore, while a subsequent `notify_subscribers` call delivers nothing.
#[test]
#[serial]
fn test_check_confirmed_block_subscribe() {
    let exit = Arc::new(AtomicBool::new(false));
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
    let bank_forks = BankForks::new_rw_arc(bank);
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();
    let blockstore = Arc::new(blockstore);
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
        exit,
        max_complete_transaction_status_slot,
        blockstore.clone(),
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
        optimistically_confirmed_bank,
    ));
    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
    let filter = RpcBlockSubscribeFilter::All;
    let config = RpcBlockSubscribeConfig {
        commitment: Some(CommitmentConfig::confirmed()),
        encoding: Some(UiTransactionEncoding::Json),
        transaction_details: Some(TransactionDetails::Signatures),
        show_rewards: None,
        max_supported_transaction_version: None,
    };
    // The tracker-side parameters that correspond to `filter` + `config`.
    let params = BlockSubscriptionParams {
        kind: BlockSubscriptionKind::All,
        commitment: config.commitment.unwrap(),
        encoding: config.encoding.unwrap(),
        transaction_details: config.transaction_details.unwrap(),
        show_rewards: config.show_rewards.unwrap_or_default(),
        max_supported_transaction_version: config.max_supported_transaction_version,
    };
    let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();

    subscriptions
        .control
        .assert_subscribed(&SubscriptionParams::Block(params.clone()));

    // Fund keypair2 and write test transaction entries for the working bank into the
    // blockstore so there is a complete block to report.
    let bank = bank_forks.read().unwrap().working_bank();
    let keypair1 = Keypair::new();
    let keypair2 = Keypair::new();
    let keypair3 = Keypair::new();
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
    bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
        .unwrap();
    populate_blockstore_for_tests(
        create_test_transaction_entries(
            vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
            bank.clone(),
        )
        .0,
        bank,
        blockstore.clone(),
        max_complete_transaction_status_slot,
    );

    let slot = 0;
    subscriptions.notify_gossip_subscribers(slot);
    let actual_resp = receiver.recv();
    let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();

    // Rebuild the expected payload straight from the blockstore using the subscription's
    // encoding and transaction-detail settings.
    let confirmed_block =
        ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
    let block = confirmed_block
        .encode_with_options(
            params.encoding,
            BlockEncodingOptions {
                transaction_details: params.transaction_details,
                show_rewards: false,
                max_supported_transaction_version: None,
            },
        )
        .unwrap();
    let expected_resp = RpcBlockUpdate {
        slot,
        block: Some(block),
        err: None,
    };
    let expected_resp = json!({
       "jsonrpc": "2.0",
       "method": "blockNotification",
       "params": {
           "result": {
               "context": { "slot": slot },
               "value": expected_resp,
           },
           "subscription": 0,
       }
    });
    assert_eq!(expected_resp, actual_resp);

    // The subscription uses `confirmed` commitment, so the test expects this notification
    // path to deliver nothing within the timeout.
    subscriptions.notify_subscribers(CommitmentSlots {
        slot,
        root: slot,
        highest_confirmed_slot: slot,
        highest_super_majority_root: slot,
    });
    let should_err = receiver.recv_timeout(Duration::from_millis(300));
    assert!(should_err.is_err());

    // Tear down through the block-specific unsubscribe call that pairs with `block_subscribe`.
    rpc.block_unsubscribe(sub_id).unwrap();
    subscriptions
        .control
        .assert_unsubscribed(&SubscriptionParams::Block(params));
}
1538
/// Subscribes to blocks mentioning `keypair1` at `confirmed` commitment and checks that the
/// delivered `blockNotification` contains only the transactions whose account keys include
/// that pubkey.
#[test]
#[serial]
fn test_check_confirmed_block_subscribe_with_mentions() {
    let exit = Arc::new(AtomicBool::new(false));
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
    let bank_forks = BankForks::new_rw_arc(bank);
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();
    let blockstore = Arc::new(blockstore);
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
        exit,
        max_complete_transaction_status_slot,
        blockstore.clone(),
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
        optimistically_confirmed_bank,
    ));
    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
    let keypair1 = Keypair::new();
    let filter =
        RpcBlockSubscribeFilter::MentionsAccountOrProgram(keypair1.pubkey().to_string());
    let config = RpcBlockSubscribeConfig {
        commitment: Some(CommitmentConfig::confirmed()),
        encoding: Some(UiTransactionEncoding::Json),
        transaction_details: Some(TransactionDetails::Signatures),
        show_rewards: None,
        max_supported_transaction_version: None,
    };
    // The tracker-side parameters that correspond to `filter` + `config`.
    let params = BlockSubscriptionParams {
        kind: BlockSubscriptionKind::MentionsAccountOrProgram(keypair1.pubkey()),
        commitment: config.commitment.unwrap(),
        encoding: config.encoding.unwrap(),
        transaction_details: config.transaction_details.unwrap(),
        show_rewards: config.show_rewards.unwrap_or_default(),
        max_supported_transaction_version: config.max_supported_transaction_version,
    };
    let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();

    subscriptions
        .control
        .assert_subscribed(&SubscriptionParams::Block(params.clone()));

    // Fund keypair2 and write test transaction entries for the working bank into the
    // blockstore so there is a complete block to report.
    let bank = bank_forks.read().unwrap().working_bank();
    let keypair2 = Keypair::new();
    let keypair3 = Keypair::new();
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
    bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
        .unwrap();
    populate_blockstore_for_tests(
        create_test_transaction_entries(
            vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
            bank.clone(),
        )
        .0,
        bank,
        blockstore.clone(),
        max_complete_transaction_status_slot,
    );

    let slot = 0;
    subscriptions.notify_gossip_subscribers(slot);
    let actual_resp = receiver.recv();
    let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();

    // Build the expected payload from the stored block, keeping only the transactions that
    // reference keypair1.
    let mut confirmed_block =
        ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
    confirmed_block.transactions.retain(|tx_with_meta| {
        tx_with_meta
            .account_keys()
            .iter()
            .any(|key| key == &keypair1.pubkey())
    });
    let block = confirmed_block
        .encode_with_options(
            params.encoding,
            BlockEncodingOptions {
                transaction_details: params.transaction_details,
                show_rewards: false,
                max_supported_transaction_version: None,
            },
        )
        .unwrap();
    let expected_resp = RpcBlockUpdate {
        slot,
        block: Some(block),
        err: None,
    };
    let expected_resp = json!({
       "jsonrpc": "2.0",
       "method": "blockNotification",
       "params": {
           "result": {
               "context": { "slot": slot },
               "value": expected_resp,
           },
           "subscription": 0,
       }
    });
    assert_eq!(expected_resp, actual_resp);

    // Tear down through the block-specific unsubscribe call that pairs with `block_subscribe`.
    rpc.block_unsubscribe(sub_id).unwrap();
    subscriptions
        .control
        .assert_unsubscribed(&SubscriptionParams::Block(params));
}
1654
/// Subscribes to all blocks at `finalized` commitment and checks that `notify_subscribers`
/// delivers a `blockNotification` matching the block stored in the blockstore, while a
/// subsequent `notify_gossip_subscribers` call delivers nothing.
#[test]
#[serial]
fn test_check_finalized_block_subscribe() {
    let exit = Arc::new(AtomicBool::new(false));
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
    let bank_forks = BankForks::new_rw_arc(bank);
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();
    let blockstore = Arc::new(blockstore);
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
        exit,
        max_complete_transaction_status_slot,
        blockstore.clone(),
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
        optimistically_confirmed_bank,
    ));
    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
    let filter = RpcBlockSubscribeFilter::All;
    let config = RpcBlockSubscribeConfig {
        commitment: Some(CommitmentConfig::finalized()),
        encoding: Some(UiTransactionEncoding::Json),
        transaction_details: Some(TransactionDetails::Signatures),
        show_rewards: None,
        max_supported_transaction_version: None,
    };
    // The tracker-side parameters that correspond to `filter` + `config`.
    let params = BlockSubscriptionParams {
        kind: BlockSubscriptionKind::All,
        commitment: config.commitment.unwrap(),
        encoding: config.encoding.unwrap(),
        transaction_details: config.transaction_details.unwrap(),
        show_rewards: config.show_rewards.unwrap_or_default(),
        max_supported_transaction_version: config.max_supported_transaction_version,
    };
    let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
    subscriptions
        .control
        .assert_subscribed(&SubscriptionParams::Block(params.clone()));

    // Fund keypair2 and write test transaction entries for the working bank into the
    // blockstore so there is a complete block to report.
    let bank = bank_forks.read().unwrap().working_bank();
    let keypair1 = Keypair::new();
    let keypair2 = Keypair::new();
    let keypair3 = Keypair::new();
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
    bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
        .unwrap();
    populate_blockstore_for_tests(
        create_test_transaction_entries(
            vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
            bank.clone(),
        )
        .0,
        bank,
        blockstore.clone(),
        max_complete_transaction_status_slot,
    );

    let slot = 0;
    subscriptions.notify_subscribers(CommitmentSlots {
        slot,
        root: slot,
        highest_confirmed_slot: slot,
        highest_super_majority_root: slot,
    });
    let actual_resp = receiver.recv();
    let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();

    // Rebuild the expected payload straight from the blockstore using the subscription's
    // encoding and transaction-detail settings.
    let confirmed_block =
        ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
    let block = confirmed_block
        .encode_with_options(
            params.encoding,
            BlockEncodingOptions {
                transaction_details: params.transaction_details,
                show_rewards: false,
                max_supported_transaction_version: None,
            },
        )
        .unwrap();
    let expected_resp = RpcBlockUpdate {
        slot,
        block: Some(block),
        err: None,
    };
    let expected_resp = json!({
       "jsonrpc": "2.0",
       "method": "blockNotification",
       "params": {
           "result": {
               "context": { "slot": slot },
               "value": expected_resp,
           },
           "subscription": 0,
       }
    });
    assert_eq!(expected_resp, actual_resp);

    // The subscription uses `finalized` commitment, so the test expects the gossip
    // notification path to deliver nothing within the timeout.
    subscriptions.notify_gossip_subscribers(slot);
    let should_err = receiver.recv_timeout(Duration::from_millis(300));
    assert!(should_err.is_err());

    // Tear down through the block-specific unsubscribe call that pairs with `block_subscribe`.
    rpc.block_unsubscribe(sub_id).unwrap();
    subscriptions
        .control
        .assert_unsubscribed(&SubscriptionParams::Block(params));
}
1771
/// Creates a stake-program-owned account in bank 0, subscribes to the stake program at
/// `processed` commitment, and checks the resulting `programNotification` as well as the
/// subscribe/unsubscribe bookkeeping.
#[test]
#[serial]
fn test_check_program_subscribe() {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let blockhash = genesis_bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(genesis_bank);

    // Alice receives a 16-byte account with 1 lamport, owned by the stake program.
    let alice = Keypair::new();
    let create_alice_tx = system_transaction::create_account(
        &mint_keypair,
        &alice,
        blockhash,
        1,
        16,
        &stake::program::id(),
    );
    bank_forks
        .read()
        .unwrap()
        .get(0)
        .unwrap()
        .process_transaction(&create_alice_tx)
        .unwrap();

    let exit = Arc::new(AtomicBool::new(false));
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit,
        max_complete_transaction_status_slot,
        bank_forks,
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
        optimistically_confirmed_bank,
    ));
    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);

    let program_config = RpcProgramAccountsConfig {
        account_config: RpcAccountInfoConfig {
            commitment: Some(CommitmentConfig::processed()),
            ..RpcAccountInfoConfig::default()
        },
        ..RpcProgramAccountsConfig::default()
    };
    let sub_id = rpc
        .program_subscribe(stake::program::id().to_string(), Some(program_config))
        .unwrap();

    // The parameters the tracker is expected to hold for this subscription.
    let tracked_params = SubscriptionParams::Program(ProgramSubscriptionParams {
        pubkey: stake::program::id(),
        filters: Vec::new(),
        commitment: CommitmentConfig::processed(),
        data_slice: None,
        encoding: UiAccountEncoding::Binary,
        with_context: false,
    });
    subscriptions.control.assert_subscribed(&tracked_params);

    subscriptions.notify_subscribers(CommitmentSlots::default());
    let response = receiver.recv();
    let actual = serde_json::from_str::<serde_json::Value>(&response).unwrap();

    let expected = json!({
       "jsonrpc": "2.0",
       "method": "programNotification",
       "params": {
           "result": {
               "context": { "slot": 0 },
               "value": {
                   "account": {
                      "data": "1111111111111111",
                      "executable": false,
                      "lamports": 1,
                      "owner": "Stake11111111111111111111111111111111111111",
                      "rentEpoch": u64::MAX,
                      "space": 16,
                   },
                   "pubkey": alice.pubkey().to_string(),
                },
           },
           "subscription": 0,
       }
    });
    assert_eq!(expected, actual);

    rpc.program_unsubscribe(sub_id).unwrap();
    subscriptions.control.assert_unsubscribed(&tracked_params);
}
1876
/// Builds banks for slots 1-3 with one stake-owned account created in each, reports only slot 3
/// as optimistically confirmed, and checks that a `confirmed` program subscriber first receives
/// the slot 1 and slot 2 notifications, then the slot 3 notification once bank 3 is frozen.
#[test]
#[serial]
fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot() {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let blockhash = genesis_bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(genesis_bank);

    // Every transaction in this test creates a 16-byte account owned by the stake program and
    // funded by the mint; only the recipient and the lamport amount vary.
    let create_stake_owned_account = |recipient: &Keypair, lamports: u64| {
        system_transaction::create_account(
            &mint_keypair,
            recipient,
            blockhash,
            lamports,
            16,
            &stake::program::id(),
        )
    };

    // Slot 1: Alice, 1 lamport.
    let bank0 = bank_forks.read().unwrap().get(0).unwrap();
    let new_bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(new_bank1);
    let bank1 = bank_forks.read().unwrap().get(1).unwrap();
    let alice = Keypair::new();
    let alice_tx = create_stake_owned_account(&alice, 1);
    bank1.process_transaction(&alice_tx).unwrap();

    // Slot 2: Bob, 2 lamports.
    let new_bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
    bank_forks.write().unwrap().insert(new_bank2);
    let bob = Keypair::new();
    let bob_tx = create_stake_owned_account(&bob, 2);
    let bank2 = bank_forks.read().unwrap().get(2).unwrap();
    bank2.process_transaction(&bob_tx).unwrap();

    // Slot 3: Joe, 3 lamports. This bank is not frozen until later in the test.
    let new_bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
    bank_forks.write().unwrap().insert(new_bank3);
    let joe = Keypair::new();
    let joe_tx = create_stake_owned_account(&joe, 3);
    let bank3 = bank_forks.read().unwrap().get(3).unwrap();
    bank3.process_transaction(&joe_tx).unwrap();

    let exit = Arc::new(AtomicBool::new(false));
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let mut pending_optimistically_confirmed_banks = HashSet::new();
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit,
        max_complete_transaction_status_slot,
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
            1, 1,
        ))),
        optimistically_confirmed_bank.clone(),
    ));

    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);

    let program_config = RpcProgramAccountsConfig {
        account_config: RpcAccountInfoConfig {
            commitment: Some(CommitmentConfig::confirmed()),
            ..RpcAccountInfoConfig::default()
        },
        ..RpcProgramAccountsConfig::default()
    };
    let sub_id = rpc
        .program_subscribe(stake::program::id().to_string(), Some(program_config))
        .unwrap();

    let tracked_params = SubscriptionParams::Program(ProgramSubscriptionParams {
        pubkey: stake::program::id(),
        filters: Vec::new(),
        encoding: UiAccountEncoding::Binary,
        data_slice: None,
        commitment: CommitmentConfig::confirmed(),
        with_context: false,
    });
    subscriptions.control.assert_subscribed(&tracked_params);

    let mut highest_confirmed_slot: Slot = 0;
    let mut highest_root_slot: Slot = 0;
    let mut last_notified_confirmed_slot: Slot = 0;

    // Report slot 3 as optimistically confirmed without ever having reported slots 1 and 2.
    OptimisticallyConfirmedBankTracker::process_notification(
        (BankNotification::OptimisticallyConfirmed(3), None),
        &bank_forks,
        &optimistically_confirmed_bank,
        &subscriptions,
        &mut pending_optimistically_confirmed_banks,
        &mut last_notified_confirmed_slot,
        &mut highest_confirmed_slot,
        &mut highest_root_slot,
        &None,
        &PrioritizationFeeCache::default(),
        &None,
    );

    // Expected `programNotification` for one of the accounts created above.
    let build_expected_resp =
        |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
            json!({
               "jsonrpc": "2.0",
               "method": "programNotification",
               "params": {
                   "result": {
                       "context": { "slot": slot },
                       "value": {
                           "account": {
                              "data": "1111111111111111",
                              "executable": false,
                              "lamports": lamports,
                              "owner": "Stake11111111111111111111111111111111111111",
                              "rentEpoch": u64::MAX,
                              "space": 16,
                           },
                           "pubkey": pubkey,
                        },
                   },
                   "subscription": subscription,
               }
            })
        };

    // The first two messages are expected to be slot 1 (Alice) and slot 2 (Bob), in that order.
    let alice_response = receiver.recv();
    let alice_expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
    assert_eq!(
        alice_expected,
        serde_json::from_str::<serde_json::Value>(&alice_response).unwrap(),
    );

    let bob_response = receiver.recv();
    let bob_expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
    assert_eq!(
        bob_expected,
        serde_json::from_str::<serde_json::Value>(&bob_response).unwrap(),
    );

    // Freeze bank 3 and report it; the slot 3 (Joe) message is expected only after this.
    bank3.freeze();
    OptimisticallyConfirmedBankTracker::process_notification(
        (BankNotification::Frozen(bank3), None),
        &bank_forks,
        &optimistically_confirmed_bank,
        &subscriptions,
        &mut pending_optimistically_confirmed_banks,
        &mut last_notified_confirmed_slot,
        &mut highest_confirmed_slot,
        &mut highest_root_slot,
        &None,
        &PrioritizationFeeCache::default(),
        &None,
    );

    let joe_response = receiver.recv();
    let joe_expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
    assert_eq!(
        joe_expected,
        serde_json::from_str::<serde_json::Value>(&joe_response).unwrap(),
    );
    rpc.program_unsubscribe(sub_id).unwrap();
}
2074
/// Builds banks for slots 1 and 2 only, reports slot 3 (which has no bank) as optimistically
/// confirmed, and then tries to receive a notification.
///
/// The test passes only if something panics.
// NOTE(review): presumably the final `receiver.recv()` is what panics because no notification
// arrives — confirm; `#[should_panic]` has no `expected` message, so any panic satisfies it.
#[test]
#[serial]
#[should_panic]
fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks_no_notifications(
) {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let blockhash = genesis_bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(genesis_bank);

    // Both transactions create a 16-byte account owned by the stake program and funded by the
    // mint; only the recipient and the lamport amount vary.
    let create_stake_owned_account = |recipient: &Keypair, lamports: u64| {
        system_transaction::create_account(
            &mint_keypair,
            recipient,
            blockhash,
            lamports,
            16,
            &stake::program::id(),
        )
    };

    // Slot 1: Alice, 1 lamport.
    let bank0 = bank_forks.read().unwrap().get(0).unwrap();
    let new_bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(new_bank1);
    let bank1 = bank_forks.read().unwrap().get(1).unwrap();
    let alice = Keypair::new();
    let alice_tx = create_stake_owned_account(&alice, 1);
    bank1.process_transaction(&alice_tx).unwrap();

    // Slot 2: Bob, 2 lamports.
    let new_bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
    bank_forks.write().unwrap().insert(new_bank2);
    let bob = Keypair::new();
    let bob_tx = create_stake_owned_account(&bob, 2);
    let bank2 = bank_forks.read().unwrap().get(2).unwrap();
    bank2.process_transaction(&bob_tx).unwrap();

    let exit = Arc::new(AtomicBool::new(false));
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let mut pending_optimistically_confirmed_banks = HashSet::new();
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit,
        max_complete_transaction_status_slot,
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
            1, 1,
        ))),
        optimistically_confirmed_bank.clone(),
    ));
    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);

    let program_config = RpcProgramAccountsConfig {
        account_config: RpcAccountInfoConfig {
            commitment: Some(CommitmentConfig::confirmed()),
            ..RpcAccountInfoConfig::default()
        },
        ..RpcProgramAccountsConfig::default()
    };
    rpc.program_subscribe(stake::program::id().to_string(), Some(program_config))
        .unwrap();

    let tracked_params = SubscriptionParams::Program(ProgramSubscriptionParams {
        pubkey: stake::program::id(),
        filters: Vec::new(),
        encoding: UiAccountEncoding::Binary,
        data_slice: None,
        commitment: CommitmentConfig::confirmed(),
        with_context: false,
    });
    subscriptions.control.assert_subscribed(&tracked_params);

    let mut highest_confirmed_slot: Slot = 0;
    let mut highest_root_slot: Slot = 0;
    let mut last_notified_confirmed_slot: Slot = 0;

    // Slot 3 was never added to `bank_forks` in this test.
    OptimisticallyConfirmedBankTracker::process_notification(
        (BankNotification::OptimisticallyConfirmed(3), None),
        &bank_forks,
        &optimistically_confirmed_bank,
        &subscriptions,
        &mut pending_optimistically_confirmed_banks,
        &mut last_notified_confirmed_slot,
        &mut highest_confirmed_slot,
        &mut highest_root_slot,
        &None,
        &PrioritizationFeeCache::default(),
        &None,
    );

    let _response = receiver.recv();
}
2192
/// Program subscription at `confirmed` commitment where slot 3 is reported as
/// optimistically confirmed *before* a bank for slot 3 has been inserted into
/// `bank_forks`. Once bank 3 is created, frozen and reported, the subscriber is
/// expected to receive one notification per stake-owned account created in
/// slots 1, 2 and 3, in that order.
#[test]
#[serial]
fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks() {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let blockhash = genesis_bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(genesis_bank);

    // Builds a transaction that creates a 16-byte account owned by the stake
    // program, funded from the mint with `lamports`.
    let create_stake_account = |new_account: &Keypair, lamports: u64| {
        system_transaction::create_account(
            &mint_keypair,
            new_account,
            blockhash,
            lamports,
            16,
            &stake::program::id(),
        )
    };

    // Slot 1: alice's account.
    let parent = bank_forks.read().unwrap().get(0).unwrap();
    let child = Bank::new_from_parent(parent, &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(child);
    let bank1 = bank_forks.read().unwrap().get(1).unwrap();
    let alice = Keypair::new();
    bank1
        .process_transaction(&create_stake_account(&alice, 1))
        .unwrap();

    // Slot 2: bob's account.
    let child = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
    bank_forks.write().unwrap().insert(child);
    let bank2 = bank_forks.read().unwrap().get(2).unwrap();
    let bob = Keypair::new();
    bank2
        .process_transaction(&create_stake_account(&bob, 2))
        .unwrap();

    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        Arc::new(AtomicBool::new(false)),
        Arc::new(AtomicU64::default()),
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
            1, 1,
        ))),
        optimistically_confirmed_bank.clone(),
    ));

    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
    let program_config = RpcProgramAccountsConfig {
        account_config: RpcAccountInfoConfig {
            commitment: Some(CommitmentConfig::confirmed()),
            ..RpcAccountInfoConfig::default()
        },
        ..RpcProgramAccountsConfig::default()
    };
    let sub_id = rpc
        .program_subscribe(stake::program::id().to_string(), Some(program_config))
        .unwrap();

    let expected_params = SubscriptionParams::Program(ProgramSubscriptionParams {
        pubkey: stake::program::id(),
        filters: Vec::new(),
        encoding: UiAccountEncoding::Binary,
        data_slice: None,
        commitment: CommitmentConfig::confirmed(),
        with_context: false,
    });
    subscriptions.control.assert_subscribed(&expected_params);

    // Tracker state threaded through every `process_notification` call.
    let mut pending_optimistically_confirmed_banks = HashSet::new();
    let mut highest_confirmed_slot: Slot = 0;
    let mut highest_root_slot: Slot = 0;
    let mut last_notified_confirmed_slot: Slot = 0;
    let mut notify = |notification: BankNotification| {
        OptimisticallyConfirmedBankTracker::process_notification(
            (notification, None),
            &bank_forks,
            &optimistically_confirmed_bank,
            &subscriptions,
            &mut pending_optimistically_confirmed_banks,
            &mut last_notified_confirmed_slot,
            &mut highest_confirmed_slot,
            &mut highest_root_slot,
            &None,
            &PrioritizationFeeCache::default(),
            &None,
        );
    };

    // Slot 3 is confirmed while `bank_forks` holds no bank for it yet.
    notify(BankNotification::OptimisticallyConfirmed(3));

    // Slot 3: joe's account. The bank is only created, filled and frozen now.
    let child = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
    bank_forks.write().unwrap().insert(child);
    let bank3 = bank_forks.read().unwrap().get(3).unwrap();
    let joe = Keypair::new();
    bank3
        .process_transaction(&create_stake_account(&joe, 3))
        .unwrap();
    bank3.freeze();
    notify(BankNotification::Frozen(bank3));

    // Expected `programNotification` payload for one created account.
    let expected_notification = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
        json!({
            "jsonrpc": "2.0",
            "method": "programNotification",
            "params": {
                "result": {
                    "context": { "slot": slot },
                    "value": {
                        "account": {
                            "data": "1111111111111111",
                            "executable": false,
                            "lamports": lamports,
                            "owner": "Stake11111111111111111111111111111111111111",
                            "rentEpoch": u64::MAX,
                            "space": 16,
                        },
                        "pubkey": pubkey,
                    },
                },
                "subscription": subscription,
            }
        })
    };

    // One notification per slot, oldest first.
    for (slot, lamports, account) in [(1, 1, &alice), (2, 2, &bob), (3, 3, &joe)] {
        let response = receiver.recv();
        let expected = expected_notification(slot, lamports, &account.pubkey().to_string(), 0);
        assert_eq!(
            expected,
            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
        );
    }

    rpc.program_unsubscribe(sub_id).unwrap();
}
2389
/// Signature subscriptions across several connections and commitment levels.
///
/// Three transfers are prepared: `past_bank_tx` is processed in bank 0,
/// `processed_tx` in bank 1, and `unprocessed_tx` is never processed. After
/// `notify_signatures_received` and `notify_subscribers` the test checks the
/// notification each receiver gets, that the subscriptions for the two
/// processed signatures are gone, and that the one for the unprocessed
/// signature is still registered.
#[test]
#[serial]
fn test_check_signature_subscribe() {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let bank = Bank::new_for_tests(&genesis_config);
    let blockhash = bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(bank);
    let alice = Keypair::new();

    // The three transfers differ only in amount, which gives them distinct signatures.
    let past_bank_tx =
        system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash);
    let unprocessed_tx =
        system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash);
    let processed_tx =
        system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash);

    bank_forks
        .read()
        .unwrap()
        .get(0)
        .unwrap()
        .process_transaction(&past_bank_tx)
        .unwrap();

    let next_bank = Bank::new_from_parent(
        bank_forks.read().unwrap().get(0).unwrap(),
        &solana_pubkey::new_rand(),
        1,
    );
    bank_forks.write().unwrap().insert(next_bank);

    // `get` already hands back an owned handle, so fetch bank 1 once and reuse
    // it without an extra clone.
    let bank1 = bank_forks.read().unwrap().get(1).unwrap();
    bank1.process_transaction(&processed_tx).unwrap();

    // Slot 0 carries confirmation stake, slot 1 carries none.
    let mut cache0 = BlockCommitment::default();
    cache0.increase_confirmation_stake(1, 10);
    let cache1 = BlockCommitment::default();
    let block_commitment = HashMap::from([(0, cache0), (1, cache1)]);
    let block_commitment_cache = BlockCommitmentCache::new(
        block_commitment,
        10,
        CommitmentSlots {
            slot: bank1.slot(),
            ..CommitmentSlots::default()
        },
    );

    let exit = Arc::new(AtomicBool::new(false));
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit,
        max_complete_transaction_status_slot,
        bank_forks,
        Arc::new(RwLock::new(block_commitment_cache)),
        optimistically_confirmed_bank,
    ));

    let (past_bank_rpc1, mut past_bank_receiver1) =
        rpc_pubsub_service::test_connection(&subscriptions);
    let (past_bank_rpc2, mut past_bank_receiver2) =
        rpc_pubsub_service::test_connection(&subscriptions);
    let (processed_rpc, mut processed_receiver) =
        rpc_pubsub_service::test_connection(&subscriptions);
    let (another_rpc, _another_receiver) = rpc_pubsub_service::test_connection(&subscriptions);
    let (processed_rpc3, mut processed_receiver3) =
        rpc_pubsub_service::test_connection(&subscriptions);

    // Same signature, two commitment levels: processed and finalized.
    let past_bank_sub_id1 = past_bank_rpc1
        .signature_subscribe(
            past_bank_tx.signatures[0].to_string(),
            Some(RpcSignatureSubscribeConfig {
                commitment: Some(CommitmentConfig::processed()),
                enable_received_notification: Some(false),
            }),
        )
        .unwrap();
    let past_bank_sub_id2 = past_bank_rpc2
        .signature_subscribe(
            past_bank_tx.signatures[0].to_string(),
            Some(RpcSignatureSubscribeConfig {
                commitment: Some(CommitmentConfig::finalized()),
                enable_received_notification: Some(false),
            }),
        )
        .unwrap();
    let processed_sub_id = processed_rpc
        .signature_subscribe(
            processed_tx.signatures[0].to_string(),
            Some(RpcSignatureSubscribeConfig {
                commitment: Some(CommitmentConfig::processed()),
                enable_received_notification: Some(false),
            }),
        )
        .unwrap();
    // Subscription to the never-processed signature, without "received" notifications.
    another_rpc
        .signature_subscribe(
            unprocessed_tx.signatures[0].to_string(),
            Some(RpcSignatureSubscribeConfig {
                commitment: Some(CommitmentConfig::processed()),
                enable_received_notification: Some(false),
            }),
        )
        .unwrap();

    // Same never-processed signature, this time with "received" notifications enabled.
    let processed_sub_id3 = processed_rpc3
        .signature_subscribe(
            unprocessed_tx.signatures[0].to_string(),
            Some(RpcSignatureSubscribeConfig {
                commitment: Some(CommitmentConfig::processed()),
                enable_received_notification: Some(true),
            }),
        )
        .unwrap();

    assert!(subscriptions
        .control
        .signature_subscribed(&unprocessed_tx.signatures[0]));
    assert!(subscriptions
        .control
        .signature_subscribed(&processed_tx.signatures[0]));

    let received_slot = 1;
    let commitment_slots = CommitmentSlots {
        slot: received_slot,
        ..CommitmentSlots::default()
    };
    subscriptions
        .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
    subscriptions.notify_subscribers(commitment_slots);
    let expected_res =
        RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
    let received_expected_res =
        RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);

    // Slot and subscription id expected in a notification.
    struct Notification {
        slot: Slot,
        id: u64,
    }

    // Serializes the expected `signatureNotification` message for exact string comparison.
    let expected_notification =
        |exp: Notification, expected_res: &RpcSignatureResult| -> String {
            let json = json!({
                "jsonrpc": "2.0",
                "method": "signatureNotification",
                "params": {
                    "result": {
                        "context": { "slot": exp.slot },
                        "value": expected_res,
                    },
                    "subscription": exp.id,
                }
            });
            serde_json::to_string(&json).unwrap()
        };

    // past_bank_tx at processed commitment is reported with slot 1.
    let expected = expected_notification(
        Notification {
            slot: 1,
            id: past_bank_sub_id1.into(),
        },
        &expected_res,
    );
    let response = past_bank_receiver1.recv();
    assert_eq!(expected, response);

    // past_bank_tx at finalized commitment is reported with slot 0.
    let expected = expected_notification(
        Notification {
            slot: 0,
            id: past_bank_sub_id2.into(),
        },
        &expected_res,
    );
    let response = past_bank_receiver2.recv();
    assert_eq!(expected, response);

    let expected = expected_notification(
        Notification {
            slot: 1,
            id: processed_sub_id.into(),
        },
        &expected_res,
    );
    let response = processed_receiver.recv();
    assert_eq!(expected, response);

    // The subscriber that opted in gets a "received" notification for the
    // signature that was never processed.
    let expected = expected_notification(
        Notification {
            slot: received_slot,
            id: processed_sub_id3.into(),
        },
        &received_expected_res,
    );
    let response = processed_receiver3.recv();
    assert_eq!(expected, response);

    // Subscriptions for the two processed signatures are no longer registered.
    assert!(!subscriptions
        .control
        .signature_subscribed(&processed_tx.signatures[0]));
    assert!(!subscriptions
        .control
        .signature_subscribed(&past_bank_tx.signatures[0]));

    // The unprocessed signature is still subscribed.
    assert!(subscriptions
        .control
        .signature_subscribed(&unprocessed_tx.signatures[0]));
}
2617
/// Subscribes to slot updates, emits a single `notify_slot(0, 0, 0)` and checks
/// the exact `slotNotification` message, then verifies that unsubscribing
/// removes the subscription.
#[test]
#[serial]
fn test_check_slot_subscribe() {
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        Arc::new(AtomicBool::new(false)),
        Arc::new(AtomicU64::default()),
        bank_forks,
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
        optimistically_confirmed_bank,
    ));

    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
    let sub_id = rpc.slot_subscribe().unwrap();
    let slot_params = SubscriptionParams::Slot;
    subscriptions.control.assert_subscribed(&slot_params);

    subscriptions.notify_slot(0, 0, 0);
    let response = receiver.recv();

    // The message is compared as a raw string, so the payload is serialized
    // separately and spliced into the expected envelope.
    let expected_res_str = serde_json::to_string(&SlotInfo {
        parent: 0,
        slot: 0,
        root: 0,
    })
    .unwrap();
    let expected = format!(
        r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
    );
    assert_eq!(expected, response);

    rpc.slot_unsubscribe(sub_id).unwrap();
    subscriptions.control.assert_unsubscribed(&slot_params);
}
2662
/// Subscribes to root updates, submits roots `[2, 1, 3]` and expects three
/// `rootNotification` messages carrying 1, 2 and 3 in ascending order, then
/// verifies that unsubscribing removes the subscription.
#[test]
#[serial]
fn test_check_root_subscribe() {
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        Arc::new(AtomicBool::new(false)),
        Arc::new(AtomicU64::default()),
        bank_forks,
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
        optimistically_confirmed_bank,
    ));

    let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
    let sub_id = rpc.root_subscribe().unwrap();
    let root_params = SubscriptionParams::Root;
    subscriptions.control.assert_subscribed(&root_params);

    // Roots are handed over out of order on purpose.
    subscriptions.notify_roots(vec![2, 1, 3]);

    (1..=3).for_each(|expected_root| {
        let response = receiver.recv();

        let root_value = serde_json::to_value(expected_root).unwrap();
        let expected_res_str = serde_json::to_string(&root_value).unwrap();
        let expected = format!(
            r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
        );
        assert_eq!(expected, response);
    });

    rpc.root_unsubscribe(sub_id).unwrap();
    subscriptions.control.assert_unsubscribed(&root_params);
}
2705
/// Account subscriptions at `confirmed` commitment across two sibling forks.
///
/// Banks 1 and 2 are both children of bank 0 and both process the same
/// account-creating transaction for `alice`. The first subscriber is expected
/// to be notified with slot 1 once bank 1 (frozen) is optimistically
/// confirmed; after it unsubscribes, a second subscriber is expected to be
/// notified with slot 2 once bank 2 is frozen and reported.
#[test]
#[serial]
fn test_gossip_separate_account_notifications() {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let bank = Bank::new_for_tests(&genesis_config);
    let blockhash = bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(bank);
    let bank0 = bank_forks.read().unwrap().get(0).unwrap();
    let bank1 = Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(bank1);
    let bank2 = Bank::new_from_parent(bank0, &Pubkey::default(), 2);
    bank_forks.write().unwrap().insert(bank2);

    // Fixed keypair, so alice's address is the same on every run.
    let alice = Keypair::from_base58_string("sfLnS4rZ5a8gXke3aGxCgM6usFAVPxLUaBSRdssGY9uS5eoiEWQ41CqDcpXbcekpKsie8Lyy3LNFdhEvjUE1wd9");

    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let mut pending_optimistically_confirmed_banks = HashSet::new();

    let exit = Arc::new(AtomicBool::new(false));
    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit,
        max_complete_transaction_status_slot,
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
            1, 1,
        ))),
        optimistically_confirmed_bank.clone(),
    ));
    let (rpc0, mut receiver0) = rpc_pubsub_service::test_connection(&subscriptions);
    let (rpc1, mut receiver1) = rpc_pubsub_service::test_connection(&subscriptions);

    // Expected `accountNotification` for alice's freshly created account. The
    // subscription id is a parameter so it can be taken from what
    // `account_subscribe` returned rather than from a hard-coded literal.
    let expected_account_notification = |slot: Slot, subscription: u64| {
        json!({
            "jsonrpc": "2.0",
            "method": "accountNotification",
            "params": {
                "result": {
                    "context": { "slot": slot },
                    "value": {
                        "data": "1111111111111111",
                        "executable": false,
                        "lamports": 1,
                        "owner": "Stake11111111111111111111111111111111111111",
                        "rentEpoch": u64::MAX,
                        "space": 16,
                    },
                },
                "subscription": subscription,
            }
        })
    };

    let sub_id0 = rpc0
        .account_subscribe(
            alice.pubkey().to_string(),
            Some(RpcAccountInfoConfig {
                commitment: Some(CommitmentConfig::confirmed()),
                encoding: None,
                data_slice: None,
                min_context_slot: None,
            }),
        )
        .unwrap();

    assert!(subscriptions.control.account_subscribed(&alice.pubkey()));
    rpc0.block_until_processed(&subscriptions);

    let tx = system_transaction::create_account(
        &mint_keypair,
        &alice,
        blockhash,
        1,
        16,
        &stake::program::id(),
    );

    // Looking up a bank only needs shared access to the forks.
    let bank1 = bank_forks.read().unwrap().get(1).unwrap();
    bank1.process_transaction(&tx).unwrap();
    bank1.freeze();

    // The same transaction also lands on the sibling fork; bank 2 stays unfrozen for now.
    bank_forks
        .read()
        .unwrap()
        .get(2)
        .unwrap()
        .process_transaction(&tx)
        .unwrap();

    let mut highest_confirmed_slot: Slot = 0;
    let mut highest_root_slot: Slot = 0;
    let mut last_notified_confirmed_slot: Slot = 0;

    // Slot 2 is confirmed first, while its bank is not frozen yet.
    OptimisticallyConfirmedBankTracker::process_notification(
        (BankNotification::OptimisticallyConfirmed(2), None),
        &bank_forks,
        &optimistically_confirmed_bank,
        &subscriptions,
        &mut pending_optimistically_confirmed_banks,
        &mut last_notified_confirmed_slot,
        &mut highest_confirmed_slot,
        &mut highest_root_slot,
        &None,
        &PrioritizationFeeCache::default(),
        &None,
    );

    // NOTE(review): presumably reset so that the lower slot 1 is not ignored
    // after slot 2 was seen; confirm against `process_notification`.
    highest_confirmed_slot = 0;
    OptimisticallyConfirmedBankTracker::process_notification(
        (BankNotification::OptimisticallyConfirmed(1), None),
        &bank_forks,
        &optimistically_confirmed_bank,
        &subscriptions,
        &mut pending_optimistically_confirmed_banks,
        &mut last_notified_confirmed_slot,
        &mut highest_confirmed_slot,
        &mut highest_root_slot,
        &None,
        &PrioritizationFeeCache::default(),
        &None,
    );

    let response = receiver0.recv();
    let expected = expected_account_notification(1, u64::from(sub_id0));
    assert_eq!(
        expected,
        serde_json::from_str::<serde_json::Value>(&response).unwrap(),
    );
    rpc0.account_unsubscribe(sub_id0).unwrap();
    rpc0.block_until_processed(&subscriptions);

    let sub_id1 = rpc1
        .account_subscribe(
            alice.pubkey().to_string(),
            Some(RpcAccountInfoConfig {
                commitment: Some(CommitmentConfig::confirmed()),
                encoding: None,
                data_slice: None,
                min_context_slot: None,
            }),
        )
        .unwrap();
    rpc1.block_until_processed(&subscriptions);

    // Bank 2 is frozen only now and reported to the tracker.
    let bank2 = bank_forks.read().unwrap().get(2).unwrap();
    bank2.freeze();
    highest_confirmed_slot = 0;
    OptimisticallyConfirmedBankTracker::process_notification(
        (BankNotification::Frozen(bank2), None),
        &bank_forks,
        &optimistically_confirmed_bank,
        &subscriptions,
        &mut pending_optimistically_confirmed_banks,
        &mut last_notified_confirmed_slot,
        &mut highest_confirmed_slot,
        &mut highest_root_slot,
        &None,
        &PrioritizationFeeCache::default(),
        &None,
    );
    let response = receiver1.recv();
    let expected = expected_account_notification(2, u64::from(sub_id1));
    assert_eq!(
        expected,
        serde_json::from_str::<serde_json::Value>(&response).unwrap(),
    );
    rpc1.account_unsubscribe(sub_id1).unwrap();

    assert!(!subscriptions.control.account_subscribed(&alice.pubkey()));
}
2902
2903 fn make_logs_result(signature: &str, subscription_id: u64) -> serde_json::Value {
2904 json!({
2905 "jsonrpc": "2.0",
2906 "method": "logsNotification",
2907 "params": {
2908 "result": {
2909 "context": {
2910 "slot": 0
2911 },
2912 "value": {
2913 "signature": signature,
2914 "err": null,
2915 "logs": [
2916 "Program 11111111111111111111111111111111 invoke [1]",
2917 "Program 11111111111111111111111111111111 success"
2918 ]
2919 }
2920 },
2921 "subscription": subscription_id
2922 }
2923 })
2924 }
2925
/// Registers two log subscriptions at `processed` commitment — one for all
/// transactions and one for transactions mentioning alice — processes a single
/// account-creating transaction in bank 0 and expects both subscribers to
/// receive the same log notification under their own subscription id.
#[test]
#[serial]
fn test_logs_subscribe() {
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair,
        ..
    } = create_genesis_config(100);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let blockhash = genesis_bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(genesis_bank);

    let alice = Keypair::new();

    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        Arc::new(AtomicBool::new(false)),
        Arc::new(AtomicU64::default()),
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
        optimistically_confirmed_bank,
    ));

    let sub_config = RpcTransactionLogsConfig {
        commitment: Some(CommitmentConfig::processed()),
    };

    // Subscriber 1: every transaction.
    let (rpc_all, mut receiver_all) = rpc_pubsub_service::test_connection(&subscriptions);
    let sub_id_for_all = rpc_all
        .logs_subscribe(RpcTransactionLogsFilter::All, Some(sub_config.clone()))
        .unwrap();
    assert!(subscriptions.control.logs_subscribed(None));

    // Subscriber 2: only transactions that mention alice.
    let (rpc_alice, mut receiver_alice) = rpc_pubsub_service::test_connection(&subscriptions);
    let alice_filter = RpcTransactionLogsFilter::Mentions(vec![alice.pubkey().to_string()]);
    let sub_id_for_alice = rpc_alice
        .logs_subscribe(alice_filter, Some(sub_config))
        .unwrap();
    assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
    rpc_alice.block_until_processed(&subscriptions);

    let tx = system_transaction::create_account(
        &mint_keypair,
        &alice,
        blockhash,
        1,
        0,
        &system_program::id(),
    );

    let processed = bank_forks
        .read()
        .unwrap()
        .get(0)
        .unwrap()
        .process_transaction_with_metadata(tx.clone());
    assert!(processed.is_ok());

    subscriptions.notify_subscribers(CommitmentSlots::new_from_slot(0));

    // Both subscribers see the same payload; only the subscription id differs.
    let signature = tx.signatures[0].to_string();
    for (receiver, sub_id) in [
        (&mut receiver_all, sub_id_for_all),
        (&mut receiver_alice, sub_id_for_alice),
    ] {
        let expected_response = make_logs_result(&signature, u64::from(sub_id));
        let response = receiver.recv();
        assert_eq!(
            expected_response,
            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
        );
    }

    rpc_all.logs_unsubscribe(sub_id_for_all).unwrap();
    assert!(!subscriptions.control.logs_subscribed(None));
    rpc_alice.logs_unsubscribe(sub_id_for_alice).unwrap();
    assert!(!subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
}
3011
/// Checks that `RpcSubscriptions::total()` follows subscribe/unsubscribe calls:
/// it grows by one for each distinct subscription, stays unchanged when a
/// second connection subscribes to the same account, and only drops for that
/// account once both of its subscribers have unsubscribed.
#[test]
fn test_total_subscriptions() {
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
    let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
    let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
        Arc::new(AtomicU64::default()),
        bank_forks,
    ));
    let default_account = Pubkey::default().to_string();

    // The receivers stay bound for the whole test even though nothing is read from them.
    let (account_rpc, _account_rx) = rpc_pubsub_service::test_connection(&subscriptions);
    let account_sub = account_rpc
        .account_subscribe(default_account.clone(), None)
        .unwrap();
    assert_eq!(subscriptions.total(), 1);

    let (program_rpc, _program_rx) = rpc_pubsub_service::test_connection(&subscriptions);
    let program_sub = program_rpc
        .program_subscribe(Pubkey::default().to_string(), None)
        .unwrap();
    assert_eq!(subscriptions.total(), 2);

    let (logs_rpc, _logs_rx) = rpc_pubsub_service::test_connection(&subscriptions);
    let logs_sub = logs_rpc
        .logs_subscribe(RpcTransactionLogsFilter::All, None)
        .unwrap();
    assert_eq!(subscriptions.total(), 3);

    let (signature_rpc, _signature_rx) = rpc_pubsub_service::test_connection(&subscriptions);
    let signature_sub = signature_rpc
        .signature_subscribe(Signature::default().to_string(), None)
        .unwrap();
    assert_eq!(subscriptions.total(), 4);

    let (slot_rpc, _slot_rx) = rpc_pubsub_service::test_connection(&subscriptions);
    let slot_sub = slot_rpc.slot_subscribe().unwrap();
    assert_eq!(subscriptions.total(), 5);

    let (vote_rpc, _vote_rx) = rpc_pubsub_service::test_connection(&subscriptions);
    let vote_sub = vote_rpc.vote_subscribe().unwrap();
    assert_eq!(subscriptions.total(), 6);

    let (root_rpc, _root_rx) = rpc_pubsub_service::test_connection(&subscriptions);
    let root_sub = root_rpc.root_subscribe().unwrap();
    assert_eq!(subscriptions.total(), 7);

    // A second connection subscribing to the same account leaves the total unchanged.
    let (dup_account_rpc, _dup_account_rx) =
        rpc_pubsub_service::test_connection(&subscriptions);
    let dup_account_sub = dup_account_rpc
        .account_subscribe(default_account, None)
        .unwrap();
    assert_eq!(subscriptions.total(), 7);

    // Dropping one of the two account subscribers keeps the total unchanged.
    account_rpc.account_unsubscribe(account_sub).unwrap();
    assert_eq!(subscriptions.total(), 7);

    // Dropping the second one lowers it.
    dup_account_rpc
        .account_unsubscribe(dup_account_sub)
        .unwrap();
    assert_eq!(subscriptions.total(), 6);

    program_rpc.program_unsubscribe(program_sub).unwrap();
    assert_eq!(subscriptions.total(), 5);

    logs_rpc.logs_unsubscribe(logs_sub).unwrap();
    assert_eq!(subscriptions.total(), 4);

    signature_rpc.signature_unsubscribe(signature_sub).unwrap();
    assert_eq!(subscriptions.total(), 3);

    slot_rpc.slot_unsubscribe(slot_sub).unwrap();
    assert_eq!(subscriptions.total(), 2);

    vote_rpc.vote_unsubscribe(vote_sub).unwrap();
    assert_eq!(subscriptions.total(), 1);

    root_rpc.root_unsubscribe(root_sub).unwrap();
    assert_eq!(subscriptions.total(), 0);
}
3096}