1use {
4 crate::{
5 filter::filter_allows,
6 optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
7 parsed_token_accounts::{get_parsed_token_account, get_parsed_token_accounts},
8 rpc_pubsub_service::PubSubConfig,
9 rpc_subscription_tracker::{
10 AccountSubscriptionParams, BlockSubscriptionKind, BlockSubscriptionParams,
11 LogsSubscriptionKind, LogsSubscriptionParams, ProgramSubscriptionParams,
12 SignatureSubscriptionParams, SubscriptionControl, SubscriptionId, SubscriptionInfo,
13 SubscriptionParams, SubscriptionsTracker,
14 },
15 },
16 crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
17 itertools::Either,
18 rayon::prelude::*,
19 serde::Serialize,
20 solana_account_decoder::{
21 encode_ui_account, parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding,
22 },
23 solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
24 solana_measure::measure::Measure,
25 solana_rpc_client_api::response::{
26 ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate,
27 RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext,
28 RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
29 },
30 solana_runtime::{
31 bank::{Bank, TransactionLogInfo},
32 bank_forks::BankForks,
33 commitment::{BlockCommitmentCache, CommitmentSlots},
34 },
35 solana_sdk::{
36 account::{AccountSharedData, ReadableAccount},
37 clock::Slot,
38 pubkey::Pubkey,
39 signature::Signature,
40 timing::timestamp,
41 transaction,
42 },
43 solana_transaction_status::{
44 BlockEncodingOptions, ConfirmedBlock, EncodeError, VersionedConfirmedBlock,
45 },
46 solana_vote::vote_transaction::VoteTransaction,
47 std::{
48 cell::RefCell,
49 collections::{HashMap, VecDeque},
50 io::Cursor,
51 str,
52 sync::{
53 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
54 Arc, Mutex, RwLock, Weak,
55 },
56 thread::{Builder, JoinHandle},
57 time::{Duration, Instant},
58 },
59 tokio::sync::broadcast,
60};
61
62const RECEIVE_DELAY_MILLIS: u64 = 100;
63
64fn get_transaction_logs(
65 bank: &Bank,
66 params: &LogsSubscriptionParams,
67) -> Option<Vec<TransactionLogInfo>> {
68 let pubkey = match ¶ms.kind {
69 LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
70 LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
71 };
72 let mut logs = bank.get_transaction_logs(pubkey);
73 if matches!(params.kind, LogsSubscriptionKind::All) {
74 if let Some(logs) = &mut logs {
76 logs.retain(|log| !log.is_vote);
77 }
78 }
79 logs
80}
81#[derive(Debug)]
82pub struct TimestampedNotificationEntry {
83 pub entry: NotificationEntry,
84 pub queued_at: Instant,
85}
86
87impl From<NotificationEntry> for TimestampedNotificationEntry {
88 fn from(entry: NotificationEntry) -> Self {
89 TimestampedNotificationEntry {
90 entry,
91 queued_at: Instant::now(),
92 }
93 }
94}
95
96pub enum NotificationEntry {
97 Slot(SlotInfo),
98 SlotUpdate(SlotUpdate),
99 Vote((Pubkey, VoteTransaction, Signature)),
100 Root(Slot),
101 Bank(CommitmentSlots),
102 Gossip(Slot),
103 SignaturesReceived((Slot, Vec<Signature>)),
104 Subscribed(SubscriptionParams, SubscriptionId),
105 Unsubscribed(SubscriptionParams, SubscriptionId),
106}
107
108impl std::fmt::Debug for NotificationEntry {
109 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
110 match self {
111 NotificationEntry::Root(root) => write!(f, "Root({root})"),
112 NotificationEntry::Vote(vote) => write!(f, "Vote({vote:?})"),
113 NotificationEntry::Slot(slot_info) => write!(f, "Slot({slot_info:?})"),
114 NotificationEntry::SlotUpdate(slot_update) => {
115 write!(f, "SlotUpdate({slot_update:?})")
116 }
117 NotificationEntry::Bank(commitment_slots) => {
118 write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot)
119 }
120 NotificationEntry::SignaturesReceived(slot_signatures) => {
121 write!(f, "SignaturesReceived({slot_signatures:?})")
122 }
123 NotificationEntry::Gossip(slot) => write!(f, "Gossip({slot:?})"),
124 NotificationEntry::Subscribed(params, id) => {
125 write!(f, "Subscribed({params:?}, {id:?})")
126 }
127 NotificationEntry::Unsubscribed(params, id) => {
128 write!(f, "Unsubscribed({params:?}, {id:?})")
129 }
130 }
131 }
132}
133
134#[allow(clippy::type_complexity)]
135fn check_commitment_and_notify<P, S, B, F, X, I>(
136 params: &P,
137 subscription: &SubscriptionInfo,
138 bank_forks: &Arc<RwLock<BankForks>>,
139 slot: Slot,
140 bank_method: B,
141 filter_results: F,
142 notifier: &RpcNotifier,
143 is_final: bool,
144) -> bool
145where
146 S: Clone + Serialize,
147 B: Fn(&Bank, &P) -> X,
148 F: Fn(X, &P, Slot, Arc<Bank>) -> (I, Slot),
149 X: Clone + Default,
150 I: IntoIterator<Item = S>,
151{
152 let mut notified = false;
153 let bank = bank_forks.read().unwrap().get(slot);
154 if let Some(bank) = bank {
155 let results = bank_method(&bank, params);
156 let mut w_last_notified_slot = subscription.last_notified_slot.write().unwrap();
157 let (filter_results, result_slot) =
158 filter_results(results, params, *w_last_notified_slot, bank);
159 for result in filter_results {
160 notifier.notify(
161 RpcResponse::from(RpcNotificationResponse {
162 context: RpcNotificationContext { slot },
163 value: result,
164 }),
165 subscription,
166 is_final,
167 );
168 *w_last_notified_slot = result_slot;
169 notified = true;
170 }
171 }
172
173 notified
174}
175
176#[derive(Debug, Clone)]
177pub struct RpcNotification {
178 pub subscription_id: SubscriptionId,
179 pub is_final: bool,
180 pub json: Weak<String>,
181 pub created_at: Instant,
182}
183
184#[derive(Debug, Clone, PartialEq)]
185struct RpcNotificationResponse<T> {
186 context: RpcNotificationContext,
187 value: T,
188}
189
190impl<T> From<RpcNotificationResponse<T>> for RpcResponse<T> {
191 fn from(notification: RpcNotificationResponse<T>) -> Self {
192 let RpcNotificationResponse {
193 context: RpcNotificationContext { slot },
194 value,
195 } = notification;
196 Self {
197 context: RpcResponseContext {
198 slot,
199 api_version: None,
200 },
201 value,
202 }
203 }
204}
205
206#[derive(Debug, Clone, PartialEq, Eq)]
207struct RpcNotificationContext {
208 slot: Slot,
209}
210
211const RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS: Duration = Duration::from_millis(2_000);
212
213struct RecentItems {
214 queue: VecDeque<Arc<String>>,
215 total_bytes: usize,
216 max_len: usize,
217 max_total_bytes: usize,
218 last_metrics_submission: Instant,
219}
220
221impl RecentItems {
222 fn new(max_len: usize, max_total_bytes: usize) -> Self {
223 Self {
224 queue: VecDeque::new(),
225 total_bytes: 0,
226 max_len,
227 max_total_bytes,
228 last_metrics_submission: Instant::now(),
229 }
230 }
231
232 fn push(&mut self, item: Arc<String>) {
233 self.total_bytes = self
234 .total_bytes
235 .checked_add(item.len())
236 .expect("total bytes overflow");
237 self.queue.push_back(item);
238
239 while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len {
240 let item = self.queue.pop_front().expect("can't be empty");
241 self.total_bytes = self
242 .total_bytes
243 .checked_sub(item.len())
244 .expect("total bytes underflow");
245 }
246
247 let now = Instant::now();
248 let last_metrics_ago = now.duration_since(self.last_metrics_submission);
249 if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS {
250 datapoint_info!(
251 "rpc_subscriptions_recent_items",
252 ("num", self.queue.len(), i64),
253 ("total_bytes", self.total_bytes, i64),
254 );
255 self.last_metrics_submission = now;
256 } else {
257 trace!(
258 "rpc_subscriptions_recent_items num={} total_bytes={}",
259 self.queue.len(),
260 self.total_bytes,
261 );
262 }
263 }
264}
265
266struct RpcNotifier {
267 sender: broadcast::Sender<RpcNotification>,
268 recent_items: Mutex<RecentItems>,
269}
270
271thread_local! {
272 static RPC_NOTIFIER_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
273}
274
275#[derive(Debug, Serialize)]
276struct NotificationParams<T> {
277 result: T,
278 subscription: SubscriptionId,
279}
280
281#[derive(Debug, Serialize)]
282struct Notification<T> {
283 jsonrpc: Option<jsonrpc_core::Version>,
284 method: &'static str,
285 params: NotificationParams<T>,
286}
287
288impl RpcNotifier {
289 fn notify<T>(&self, value: T, subscription: &SubscriptionInfo, is_final: bool)
290 where
291 T: serde::Serialize,
292 {
293 let buf_arc = RPC_NOTIFIER_BUF.with(|buf| {
294 let mut buf = buf.borrow_mut();
295 buf.clear();
296 let notification = Notification {
297 jsonrpc: Some(jsonrpc_core::Version::V2),
298 method: subscription.method(),
299 params: NotificationParams {
300 result: value,
301 subscription: subscription.id(),
302 },
303 };
304 serde_json::to_writer(Cursor::new(&mut *buf), ¬ification)
305 .expect("serialization never fails");
306 let buf_str = str::from_utf8(&buf).expect("json is always utf-8");
307 Arc::new(String::from(buf_str))
308 });
309
310 let notification = RpcNotification {
311 subscription_id: subscription.id(),
312 json: Arc::downgrade(&buf_arc),
313 is_final,
314 created_at: Instant::now(),
315 };
316 let _ = self.sender.send(notification);
319
320 inc_new_counter_info!("rpc-pubsub-messages", 1);
321 inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());
322
323 self.recent_items.lock().unwrap().push(buf_arc);
324 }
325}
326
327fn filter_block_result_txs(
328 mut block: VersionedConfirmedBlock,
329 last_modified_slot: Slot,
330 params: &BlockSubscriptionParams,
331) -> Result<Option<RpcBlockUpdate>, RpcBlockUpdateError> {
332 block.transactions = match params.kind {
333 BlockSubscriptionKind::All => block.transactions,
334 BlockSubscriptionKind::MentionsAccountOrProgram(pk) => block
335 .transactions
336 .into_iter()
337 .filter(|tx| tx.account_keys().iter().any(|key| key == &pk))
338 .collect(),
339 };
340
341 if block.transactions.is_empty() {
342 if let BlockSubscriptionKind::MentionsAccountOrProgram(_) = params.kind {
343 return Ok(None);
344 }
345 }
346
347 let block = ConfirmedBlock::from(block)
348 .encode_with_options(
349 params.encoding,
350 BlockEncodingOptions {
351 transaction_details: params.transaction_details,
352 show_rewards: params.show_rewards,
353 max_supported_transaction_version: params.max_supported_transaction_version,
354 },
355 )
356 .map_err(|err| match err {
357 EncodeError::UnsupportedTransactionVersion(version) => {
358 RpcBlockUpdateError::UnsupportedTransactionVersion(version)
359 }
360 })?;
361
362 Ok(Some(RpcBlockUpdate {
366 slot: last_modified_slot,
367 block: Some(block),
368 err: None,
369 }))
370}
371
372fn filter_account_result(
373 result: Option<(AccountSharedData, Slot)>,
374 params: &AccountSubscriptionParams,
375 last_notified_slot: Slot,
376 bank: Arc<Bank>,
377) -> (Option<UiAccount>, Slot) {
378 let (account, last_modified_slot) = result.unwrap_or_default();
381
382 let account = (last_modified_slot != last_notified_slot).then(|| {
385 if is_known_spl_token_id(account.owner())
386 && params.encoding == UiAccountEncoding::JsonParsed
387 {
388 get_parsed_token_account(&bank, ¶ms.pubkey, account, None)
389 } else {
390 encode_ui_account(¶ms.pubkey, &account, params.encoding, None, None)
391 }
392 });
393 (account, last_modified_slot)
394}
395
396fn filter_signature_result(
397 result: Option<transaction::Result<()>>,
398 _params: &SignatureSubscriptionParams,
399 last_notified_slot: Slot,
400 _bank: Arc<Bank>,
401) -> (Option<RpcSignatureResult>, Slot) {
402 (
403 result.map(|result| {
404 RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() })
405 }),
406 last_notified_slot,
407 )
408}
409
410fn filter_program_results(
411 accounts: Vec<(Pubkey, AccountSharedData)>,
412 params: &ProgramSubscriptionParams,
413 last_notified_slot: Slot,
414 bank: Arc<Bank>,
415) -> (impl Iterator<Item = RpcKeyedAccount>, Slot) {
416 let accounts_is_empty = accounts.is_empty();
417 let encoding = params.encoding;
418 let filters = params.filters.clone();
419 let keyed_accounts = accounts.into_iter().filter(move |(_, account)| {
420 filters
421 .iter()
422 .all(|filter_type| filter_allows(filter_type, account))
423 });
424 let accounts = if is_known_spl_token_id(¶ms.pubkey)
425 && params.encoding == UiAccountEncoding::JsonParsed
426 && !accounts_is_empty
427 {
428 let accounts = get_parsed_token_accounts(bank, keyed_accounts);
429 Either::Left(accounts)
430 } else {
431 let accounts = keyed_accounts.map(move |(pubkey, account)| RpcKeyedAccount {
432 pubkey: pubkey.to_string(),
433 account: encode_ui_account(&pubkey, &account, encoding, None, None),
434 });
435 Either::Right(accounts)
436 };
437 (accounts, last_notified_slot)
438}
439
440fn filter_logs_results(
441 logs: Option<Vec<TransactionLogInfo>>,
442 _params: &LogsSubscriptionParams,
443 last_notified_slot: Slot,
444 _bank: Arc<Bank>,
445) -> (impl Iterator<Item = RpcLogsResponse>, Slot) {
446 let responses = logs.into_iter().flatten().map(|log| RpcLogsResponse {
447 signature: log.signature.to_string(),
448 err: log.result.err(),
449 logs: log.log_messages,
450 });
451 (responses, last_notified_slot)
452}
453
454fn initial_last_notified_slot(
455 params: &SubscriptionParams,
456 bank_forks: &RwLock<BankForks>,
457 block_commitment_cache: &RwLock<BlockCommitmentCache>,
458 optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
459) -> Option<Slot> {
460 match params {
461 SubscriptionParams::Account(params) => {
462 let slot = if params.commitment.is_finalized() {
463 block_commitment_cache
464 .read()
465 .unwrap()
466 .highest_super_majority_root()
467 } else if params.commitment.is_confirmed() {
468 optimistically_confirmed_bank.read().unwrap().bank.slot()
469 } else {
470 block_commitment_cache.read().unwrap().slot()
471 };
472
473 let bank = bank_forks.read().unwrap().get(slot)?;
474 Some(bank.get_account_modified_slot(¶ms.pubkey)?.1)
475 }
476 _ => None,
477 }
478}
479
480#[derive(Default)]
481struct PubsubNotificationStats {
482 since: Option<Instant>,
483 notification_entry_processing_count: u64,
484 notification_entry_processing_time_us: u64,
485}
486
487impl PubsubNotificationStats {
488 fn maybe_submit(&mut self) {
489 const SUBMIT_CADENCE: Duration = RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS;
490 let elapsed = self.since.as_ref().map(Instant::elapsed);
491 if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
492 return;
493 }
494 datapoint_info!(
495 "pubsub_notification_entries",
496 (
497 "notification_entry_processing_count",
498 self.notification_entry_processing_count,
499 i64
500 ),
501 (
502 "notification_entry_processing_time_us",
503 self.notification_entry_processing_time_us,
504 i64
505 ),
506 );
507 *self = Self {
508 since: Some(Instant::now()),
509 ..Self::default()
510 };
511 }
512}
513
514pub struct RpcSubscriptions {
515 notification_sender: Option<Sender<TimestampedNotificationEntry>>,
516 t_cleanup: Option<JoinHandle<()>>,
517
518 exit: Arc<AtomicBool>,
519 control: SubscriptionControl,
520}
521
522impl Drop for RpcSubscriptions {
523 fn drop(&mut self) {
524 self.shutdown().unwrap_or_else(|err| {
525 warn!("RPC Notification - shutdown error: {:?}", err);
526 });
527 }
528}
529
530impl RpcSubscriptions {
531 pub fn new(
532 exit: Arc<AtomicBool>,
533 max_complete_transaction_status_slot: Arc<AtomicU64>,
534 max_complete_rewards_slot: Arc<AtomicU64>,
535 blockstore: Arc<Blockstore>,
536 bank_forks: Arc<RwLock<BankForks>>,
537 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
538 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
539 ) -> Self {
540 Self::new_with_config(
541 exit,
542 max_complete_transaction_status_slot,
543 max_complete_rewards_slot,
544 blockstore,
545 bank_forks,
546 block_commitment_cache,
547 optimistically_confirmed_bank,
548 &PubSubConfig::default(),
549 None,
550 )
551 }
552
553 pub fn new_for_tests(
554 exit: Arc<AtomicBool>,
555 max_complete_transaction_status_slot: Arc<AtomicU64>,
556 max_complete_rewards_slot: Arc<AtomicU64>,
557 bank_forks: Arc<RwLock<BankForks>>,
558 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
559 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
560 ) -> Self {
561 let ledger_path = get_tmp_ledger_path!();
562 let blockstore = Blockstore::open(&ledger_path).unwrap();
563 let blockstore = Arc::new(blockstore);
564
565 Self::new_for_tests_with_blockstore(
566 exit,
567 max_complete_transaction_status_slot,
568 max_complete_rewards_slot,
569 blockstore,
570 bank_forks,
571 block_commitment_cache,
572 optimistically_confirmed_bank,
573 )
574 }
575
576 pub fn new_for_tests_with_blockstore(
577 exit: Arc<AtomicBool>,
578 max_complete_transaction_status_slot: Arc<AtomicU64>,
579 max_complete_rewards_slot: Arc<AtomicU64>,
580 blockstore: Arc<Blockstore>,
581 bank_forks: Arc<RwLock<BankForks>>,
582 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
583 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
584 ) -> Self {
585 let rpc_notifier_ready = Arc::new(AtomicBool::new(false));
586
587 let rpc_subscriptions = Self::new_with_config(
588 exit,
589 max_complete_transaction_status_slot,
590 max_complete_rewards_slot,
591 blockstore,
592 bank_forks,
593 block_commitment_cache,
594 optimistically_confirmed_bank,
595 &PubSubConfig::default_for_tests(),
596 Some(rpc_notifier_ready.clone()),
597 );
598
599 let start_time = Instant::now();
601 loop {
602 if rpc_notifier_ready.load(Ordering::Relaxed) {
603 break;
604 } else if (Instant::now() - start_time).as_millis() > 5000 {
605 panic!("RPC notifier thread setup took too long");
606 }
607 }
608
609 rpc_subscriptions
610 }
611
612 pub fn new_with_config(
613 exit: Arc<AtomicBool>,
614 max_complete_transaction_status_slot: Arc<AtomicU64>,
615 max_complete_rewards_slot: Arc<AtomicU64>,
616 blockstore: Arc<Blockstore>,
617 bank_forks: Arc<RwLock<BankForks>>,
618 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
619 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
620 config: &PubSubConfig,
621 rpc_notifier_ready: Option<Arc<AtomicBool>>,
622 ) -> Self {
623 let (notification_sender, notification_receiver) = crossbeam_channel::unbounded();
624
625 let subscriptions = SubscriptionsTracker::new(bank_forks.clone());
626
627 let (broadcast_sender, _) = broadcast::channel(config.queue_capacity_items);
628
629 let notifier = RpcNotifier {
630 sender: broadcast_sender.clone(),
631 recent_items: Mutex::new(RecentItems::new(
632 config.queue_capacity_items,
633 config.queue_capacity_bytes,
634 )),
635 };
636
637 let t_cleanup = config.notification_threads.map(|notification_threads| {
638 let exit = exit.clone();
639 Builder::new()
640 .name("solRpcNotifier".to_string())
641 .spawn(move || {
642 let pool = rayon::ThreadPoolBuilder::new()
643 .num_threads(notification_threads.get())
644 .thread_name(|i| format!("solRpcNotify{i:02}"))
645 .build()
646 .unwrap();
647 pool.install(|| {
648 if let Some(rpc_notifier_ready) = rpc_notifier_ready {
649 rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
650 }
651 Self::process_notifications(
652 exit,
653 max_complete_transaction_status_slot,
654 max_complete_rewards_slot,
655 blockstore,
656 notifier,
657 notification_receiver,
658 subscriptions,
659 bank_forks,
660 block_commitment_cache,
661 optimistically_confirmed_bank,
662 )
663 });
664 })
665 .unwrap()
666 });
667
668 let control = SubscriptionControl::new(
669 config.max_active_subscriptions,
670 notification_sender.clone(),
671 broadcast_sender,
672 );
673
674 Self {
675 notification_sender: config.notification_threads.map(|_| notification_sender),
676 t_cleanup,
677 exit,
678 control,
679 }
680 }
681
682 pub fn default_with_bank_forks(
684 max_complete_transaction_status_slot: Arc<AtomicU64>,
685 max_complete_rewards_slot: Arc<AtomicU64>,
686 bank_forks: Arc<RwLock<BankForks>>,
687 ) -> Self {
688 let ledger_path = get_tmp_ledger_path!();
689 let blockstore = Blockstore::open(&ledger_path).unwrap();
690 let blockstore = Arc::new(blockstore);
691 let optimistically_confirmed_bank =
692 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
693 Self::new(
694 Arc::new(AtomicBool::new(false)),
695 max_complete_transaction_status_slot,
696 max_complete_rewards_slot,
697 blockstore,
698 bank_forks,
699 Arc::new(RwLock::new(BlockCommitmentCache::default())),
700 optimistically_confirmed_bank,
701 )
702 }
703
704 pub fn control(&self) -> &SubscriptionControl {
705 &self.control
706 }
707
708 pub fn notify_subscribers(&self, commitment_slots: CommitmentSlots) {
711 self.enqueue_notification(NotificationEntry::Bank(commitment_slots));
712 }
713
714 pub fn notify_gossip_subscribers(&self, slot: Slot) {
717 self.enqueue_notification(NotificationEntry::Gossip(slot));
718 }
719
720 pub fn notify_slot_update(&self, slot_update: SlotUpdate) {
721 self.enqueue_notification(NotificationEntry::SlotUpdate(slot_update));
722 }
723
724 pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
725 self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
726 self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::CreatedBank {
727 slot,
728 parent,
729 timestamp: timestamp(),
730 }));
731 }
732
733 pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec<Signature>)) {
734 self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures));
735 }
736
737 pub fn notify_vote(&self, vote_pubkey: Pubkey, vote: VoteTransaction, signature: Signature) {
738 self.enqueue_notification(NotificationEntry::Vote((vote_pubkey, vote, signature)));
739 }
740
741 pub fn notify_roots(&self, mut rooted_slots: Vec<Slot>) {
742 rooted_slots.sort_unstable();
743 rooted_slots.into_iter().for_each(|root| {
744 self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::Root {
745 slot: root,
746 timestamp: timestamp(),
747 }));
748 self.enqueue_notification(NotificationEntry::Root(root));
749 });
750 }
751
752 fn enqueue_notification(&self, notification_entry: NotificationEntry) {
753 if let Some(ref notification_sender) = self.notification_sender {
754 match notification_sender.send(notification_entry.into()) {
755 Ok(()) => (),
756 Err(SendError(notification)) => {
757 warn!(
758 "Dropped RPC Notification - receiver disconnected : {:?}",
759 notification
760 );
761 }
762 }
763 }
764 }
765
766 #[allow(clippy::too_many_arguments)]
767 fn process_notifications(
768 exit: Arc<AtomicBool>,
769 max_complete_transaction_status_slot: Arc<AtomicU64>,
770 max_complete_rewards_slot: Arc<AtomicU64>,
771 blockstore: Arc<Blockstore>,
772 notifier: RpcNotifier,
773 notification_receiver: Receiver<TimestampedNotificationEntry>,
774 mut subscriptions: SubscriptionsTracker,
775 bank_forks: Arc<RwLock<BankForks>>,
776 block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
777 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
778 ) {
779 let mut stats = PubsubNotificationStats::default();
780
781 loop {
782 if exit.load(Ordering::Relaxed) {
783 break;
784 }
785 match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
786 Ok(notification_entry) => {
787 let TimestampedNotificationEntry { entry, queued_at } = notification_entry;
788 match entry {
789 NotificationEntry::Subscribed(params, id) => {
790 subscriptions.subscribe(params.clone(), id, || {
791 initial_last_notified_slot(
792 ¶ms,
793 &bank_forks,
794 &block_commitment_cache,
795 &optimistically_confirmed_bank,
796 )
797 .unwrap_or(0)
798 });
799 }
800 NotificationEntry::Unsubscribed(params, id) => {
801 subscriptions.unsubscribe(params, id);
802 }
803 NotificationEntry::Slot(slot_info) => {
804 if let Some(sub) = subscriptions
805 .node_progress_watchers()
806 .get(&SubscriptionParams::Slot)
807 {
808 debug!("slot notify: {:?}", slot_info);
809 inc_new_counter_info!("rpc-subscription-notify-slot", 1);
810 notifier.notify(slot_info, sub, false);
811 }
812 }
813 NotificationEntry::SlotUpdate(slot_update) => {
814 if let Some(sub) = subscriptions
815 .node_progress_watchers()
816 .get(&SubscriptionParams::SlotsUpdates)
817 {
818 inc_new_counter_info!("rpc-subscription-notify-slots-updates", 1);
819 notifier.notify(slot_update, sub, false);
820 }
821 }
822 NotificationEntry::Vote((vote_pubkey, ref vote_info, signature)) => {
826 if let Some(sub) = subscriptions
827 .node_progress_watchers()
828 .get(&SubscriptionParams::Vote)
829 {
830 let rpc_vote = RpcVote {
831 vote_pubkey: vote_pubkey.to_string(),
832 slots: vote_info.slots(),
833 hash: bs58::encode(vote_info.hash()).into_string(),
834 timestamp: vote_info.timestamp(),
835 signature: signature.to_string(),
836 };
837 debug!("vote notify: {:?}", vote_info);
838 inc_new_counter_info!("rpc-subscription-notify-vote", 1);
839 notifier.notify(&rpc_vote, sub, false);
840 }
841 }
842 NotificationEntry::Root(root) => {
843 if let Some(sub) = subscriptions
844 .node_progress_watchers()
845 .get(&SubscriptionParams::Root)
846 {
847 debug!("root notify: {:?}", root);
848 inc_new_counter_info!("rpc-subscription-notify-root", 1);
849 notifier.notify(root, sub, false);
850 }
851 }
852 NotificationEntry::Bank(commitment_slots) => {
853 const SOURCE: &str = "bank";
854 RpcSubscriptions::notify_watchers(
855 max_complete_transaction_status_slot.clone(),
856 max_complete_rewards_slot.clone(),
857 subscriptions.commitment_watchers(),
858 &bank_forks,
859 &blockstore,
860 &commitment_slots,
861 ¬ifier,
862 SOURCE,
863 );
864 }
865 NotificationEntry::Gossip(slot) => {
866 let commitment_slots = CommitmentSlots {
867 highest_confirmed_slot: slot,
868 ..CommitmentSlots::default()
869 };
870 const SOURCE: &str = "gossip";
871 RpcSubscriptions::notify_watchers(
872 max_complete_transaction_status_slot.clone(),
873 max_complete_rewards_slot.clone(),
874 subscriptions.gossip_watchers(),
875 &bank_forks,
876 &blockstore,
877 &commitment_slots,
878 ¬ifier,
879 SOURCE,
880 );
881 }
882 NotificationEntry::SignaturesReceived((slot, slot_signatures)) => {
883 for slot_signature in &slot_signatures {
884 if let Some(subs) = subscriptions.by_signature().get(slot_signature)
885 {
886 for subscription in subs.values() {
887 if let SubscriptionParams::Signature(params) =
888 subscription.params()
889 {
890 if params.enable_received_notification {
891 notifier.notify(
892 RpcResponse::from(RpcNotificationResponse {
893 context: RpcNotificationContext { slot },
894 value: RpcSignatureResult::ReceivedSignature(
895 ReceivedSignatureResult::ReceivedSignature,
896 ),
897 }),
898 subscription,
899 false,
900 );
901 }
902 } else {
903 error!("invalid params type in visit_by_signature");
904 }
905 }
906 }
907 }
908 }
909 }
910 stats.notification_entry_processing_time_us +=
911 queued_at.elapsed().as_micros() as u64;
912 stats.notification_entry_processing_count += 1;
913 }
914 Err(RecvTimeoutError::Timeout) => {
915 }
917 Err(RecvTimeoutError::Disconnected) => {
918 warn!("RPC Notification thread - sender disconnected");
919 break;
920 }
921 }
922 stats.maybe_submit();
923 }
924 }
925
926 fn notify_watchers(
927 max_complete_transaction_status_slot: Arc<AtomicU64>,
928 max_complete_rewards_slot: Arc<AtomicU64>,
929 subscriptions: &HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
930 bank_forks: &Arc<RwLock<BankForks>>,
931 blockstore: &Blockstore,
932 commitment_slots: &CommitmentSlots,
933 notifier: &RpcNotifier,
934 source: &'static str,
935 ) {
936 let mut total_time = Measure::start("notify_watchers");
937
938 let num_accounts_found = AtomicUsize::new(0);
939 let num_accounts_notified = AtomicUsize::new(0);
940
941 let num_blocks_found = AtomicUsize::new(0);
942 let num_blocks_notified = AtomicUsize::new(0);
943
944 let num_logs_found = AtomicUsize::new(0);
945 let num_logs_notified = AtomicUsize::new(0);
946
947 let num_programs_found = AtomicUsize::new(0);
948 let num_programs_notified = AtomicUsize::new(0);
949
950 let num_signatures_found = AtomicUsize::new(0);
951 let num_signatures_notified = AtomicUsize::new(0);
952
953 let subscriptions = subscriptions.into_par_iter();
954 subscriptions.for_each(|(_id, subscription)| {
955 let slot = if let Some(commitment) = subscription.commitment() {
956 if commitment.is_finalized() {
957 Some(commitment_slots.highest_super_majority_root)
958 } else if commitment.is_confirmed() {
959 Some(commitment_slots.highest_confirmed_slot)
960 } else {
961 Some(commitment_slots.slot)
962 }
963 } else {
964 error!("missing commitment in notify_watchers");
965 None
966 };
967 match subscription.params() {
968 SubscriptionParams::Account(params) => {
969 num_accounts_found.fetch_add(1, Ordering::Relaxed);
970 if let Some(slot) = slot {
971 let notified = check_commitment_and_notify(
972 params,
973 subscription,
974 bank_forks,
975 slot,
976 |bank, params| bank.get_account_modified_slot(¶ms.pubkey),
977 filter_account_result,
978 notifier,
979 false,
980 );
981
982 if notified {
983 num_accounts_notified.fetch_add(1, Ordering::Relaxed);
984 }
985 }
986 }
987 SubscriptionParams::Block(params) => {
988 num_blocks_found.fetch_add(1, Ordering::Relaxed);
989 if let Some(slot) = slot {
990 let bank = bank_forks.read().unwrap().get(slot);
991 if let Some(bank) = bank {
992 let mut w_last_unnotified_slot =
1007 subscription.last_notified_slot.write().unwrap();
1008 if *w_last_unnotified_slot == 0 {
1010 *w_last_unnotified_slot = slot;
1011 }
1012 let mut slots_to_notify: Vec<_> =
1013 (*w_last_unnotified_slot..slot).collect();
1014 let ancestors = bank.proper_ancestors_set();
1015 slots_to_notify.retain(|slot| ancestors.contains(slot));
1016 slots_to_notify.push(slot);
1017 for s in slots_to_notify {
1018 if s > max_complete_transaction_status_slot.load(Ordering::SeqCst)
1023 || s > max_complete_rewards_slot.load(Ordering::SeqCst)
1024 {
1025 break;
1026 }
1027
1028 let block_update_result = blockstore
1029 .get_complete_block(s, false)
1030 .map_err(|e| {
1031 error!("get_complete_block error: {}", e);
1032 RpcBlockUpdateError::BlockStoreError
1033 })
1034 .and_then(|block| filter_block_result_txs(block, s, params));
1035
1036 match block_update_result {
1037 Ok(block_update) => {
1038 if let Some(block_update) = block_update {
1039 notifier.notify(
1040 RpcResponse::from(RpcNotificationResponse {
1041 context: RpcNotificationContext { slot: s },
1042 value: block_update,
1043 }),
1044 subscription,
1045 false,
1046 );
1047 num_blocks_notified.fetch_add(1, Ordering::Relaxed);
1048 *w_last_unnotified_slot = s + 1;
1051 }
1052 }
1053 Err(err) => {
1054 notifier.notify(
1057 RpcResponse::from(RpcNotificationResponse {
1058 context: RpcNotificationContext { slot: s },
1059 value: RpcBlockUpdate {
1060 slot,
1061 block: None,
1062 err: Some(err),
1063 },
1064 }),
1065 subscription,
1066 false,
1067 );
1068 }
1069 }
1070 }
1071 }
1072 }
1073 }
1074 SubscriptionParams::Logs(params) => {
1075 num_logs_found.fetch_add(1, Ordering::Relaxed);
1076 if let Some(slot) = slot {
1077 let notified = check_commitment_and_notify(
1078 params,
1079 subscription,
1080 bank_forks,
1081 slot,
1082 get_transaction_logs,
1083 filter_logs_results,
1084 notifier,
1085 false,
1086 );
1087
1088 if notified {
1089 num_logs_notified.fetch_add(1, Ordering::Relaxed);
1090 }
1091 }
1092 }
1093 SubscriptionParams::Program(params) => {
1094 num_programs_found.fetch_add(1, Ordering::Relaxed);
1095 if let Some(slot) = slot {
1096 let notified = check_commitment_and_notify(
1097 params,
1098 subscription,
1099 bank_forks,
1100 slot,
1101 |bank, params| {
1102 bank.get_program_accounts_modified_since_parent(¶ms.pubkey)
1103 },
1104 filter_program_results,
1105 notifier,
1106 false,
1107 );
1108
1109 if notified {
1110 num_programs_notified.fetch_add(1, Ordering::Relaxed);
1111 }
1112 }
1113 }
1114 SubscriptionParams::Signature(params) => {
1115 num_signatures_found.fetch_add(1, Ordering::Relaxed);
1116 if let Some(slot) = slot {
1117 let notified = check_commitment_and_notify(
1118 params,
1119 subscription,
1120 bank_forks,
1121 slot,
1122 |bank, params| {
1123 bank.get_signature_status_processed_since_parent(¶ms.signature)
1124 },
1125 filter_signature_result,
1126 notifier,
1127 true, );
1129
1130 if notified {
1131 num_signatures_notified.fetch_add(1, Ordering::Relaxed);
1132 }
1133 }
1134 }
1135 _ => error!("wrong subscription type in alps map"),
1136 }
1137 });
1138
1139 total_time.stop();
1140
1141 let total_notified = num_accounts_notified.load(Ordering::Relaxed)
1142 + num_logs_notified.load(Ordering::Relaxed)
1143 + num_programs_notified.load(Ordering::Relaxed)
1144 + num_signatures_notified.load(Ordering::Relaxed);
1145 let total_ms = total_time.as_ms();
1146 if total_notified > 0 || total_ms > 10 {
1147 debug!(
1148 "notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / {}",
1149 source,
1150 num_accounts_found.load(Ordering::Relaxed),
1151 num_accounts_notified.load(Ordering::Relaxed),
1152 num_logs_found.load(Ordering::Relaxed),
1153 num_logs_notified.load(Ordering::Relaxed),
1154 num_programs_found.load(Ordering::Relaxed),
1155 num_programs_notified.load(Ordering::Relaxed),
1156 num_signatures_found.load(Ordering::Relaxed),
1157 num_signatures_notified.load(Ordering::Relaxed),
1158 );
1159 datapoint_info!(
1160 "rpc_subscriptions",
1161 ("source", source, String),
1162 (
1163 "num_account_subscriptions",
1164 num_accounts_found.load(Ordering::Relaxed),
1165 i64
1166 ),
1167 (
1168 "num_account_pubkeys_notified",
1169 num_accounts_notified.load(Ordering::Relaxed),
1170 i64
1171 ),
1172 (
1173 "num_logs_subscriptions",
1174 num_logs_found.load(Ordering::Relaxed),
1175 i64
1176 ),
1177 (
1178 "num_logs_notified",
1179 num_logs_notified.load(Ordering::Relaxed),
1180 i64
1181 ),
1182 (
1183 "num_program_subscriptions",
1184 num_programs_found.load(Ordering::Relaxed),
1185 i64
1186 ),
1187 (
1188 "num_programs_notified",
1189 num_programs_notified.load(Ordering::Relaxed),
1190 i64
1191 ),
1192 (
1193 "num_signature_subscriptions",
1194 num_signatures_found.load(Ordering::Relaxed),
1195 i64
1196 ),
1197 (
1198 "num_signatures_notified",
1199 num_signatures_notified.load(Ordering::Relaxed),
1200 i64
1201 ),
1202 ("notifications_time", total_time.as_us() as i64, i64),
1203 );
1204 }
1205 }
1206
1207 fn shutdown(&mut self) -> std::thread::Result<()> {
1208 if self.t_cleanup.is_some() {
1209 info!("RPC Notification thread - shutting down");
1210 self.exit.store(true, Ordering::Relaxed);
1211 let x = self.t_cleanup.take().unwrap().join();
1212 info!("RPC Notification thread - shut down.");
1213 x
1214 } else {
1215 warn!("RPC Notification thread - already shut down.");
1216 Ok(())
1217 }
1218 }
1219
1220 #[cfg(test)]
1221 fn total(&self) -> usize {
1222 self.control.total()
1223 }
1224}
1225
1226#[cfg(test)]
1227pub(crate) mod tests {
1228 use {
1229 super::*,
1230 crate::{
1231 optimistically_confirmed_bank_tracker::{
1232 BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
1233 },
1234 rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
1235 rpc_pubsub::RpcSolPubSubInternal,
1236 rpc_pubsub_service,
1237 },
1238 serial_test::serial,
1239 solana_ledger::get_tmp_ledger_path_auto_delete,
1240 solana_rpc_client_api::config::{
1241 RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
1242 RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
1243 RpcTransactionLogsFilter,
1244 },
1245 solana_runtime::{
1246 commitment::BlockCommitment,
1247 genesis_utils::{create_genesis_config, GenesisConfigInfo},
1248 prioritization_fee_cache::PrioritizationFeeCache,
1249 },
1250 solana_sdk::{
1251 commitment_config::CommitmentConfig,
1252 message::Message,
1253 signature::{Keypair, Signer},
1254 stake, system_instruction, system_program, system_transaction,
1255 transaction::Transaction,
1256 },
1257 solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
1258 std::{
1259 collections::HashSet,
1260 sync::atomic::{AtomicU64, Ordering::Relaxed},
1261 },
1262 };
1263
1264 struct AccountResult {
1265 lamports: u64,
1266 subscription: u64,
1267 data: &'static str,
1268 space: usize,
1269 }
1270
1271 fn make_account_result(
1272 non_default_account: bool,
1273 account_result: AccountResult,
1274 ) -> serde_json::Value {
1275 json!({
1276 "jsonrpc": "2.0",
1277 "method": "accountNotification",
1278 "params": {
1279 "result": {
1280 "context": { "slot": 1 },
1281 "value": {
1282 "data": account_result.data,
1283 "executable": false,
1284 "lamports": account_result.lamports,
1285 "owner": "11111111111111111111111111111111",
1286 "rentEpoch": if non_default_account {u64::MAX} else {0},
1287 "space": account_result.space,
1288 },
1289 },
1290 "subscription": account_result.subscription,
1291 }
1292 })
1293 }
1294
1295 #[test]
1296 #[serial]
1297 fn test_check_account_subscribe() {
1298 let GenesisConfigInfo {
1299 genesis_config,
1300 mint_keypair,
1301 ..
1302 } = create_genesis_config(100);
1303 let bank = Bank::new_for_tests(&genesis_config);
1304 let blockhash = bank.last_blockhash();
1305 let bank_forks = BankForks::new_rw_arc(bank);
1306 let bank0 = bank_forks.read().unwrap().get(0).unwrap();
1307 let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
1308 bank_forks.write().unwrap().insert(bank1);
1309 let alice = Keypair::new();
1310
1311 let exit = Arc::new(AtomicBool::new(false));
1312 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1313 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1314 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1315 exit,
1316 max_complete_transaction_status_slot,
1317 max_complete_rewards_slot,
1318 bank_forks.clone(),
1319 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1320 1, 1,
1321 ))),
1322 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
1323 ));
1324
1325 let tx0 = system_transaction::create_account(
1326 &mint_keypair,
1327 &alice,
1328 blockhash,
1329 1,
1330 0,
1331 &system_program::id(),
1332 );
1333 let expected0 = make_account_result(
1334 true,
1335 AccountResult {
1336 lamports: 1,
1337 subscription: 0,
1338 space: 0,
1339 data: "",
1340 },
1341 );
1342
1343 let tx1 = {
1344 let instruction =
1345 system_instruction::transfer(&alice.pubkey(), &mint_keypair.pubkey(), 1);
1346 let message = Message::new(&[instruction], Some(&mint_keypair.pubkey()));
1347 Transaction::new(&[&alice, &mint_keypair], message, blockhash)
1348 };
1349 let expected1 = make_account_result(
1350 false,
1351 AccountResult {
1352 lamports: 0,
1353 subscription: 2,
1354 space: 0,
1355 data: "",
1356 },
1357 );
1358
1359 let tx2 = system_transaction::create_account(
1360 &mint_keypair,
1361 &alice,
1362 blockhash,
1363 1,
1364 1024,
1365 &system_program::id(),
1366 );
1367 let expected2 = make_account_result(
1368 true,
1369 AccountResult {
1370 lamports: 1,
1371 subscription: 4,
1372 space: 1024,
1373 data: "error: data too large for bs58 encoding",
1374 },
1375 );
1376
1377 let subscribe_cases = vec![
1378 (alice.pubkey(), tx0, expected0),
1379 (alice.pubkey(), tx1, expected1),
1380 (alice.pubkey(), tx2, expected2),
1381 ];
1382
1383 for (pubkey, tx, expected) in subscribe_cases {
1384 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1385
1386 let sub_id = rpc
1387 .account_subscribe(
1388 pubkey.to_string(),
1389 Some(RpcAccountInfoConfig {
1390 commitment: Some(CommitmentConfig::processed()),
1391 encoding: None,
1392 data_slice: None,
1393 min_context_slot: None,
1394 }),
1395 )
1396 .unwrap();
1397
1398 subscriptions
1399 .control
1400 .assert_subscribed(&SubscriptionParams::Account(AccountSubscriptionParams {
1401 pubkey,
1402 commitment: CommitmentConfig::processed(),
1403 data_slice: None,
1404 encoding: UiAccountEncoding::Binary,
1405 }));
1406
1407 rpc.block_until_processed(&subscriptions);
1408
1409 bank_forks
1410 .read()
1411 .unwrap()
1412 .get(1)
1413 .unwrap()
1414 .process_transaction(&tx)
1415 .unwrap();
1416 let commitment_slots = CommitmentSlots {
1417 slot: 1,
1418 ..CommitmentSlots::default()
1419 };
1420 subscriptions.notify_subscribers(commitment_slots);
1421 let response = receiver.recv();
1422
1423 assert_eq!(
1424 expected,
1425 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
1426 );
1427 rpc.account_unsubscribe(sub_id).unwrap();
1428
1429 subscriptions
1430 .control
1431 .assert_unsubscribed(&SubscriptionParams::Account(AccountSubscriptionParams {
1432 pubkey,
1433 commitment: CommitmentConfig::processed(),
1434 data_slice: None,
1435 encoding: UiAccountEncoding::Binary,
1436 }));
1437 }
1438 }
1439
1440 #[test]
1441 #[serial]
1442 fn test_check_confirmed_block_subscribe() {
1443 let exit = Arc::new(AtomicBool::new(false));
1444 let GenesisConfigInfo {
1445 genesis_config,
1446 mint_keypair,
1447 ..
1448 } = create_genesis_config(10_000);
1449 let bank = Bank::new_for_tests(&genesis_config);
1450 let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1451 let bank_forks = BankForks::new_rw_arc(bank);
1452 let optimistically_confirmed_bank =
1453 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1454 let ledger_path = get_tmp_ledger_path_auto_delete!();
1455 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1456 let blockstore = Arc::new(blockstore);
1457 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1458 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1459 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1460 exit,
1461 max_complete_transaction_status_slot,
1462 max_complete_rewards_slot,
1463 blockstore.clone(),
1464 bank_forks.clone(),
1465 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1466 optimistically_confirmed_bank,
1467 ));
1468 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1469 let filter = RpcBlockSubscribeFilter::All;
1470 let config = RpcBlockSubscribeConfig {
1471 commitment: Some(CommitmentConfig::confirmed()),
1472 encoding: Some(UiTransactionEncoding::Json),
1473 transaction_details: Some(TransactionDetails::Signatures),
1474 show_rewards: None,
1475 max_supported_transaction_version: None,
1476 };
1477 let params = BlockSubscriptionParams {
1478 kind: BlockSubscriptionKind::All,
1479 commitment: config.commitment.unwrap(),
1480 encoding: config.encoding.unwrap(),
1481 transaction_details: config.transaction_details.unwrap(),
1482 show_rewards: config.show_rewards.unwrap_or_default(),
1483 max_supported_transaction_version: config.max_supported_transaction_version,
1484 };
1485 let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1486
1487 subscriptions
1488 .control
1489 .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1490
1491 let bank = bank_forks.read().unwrap().working_bank();
1492 let keypair1 = Keypair::new();
1493 let keypair2 = Keypair::new();
1494 let keypair3 = Keypair::new();
1495 let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1496 bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1497 .unwrap();
1498 populate_blockstore_for_tests(
1499 create_test_transaction_entries(
1500 vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1501 bank.clone(),
1502 )
1503 .0,
1504 bank,
1505 blockstore.clone(),
1506 max_complete_transaction_status_slot,
1507 );
1508
1509 let slot = 0;
1510 subscriptions.notify_gossip_subscribers(slot);
1511 let actual_resp = receiver.recv();
1512 let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1513
1514 let confirmed_block =
1515 ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1516 let block = confirmed_block
1517 .encode_with_options(
1518 params.encoding,
1519 BlockEncodingOptions {
1520 transaction_details: params.transaction_details,
1521 show_rewards: false,
1522 max_supported_transaction_version: None,
1523 },
1524 )
1525 .unwrap();
1526 let expected_resp = RpcBlockUpdate {
1527 slot,
1528 block: Some(block),
1529 err: None,
1530 };
1531 let expected_resp = json!({
1532 "jsonrpc": "2.0",
1533 "method": "blockNotification",
1534 "params": {
1535 "result": {
1536 "context": { "slot": slot },
1537 "value": expected_resp,
1538 },
1539 "subscription": 0,
1540 }
1541 });
1542 assert_eq!(expected_resp, actual_resp);
1543
1544 subscriptions.notify_subscribers(CommitmentSlots {
1546 slot,
1547 root: slot,
1548 highest_confirmed_slot: slot,
1549 highest_super_majority_root: slot,
1550 });
1551 let should_err = receiver.recv_timeout(Duration::from_millis(300));
1552 assert!(should_err.is_err());
1553
1554 rpc.slot_unsubscribe(sub_id).unwrap();
1555 subscriptions
1556 .control
1557 .assert_unsubscribed(&SubscriptionParams::Block(params));
1558 }
1559
1560 #[test]
1561 #[serial]
1562 fn test_check_confirmed_block_subscribe_with_mentions() {
1563 let exit = Arc::new(AtomicBool::new(false));
1564 let GenesisConfigInfo {
1565 genesis_config,
1566 mint_keypair,
1567 ..
1568 } = create_genesis_config(10_000);
1569 let bank = Bank::new_for_tests(&genesis_config);
1570 let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1571 let bank_forks = BankForks::new_rw_arc(bank);
1572 let optimistically_confirmed_bank =
1573 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1574 let ledger_path = get_tmp_ledger_path_auto_delete!();
1575 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1576 let blockstore = Arc::new(blockstore);
1577 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1578 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1579 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1580 exit,
1581 max_complete_transaction_status_slot,
1582 max_complete_rewards_slot,
1583 blockstore.clone(),
1584 bank_forks.clone(),
1585 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1586 optimistically_confirmed_bank,
1587 ));
1588 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1589 let keypair1 = Keypair::new();
1590 let filter =
1591 RpcBlockSubscribeFilter::MentionsAccountOrProgram(keypair1.pubkey().to_string());
1592 let config = RpcBlockSubscribeConfig {
1593 commitment: Some(CommitmentConfig::confirmed()),
1594 encoding: Some(UiTransactionEncoding::Json),
1595 transaction_details: Some(TransactionDetails::Signatures),
1596 show_rewards: None,
1597 max_supported_transaction_version: None,
1598 };
1599 let params = BlockSubscriptionParams {
1600 kind: BlockSubscriptionKind::MentionsAccountOrProgram(keypair1.pubkey()),
1601 commitment: config.commitment.unwrap(),
1602 encoding: config.encoding.unwrap(),
1603 transaction_details: config.transaction_details.unwrap(),
1604 show_rewards: config.show_rewards.unwrap_or_default(),
1605 max_supported_transaction_version: config.max_supported_transaction_version,
1606 };
1607 let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1608
1609 subscriptions
1610 .control
1611 .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1612
1613 let bank = bank_forks.read().unwrap().working_bank();
1614 let keypair2 = Keypair::new();
1615 let keypair3 = Keypair::new();
1616 let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1617 bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1618 .unwrap();
1619 populate_blockstore_for_tests(
1620 create_test_transaction_entries(
1621 vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1622 bank.clone(),
1623 )
1624 .0,
1625 bank,
1626 blockstore.clone(),
1627 max_complete_transaction_status_slot,
1628 );
1629
1630 let slot = 0;
1631 subscriptions.notify_gossip_subscribers(slot);
1632 let actual_resp = receiver.recv();
1633 let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1634
1635 let mut confirmed_block =
1637 ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1638 confirmed_block.transactions.retain(|tx_with_meta| {
1639 tx_with_meta
1640 .account_keys()
1641 .iter()
1642 .any(|key| key == &keypair1.pubkey())
1643 });
1644 let block = confirmed_block
1645 .encode_with_options(
1646 params.encoding,
1647 BlockEncodingOptions {
1648 transaction_details: params.transaction_details,
1649 show_rewards: false,
1650 max_supported_transaction_version: None,
1651 },
1652 )
1653 .unwrap();
1654 let expected_resp = RpcBlockUpdate {
1655 slot,
1656 block: Some(block),
1657 err: None,
1658 };
1659 let expected_resp = json!({
1660 "jsonrpc": "2.0",
1661 "method": "blockNotification",
1662 "params": {
1663 "result": {
1664 "context": { "slot": slot },
1665 "value": expected_resp,
1666 },
1667 "subscription": 0,
1668 }
1669 });
1670 assert_eq!(expected_resp, actual_resp);
1671
1672 rpc.slot_unsubscribe(sub_id).unwrap();
1673 subscriptions
1674 .control
1675 .assert_unsubscribed(&SubscriptionParams::Block(params));
1676 }
1677
1678 #[test]
1679 #[serial]
1680 fn test_check_finalized_block_subscribe() {
1681 let exit = Arc::new(AtomicBool::new(false));
1682 let GenesisConfigInfo {
1683 genesis_config,
1684 mint_keypair,
1685 ..
1686 } = create_genesis_config(10_000);
1687 let bank = Bank::new_for_tests(&genesis_config);
1688 let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1689 let bank_forks = BankForks::new_rw_arc(bank);
1690 let optimistically_confirmed_bank =
1691 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1692 let ledger_path = get_tmp_ledger_path_auto_delete!();
1693 let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1694 let blockstore = Arc::new(blockstore);
1695 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1696 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1697 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1698 exit,
1699 max_complete_transaction_status_slot,
1700 max_complete_rewards_slot,
1701 blockstore.clone(),
1702 bank_forks.clone(),
1703 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1704 optimistically_confirmed_bank,
1705 ));
1706 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1707 let filter = RpcBlockSubscribeFilter::All;
1708 let config = RpcBlockSubscribeConfig {
1709 commitment: Some(CommitmentConfig::finalized()),
1710 encoding: Some(UiTransactionEncoding::Json),
1711 transaction_details: Some(TransactionDetails::Signatures),
1712 show_rewards: None,
1713 max_supported_transaction_version: None,
1714 };
1715 let params = BlockSubscriptionParams {
1716 kind: BlockSubscriptionKind::All,
1717 commitment: config.commitment.unwrap(),
1718 encoding: config.encoding.unwrap(),
1719 transaction_details: config.transaction_details.unwrap(),
1720 show_rewards: config.show_rewards.unwrap_or_default(),
1721 max_supported_transaction_version: config.max_supported_transaction_version,
1722 };
1723 let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1724 subscriptions
1725 .control
1726 .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1727
1728 let bank = bank_forks.read().unwrap().working_bank();
1729 let keypair1 = Keypair::new();
1730 let keypair2 = Keypair::new();
1731 let keypair3 = Keypair::new();
1732 let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1733 bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1734 .unwrap();
1735 populate_blockstore_for_tests(
1736 create_test_transaction_entries(
1737 vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1738 bank.clone(),
1739 )
1740 .0,
1741 bank,
1742 blockstore.clone(),
1743 max_complete_transaction_status_slot,
1744 );
1745
1746 let slot = 0;
1747 subscriptions.notify_subscribers(CommitmentSlots {
1748 slot,
1749 root: slot,
1750 highest_confirmed_slot: slot,
1751 highest_super_majority_root: slot,
1752 });
1753 let actual_resp = receiver.recv();
1754 let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1755
1756 let confirmed_block =
1757 ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1758 let block = confirmed_block
1759 .encode_with_options(
1760 params.encoding,
1761 BlockEncodingOptions {
1762 transaction_details: params.transaction_details,
1763 show_rewards: false,
1764 max_supported_transaction_version: None,
1765 },
1766 )
1767 .unwrap();
1768 let expected_resp = RpcBlockUpdate {
1769 slot,
1770 block: Some(block),
1771 err: None,
1772 };
1773 let expected_resp = json!({
1774 "jsonrpc": "2.0",
1775 "method": "blockNotification",
1776 "params": {
1777 "result": {
1778 "context": { "slot": slot },
1779 "value": expected_resp,
1780 },
1781 "subscription": 0,
1782 }
1783 });
1784 assert_eq!(expected_resp, actual_resp);
1785
1786 subscriptions.notify_gossip_subscribers(slot);
1788 let should_err = receiver.recv_timeout(Duration::from_millis(300));
1789 assert!(should_err.is_err());
1790
1791 rpc.slot_unsubscribe(sub_id).unwrap();
1792 subscriptions
1793 .control
1794 .assert_unsubscribed(&SubscriptionParams::Block(params));
1795 }
1796
1797 #[test]
1798 #[serial]
1799 fn test_check_program_subscribe() {
1800 let GenesisConfigInfo {
1801 genesis_config,
1802 mint_keypair,
1803 ..
1804 } = create_genesis_config(100);
1805 let bank = Bank::new_for_tests(&genesis_config);
1806 let blockhash = bank.last_blockhash();
1807 let bank_forks = BankForks::new_rw_arc(bank);
1808 let alice = Keypair::new();
1809 let tx = system_transaction::create_account(
1810 &mint_keypair,
1811 &alice,
1812 blockhash,
1813 1,
1814 16,
1815 &stake::program::id(),
1816 );
1817 bank_forks
1818 .read()
1819 .unwrap()
1820 .get(0)
1821 .unwrap()
1822 .process_transaction(&tx)
1823 .unwrap();
1824
1825 let exit = Arc::new(AtomicBool::new(false));
1826 let optimistically_confirmed_bank =
1827 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1828 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1829 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1830 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1831 exit,
1832 max_complete_transaction_status_slot,
1833 max_complete_rewards_slot,
1834 bank_forks,
1835 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1836 optimistically_confirmed_bank,
1837 ));
1838 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1839 let sub_id = rpc
1840 .program_subscribe(
1841 stake::program::id().to_string(),
1842 Some(RpcProgramAccountsConfig {
1843 account_config: RpcAccountInfoConfig {
1844 commitment: Some(CommitmentConfig::processed()),
1845 ..RpcAccountInfoConfig::default()
1846 },
1847 ..RpcProgramAccountsConfig::default()
1848 }),
1849 )
1850 .unwrap();
1851
1852 subscriptions
1853 .control
1854 .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
1855 pubkey: stake::program::id(),
1856 filters: Vec::new(),
1857 commitment: CommitmentConfig::processed(),
1858 data_slice: None,
1859 encoding: UiAccountEncoding::Binary,
1860 with_context: false,
1861 }));
1862
1863 subscriptions.notify_subscribers(CommitmentSlots::default());
1864 let response = receiver.recv();
1865 let expected = json!({
1866 "jsonrpc": "2.0",
1867 "method": "programNotification",
1868 "params": {
1869 "result": {
1870 "context": { "slot": 0 },
1871 "value": {
1872 "account": {
1873 "data": "1111111111111111",
1874 "executable": false,
1875 "lamports": 1,
1876 "owner": "Stake11111111111111111111111111111111111111",
1877 "rentEpoch": u64::MAX,
1878 "space": 16,
1879 },
1880 "pubkey": alice.pubkey().to_string(),
1881 },
1882 },
1883 "subscription": 0,
1884 }
1885 });
1886 assert_eq!(
1887 expected,
1888 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
1889 );
1890
1891 rpc.program_unsubscribe(sub_id).unwrap();
1892 subscriptions
1893 .control
1894 .assert_unsubscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
1895 pubkey: stake::program::id(),
1896 filters: Vec::new(),
1897 commitment: CommitmentConfig::processed(),
1898 data_slice: None,
1899 encoding: UiAccountEncoding::Binary,
1900 with_context: false,
1901 }));
1902 }
1903
1904 #[test]
1905 #[serial]
1906 fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot() {
1907 let GenesisConfigInfo {
1911 genesis_config,
1912 mint_keypair,
1913 ..
1914 } = create_genesis_config(100);
1915 let bank = Bank::new_for_tests(&genesis_config);
1916 bank.lazy_rent_collection.store(true, Relaxed);
1917
1918 let blockhash = bank.last_blockhash();
1919 let bank_forks = BankForks::new_rw_arc(bank);
1920
1921 let bank0 = bank_forks.read().unwrap().get(0).unwrap();
1922 let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
1923 bank_forks.write().unwrap().insert(bank1);
1924 let bank1 = bank_forks.read().unwrap().get(1).unwrap();
1925
1926 let alice = Keypair::new();
1928 let tx = system_transaction::create_account(
1929 &mint_keypair,
1930 &alice,
1931 blockhash,
1932 1,
1933 16,
1934 &stake::program::id(),
1935 );
1936
1937 bank1.process_transaction(&tx).unwrap();
1938
1939 let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
1940 bank_forks.write().unwrap().insert(bank2);
1941
1942 let bob = Keypair::new();
1944 let tx = system_transaction::create_account(
1945 &mint_keypair,
1946 &bob,
1947 blockhash,
1948 2,
1949 16,
1950 &stake::program::id(),
1951 );
1952 let bank2 = bank_forks.read().unwrap().get(2).unwrap();
1953
1954 bank2.process_transaction(&tx).unwrap();
1955
1956 let bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
1957 bank_forks.write().unwrap().insert(bank3);
1958
1959 let joe = Keypair::new();
1961 let tx = system_transaction::create_account(
1962 &mint_keypair,
1963 &joe,
1964 blockhash,
1965 3,
1966 16,
1967 &stake::program::id(),
1968 );
1969 let bank3 = bank_forks.read().unwrap().get(3).unwrap();
1970
1971 bank3.process_transaction(&tx).unwrap();
1972
1973 let exit = Arc::new(AtomicBool::new(false));
1975 let optimistically_confirmed_bank =
1976 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1977 let mut pending_optimistically_confirmed_banks = HashSet::new();
1978 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1979 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1980 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1981 exit,
1982 max_complete_transaction_status_slot,
1983 max_complete_rewards_slot,
1984 bank_forks.clone(),
1985 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1986 1, 1,
1987 ))),
1988 optimistically_confirmed_bank.clone(),
1989 ));
1990
1991 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1992
1993 let sub_id = rpc
1994 .program_subscribe(
1995 stake::program::id().to_string(),
1996 Some(RpcProgramAccountsConfig {
1997 account_config: RpcAccountInfoConfig {
1998 commitment: Some(CommitmentConfig::confirmed()),
1999 ..RpcAccountInfoConfig::default()
2000 },
2001 ..RpcProgramAccountsConfig::default()
2002 }),
2003 )
2004 .unwrap();
2005
2006 subscriptions
2007 .control
2008 .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2009 pubkey: stake::program::id(),
2010 filters: Vec::new(),
2011 encoding: UiAccountEncoding::Binary,
2012 data_slice: None,
2013 commitment: CommitmentConfig::confirmed(),
2014 with_context: false,
2015 }));
2016
2017 let mut highest_confirmed_slot: Slot = 0;
2018 let mut highest_root_slot: Slot = 0;
2019 let mut last_notified_confirmed_slot: Slot = 0;
2020 OptimisticallyConfirmedBankTracker::process_notification(
2023 BankNotification::OptimisticallyConfirmed(3),
2024 &bank_forks,
2025 &optimistically_confirmed_bank,
2026 &subscriptions,
2027 &mut pending_optimistically_confirmed_banks,
2028 &mut last_notified_confirmed_slot,
2029 &mut highest_confirmed_slot,
2030 &mut highest_root_slot,
2031 &None,
2032 &PrioritizationFeeCache::default(),
2033 );
2034
2035 let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
2037 json!({
2038 "jsonrpc": "2.0",
2039 "method": "programNotification",
2040 "params": {
2041 "result": {
2042 "context": { "slot": slot },
2043 "value": {
2044 "account": {
2045 "data": "1111111111111111",
2046 "executable": false,
2047 "lamports": lamports,
2048 "owner": "Stake11111111111111111111111111111111111111",
2049 "rentEpoch": u64::MAX,
2050 "space": 16,
2051 },
2052 "pubkey": pubkey,
2053 },
2054 },
2055 "subscription": subscription,
2056 }
2057 })
2058 };
2059
2060 let response = receiver.recv();
2061 let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
2062 assert_eq!(
2063 expected,
2064 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2065 );
2066
2067 let response = receiver.recv();
2068 let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
2069 assert_eq!(
2070 expected,
2071 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2072 );
2073
2074 bank3.freeze();
2075 OptimisticallyConfirmedBankTracker::process_notification(
2076 BankNotification::Frozen(bank3),
2077 &bank_forks,
2078 &optimistically_confirmed_bank,
2079 &subscriptions,
2080 &mut pending_optimistically_confirmed_banks,
2081 &mut last_notified_confirmed_slot,
2082 &mut highest_confirmed_slot,
2083 &mut highest_root_slot,
2084 &None,
2085 &PrioritizationFeeCache::default(),
2086 );
2087
2088 let response = receiver.recv();
2089 let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
2090 assert_eq!(
2091 expected,
2092 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2093 );
2094 rpc.program_unsubscribe(sub_id).unwrap();
2095 }
2096
2097 #[test]
2098 #[serial]
2099 #[should_panic]
2100 fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks_no_notifications(
2101 ) {
2102 let GenesisConfigInfo {
2106 genesis_config,
2107 mint_keypair,
2108 ..
2109 } = create_genesis_config(100);
2110 let bank = Bank::new_for_tests(&genesis_config);
2111 bank.lazy_rent_collection.store(true, Relaxed);
2112
2113 let blockhash = bank.last_blockhash();
2114 let bank_forks = BankForks::new_rw_arc(bank);
2115
2116 let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2117 let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
2118 bank_forks.write().unwrap().insert(bank1);
2119 let bank1 = bank_forks.read().unwrap().get(1).unwrap();
2120
2121 let alice = Keypair::new();
2123 let tx = system_transaction::create_account(
2124 &mint_keypair,
2125 &alice,
2126 blockhash,
2127 1,
2128 16,
2129 &stake::program::id(),
2130 );
2131
2132 bank1.process_transaction(&tx).unwrap();
2133
2134 let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
2135 bank_forks.write().unwrap().insert(bank2);
2136
2137 let bob = Keypair::new();
2139 let tx = system_transaction::create_account(
2140 &mint_keypair,
2141 &bob,
2142 blockhash,
2143 2,
2144 16,
2145 &stake::program::id(),
2146 );
2147 let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2148
2149 bank2.process_transaction(&tx).unwrap();
2150
2151 let exit = Arc::new(AtomicBool::new(false));
2153 let optimistically_confirmed_bank =
2154 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2155 let mut pending_optimistically_confirmed_banks = HashSet::new();
2156 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2157 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2158 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2159 exit,
2160 max_complete_transaction_status_slot,
2161 max_complete_rewards_slot,
2162 bank_forks.clone(),
2163 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2164 1, 1,
2165 ))),
2166 optimistically_confirmed_bank.clone(),
2167 ));
2168 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2169 rpc.program_subscribe(
2170 stake::program::id().to_string(),
2171 Some(RpcProgramAccountsConfig {
2172 account_config: RpcAccountInfoConfig {
2173 commitment: Some(CommitmentConfig::confirmed()),
2174 ..RpcAccountInfoConfig::default()
2175 },
2176 ..RpcProgramAccountsConfig::default()
2177 }),
2178 )
2179 .unwrap();
2180
2181 subscriptions
2182 .control
2183 .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2184 pubkey: stake::program::id(),
2185 filters: Vec::new(),
2186 encoding: UiAccountEncoding::Binary,
2187 data_slice: None,
2188 commitment: CommitmentConfig::confirmed(),
2189 with_context: false,
2190 }));
2191
2192 let mut highest_confirmed_slot: Slot = 0;
2193 let mut highest_root_slot: Slot = 0;
2194 let mut last_notified_confirmed_slot: Slot = 0;
2195 OptimisticallyConfirmedBankTracker::process_notification(
2198 BankNotification::OptimisticallyConfirmed(3),
2199 &bank_forks,
2200 &optimistically_confirmed_bank,
2201 &subscriptions,
2202 &mut pending_optimistically_confirmed_banks,
2203 &mut last_notified_confirmed_slot,
2204 &mut highest_confirmed_slot,
2205 &mut highest_root_slot,
2206 &None,
2207 &PrioritizationFeeCache::default(),
2208 );
2209
2210 let _response = receiver.recv();
2212 }
2213
2214 #[test]
2215 #[serial]
2216 fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks() {
2217 let GenesisConfigInfo {
2222 genesis_config,
2223 mint_keypair,
2224 ..
2225 } = create_genesis_config(100);
2226 let bank = Bank::new_for_tests(&genesis_config);
2227 bank.lazy_rent_collection.store(true, Relaxed);
2228
2229 let blockhash = bank.last_blockhash();
2230 let bank_forks = BankForks::new_rw_arc(bank);
2231
2232 let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2233 let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
2234 bank_forks.write().unwrap().insert(bank1);
2235 let bank1 = bank_forks.read().unwrap().get(1).unwrap();
2236
2237 let alice = Keypair::new();
2239 let tx = system_transaction::create_account(
2240 &mint_keypair,
2241 &alice,
2242 blockhash,
2243 1,
2244 16,
2245 &stake::program::id(),
2246 );
2247
2248 bank1.process_transaction(&tx).unwrap();
2249
2250 let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
2251 bank_forks.write().unwrap().insert(bank2);
2252
2253 let bob = Keypair::new();
2255 let tx = system_transaction::create_account(
2256 &mint_keypair,
2257 &bob,
2258 blockhash,
2259 2,
2260 16,
2261 &stake::program::id(),
2262 );
2263 let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2264
2265 bank2.process_transaction(&tx).unwrap();
2266
2267 let exit = Arc::new(AtomicBool::new(false));
2269 let optimistically_confirmed_bank =
2270 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2271 let mut pending_optimistically_confirmed_banks = HashSet::new();
2272 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2273 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2274 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2275 exit,
2276 max_complete_transaction_status_slot,
2277 max_complete_rewards_slot,
2278 bank_forks.clone(),
2279 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2280 1, 1,
2281 ))),
2282 optimistically_confirmed_bank.clone(),
2283 ));
2284 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2285 let sub_id = rpc
2286 .program_subscribe(
2287 stake::program::id().to_string(),
2288 Some(RpcProgramAccountsConfig {
2289 account_config: RpcAccountInfoConfig {
2290 commitment: Some(CommitmentConfig::confirmed()),
2291 ..RpcAccountInfoConfig::default()
2292 },
2293 ..RpcProgramAccountsConfig::default()
2294 }),
2295 )
2296 .unwrap();
2297
2298 subscriptions
2299 .control
2300 .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2301 pubkey: stake::program::id(),
2302 filters: Vec::new(),
2303 encoding: UiAccountEncoding::Binary,
2304 data_slice: None,
2305 commitment: CommitmentConfig::confirmed(),
2306 with_context: false,
2307 }));
2308
2309 let mut highest_confirmed_slot: Slot = 0;
2310 let mut highest_root_slot: Slot = 0;
2311 let mut last_notified_confirmed_slot: Slot = 0;
2312 OptimisticallyConfirmedBankTracker::process_notification(
2316 BankNotification::OptimisticallyConfirmed(3),
2317 &bank_forks,
2318 &optimistically_confirmed_bank,
2319 &subscriptions,
2320 &mut pending_optimistically_confirmed_banks,
2321 &mut last_notified_confirmed_slot,
2322 &mut highest_confirmed_slot,
2323 &mut highest_root_slot,
2324 &None,
2325 &PrioritizationFeeCache::default(),
2326 );
2327
2328 let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
2330 json!({
2331 "jsonrpc": "2.0",
2332 "method": "programNotification",
2333 "params": {
2334 "result": {
2335 "context": { "slot": slot },
2336 "value": {
2337 "account": {
2338 "data": "1111111111111111",
2339 "executable": false,
2340 "lamports": lamports,
2341 "owner": "Stake11111111111111111111111111111111111111",
2342 "rentEpoch": u64::MAX,
2343 "space": 16,
2344 },
2345 "pubkey": pubkey,
2346 },
2347 },
2348 "subscription": subscription,
2349 }
2350 })
2351 };
2352
2353 let bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
2354 bank_forks.write().unwrap().insert(bank3);
2355
2356 let joe = Keypair::new();
2358 let tx = system_transaction::create_account(
2359 &mint_keypair,
2360 &joe,
2361 blockhash,
2362 3,
2363 16,
2364 &stake::program::id(),
2365 );
2366 let bank3 = bank_forks.read().unwrap().get(3).unwrap();
2367
2368 bank3.process_transaction(&tx).unwrap();
2369 bank3.freeze();
2370 OptimisticallyConfirmedBankTracker::process_notification(
2371 BankNotification::Frozen(bank3),
2372 &bank_forks,
2373 &optimistically_confirmed_bank,
2374 &subscriptions,
2375 &mut pending_optimistically_confirmed_banks,
2376 &mut last_notified_confirmed_slot,
2377 &mut highest_confirmed_slot,
2378 &mut highest_root_slot,
2379 &None,
2380 &PrioritizationFeeCache::default(),
2381 );
2382
2383 let response = receiver.recv();
2384 let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
2385 assert_eq!(
2386 expected,
2387 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2388 );
2389
2390 let response = receiver.recv();
2391 let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
2392 assert_eq!(
2393 expected,
2394 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2395 );
2396
2397 let response = receiver.recv();
2398 let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
2399 assert_eq!(
2400 expected,
2401 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2402 );
2403 rpc.program_unsubscribe(sub_id).unwrap();
2404 }
2405
2406 #[test]
2407 #[serial]
2408 fn test_check_signature_subscribe() {
2409 let GenesisConfigInfo {
2410 genesis_config,
2411 mint_keypair,
2412 ..
2413 } = create_genesis_config(100);
2414 let bank = Bank::new_for_tests(&genesis_config);
2415 let blockhash = bank.last_blockhash();
2416 let bank_forks = BankForks::new_rw_arc(bank);
2417 let alice = Keypair::new();
2418
2419 let past_bank_tx =
2420 system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash);
2421 let unprocessed_tx =
2422 system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash);
2423 let processed_tx =
2424 system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash);
2425
2426 bank_forks
2427 .read()
2428 .unwrap()
2429 .get(0)
2430 .unwrap()
2431 .process_transaction(&past_bank_tx)
2432 .unwrap();
2433
2434 let next_bank = Bank::new_from_parent(
2435 bank_forks.read().unwrap().get(0).unwrap(),
2436 &solana_pubkey::new_rand(),
2437 1,
2438 );
2439 bank_forks.write().unwrap().insert(next_bank);
2440
2441 bank_forks
2442 .read()
2443 .unwrap()
2444 .get(1)
2445 .unwrap()
2446 .process_transaction(&processed_tx)
2447 .unwrap();
2448 let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
2449
2450 let mut cache0 = BlockCommitment::default();
2451 cache0.increase_confirmation_stake(1, 10);
2452 let cache1 = BlockCommitment::default();
2453
2454 let mut block_commitment = HashMap::new();
2455 block_commitment.entry(0).or_insert(cache0);
2456 block_commitment.entry(1).or_insert(cache1);
2457 let block_commitment_cache = BlockCommitmentCache::new(
2458 block_commitment,
2459 10,
2460 CommitmentSlots {
2461 slot: bank1.slot(),
2462 ..CommitmentSlots::default()
2463 },
2464 );
2465
2466 let exit = Arc::new(AtomicBool::new(false));
2467 let optimistically_confirmed_bank =
2468 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2469 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2470 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2471 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2472 exit,
2473 max_complete_transaction_status_slot,
2474 max_complete_rewards_slot,
2475 bank_forks,
2476 Arc::new(RwLock::new(block_commitment_cache)),
2477 optimistically_confirmed_bank,
2478 ));
2479
2480 let (past_bank_rpc1, mut past_bank_receiver1) =
2481 rpc_pubsub_service::test_connection(&subscriptions);
2482 let (past_bank_rpc2, mut past_bank_receiver2) =
2483 rpc_pubsub_service::test_connection(&subscriptions);
2484 let (processed_rpc, mut processed_receiver) =
2485 rpc_pubsub_service::test_connection(&subscriptions);
2486 let (another_rpc, _another_receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2487 let (processed_rpc3, mut processed_receiver3) =
2488 rpc_pubsub_service::test_connection(&subscriptions);
2489
2490 let past_bank_sub_id1 = past_bank_rpc1
2491 .signature_subscribe(
2492 past_bank_tx.signatures[0].to_string(),
2493 Some(RpcSignatureSubscribeConfig {
2494 commitment: Some(CommitmentConfig::processed()),
2495 enable_received_notification: Some(false),
2496 }),
2497 )
2498 .unwrap();
2499 let past_bank_sub_id2 = past_bank_rpc2
2500 .signature_subscribe(
2501 past_bank_tx.signatures[0].to_string(),
2502 Some(RpcSignatureSubscribeConfig {
2503 commitment: Some(CommitmentConfig::finalized()),
2504 enable_received_notification: Some(false),
2505 }),
2506 )
2507 .unwrap();
2508 let processed_sub_id = processed_rpc
2509 .signature_subscribe(
2510 processed_tx.signatures[0].to_string(),
2511 Some(RpcSignatureSubscribeConfig {
2512 commitment: Some(CommitmentConfig::processed()),
2513 enable_received_notification: Some(false),
2514 }),
2515 )
2516 .unwrap();
2517 another_rpc
2518 .signature_subscribe(
2519 unprocessed_tx.signatures[0].to_string(),
2520 Some(RpcSignatureSubscribeConfig {
2521 commitment: Some(CommitmentConfig::processed()),
2522 enable_received_notification: Some(false),
2523 }),
2524 )
2525 .unwrap();
2526
2527 let processed_sub_id3 = processed_rpc3
2529 .signature_subscribe(
2530 unprocessed_tx.signatures[0].to_string(),
2531 Some(RpcSignatureSubscribeConfig {
2532 commitment: Some(CommitmentConfig::processed()),
2533 enable_received_notification: Some(true),
2534 }),
2535 )
2536 .unwrap();
2537
2538 assert!(subscriptions
2539 .control
2540 .signature_subscribed(&unprocessed_tx.signatures[0]));
2541 assert!(subscriptions
2542 .control
2543 .signature_subscribed(&processed_tx.signatures[0]));
2544
2545 let mut commitment_slots = CommitmentSlots::default();
2546 let received_slot = 1;
2547 commitment_slots.slot = received_slot;
2548 subscriptions
2549 .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
2550 subscriptions.notify_subscribers(commitment_slots);
2551 let expected_res =
2552 RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
2553 let received_expected_res =
2554 RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);
2555 struct Notification {
2556 slot: Slot,
2557 id: u64,
2558 }
2559
2560 let expected_notification =
2561 |exp: Notification, expected_res: &RpcSignatureResult| -> String {
2562 let json = json!({
2563 "jsonrpc": "2.0",
2564 "method": "signatureNotification",
2565 "params": {
2566 "result": {
2567 "context": { "slot": exp.slot },
2568 "value": expected_res,
2569 },
2570 "subscription": exp.id,
2571 }
2572 });
2573 serde_json::to_string(&json).unwrap()
2574 };
2575
2576 let expected = expected_notification(
2579 Notification {
2580 slot: 1,
2581 id: past_bank_sub_id1.into(),
2582 },
2583 &expected_res,
2584 );
2585 let response = past_bank_receiver1.recv();
2586 assert_eq!(expected, response);
2587
2588 let expected = expected_notification(
2591 Notification {
2592 slot: 0,
2593 id: past_bank_sub_id2.into(),
2594 },
2595 &expected_res,
2596 );
2597 let response = past_bank_receiver2.recv();
2598 assert_eq!(expected, response);
2599
2600 let expected = expected_notification(
2601 Notification {
2602 slot: 1,
2603 id: processed_sub_id.into(),
2604 },
2605 &expected_res,
2606 );
2607 let response = processed_receiver.recv();
2608 assert_eq!(expected, response);
2609
2610 let expected = expected_notification(
2612 Notification {
2613 slot: received_slot,
2614 id: processed_sub_id3.into(),
2615 },
2616 &received_expected_res,
2617 );
2618 let response = processed_receiver3.recv();
2619 assert_eq!(expected, response);
2620
2621 assert!(!subscriptions
2624 .control
2625 .signature_subscribed(&processed_tx.signatures[0]));
2626 assert!(!subscriptions
2627 .control
2628 .signature_subscribed(&past_bank_tx.signatures[0]));
2629
2630 assert!(subscriptions
2632 .control
2633 .signature_subscribed(&unprocessed_tx.signatures[0]));
2634 }
2635
2636 #[test]
2637 #[serial]
2638 fn test_check_slot_subscribe() {
2639 let exit = Arc::new(AtomicBool::new(false));
2640 let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
2641 let bank = Bank::new_for_tests(&genesis_config);
2642 let bank_forks = BankForks::new_rw_arc(bank);
2643 let optimistically_confirmed_bank =
2644 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2645 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2646 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2647 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2648 exit,
2649 max_complete_transaction_status_slot,
2650 max_complete_rewards_slot,
2651 bank_forks,
2652 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2653 optimistically_confirmed_bank,
2654 ));
2655 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2656 let sub_id = rpc.slot_subscribe().unwrap();
2657
2658 subscriptions
2659 .control
2660 .assert_subscribed(&SubscriptionParams::Slot);
2661
2662 subscriptions.notify_slot(0, 0, 0);
2663 let response = receiver.recv();
2664
2665 let expected_res = SlotInfo {
2666 parent: 0,
2667 slot: 0,
2668 root: 0,
2669 };
2670 let expected_res_str = serde_json::to_string(&expected_res).unwrap();
2671
2672 let expected = format!(
2673 r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
2674 );
2675 assert_eq!(expected, response);
2676
2677 rpc.slot_unsubscribe(sub_id).unwrap();
2678 subscriptions
2679 .control
2680 .assert_unsubscribed(&SubscriptionParams::Slot);
2681 }
2682
2683 #[test]
2684 #[serial]
2685 fn test_check_root_subscribe() {
2686 let exit = Arc::new(AtomicBool::new(false));
2687 let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
2688 let bank = Bank::new_for_tests(&genesis_config);
2689 let bank_forks = BankForks::new_rw_arc(bank);
2690 let optimistically_confirmed_bank =
2691 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2692 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2693 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2694 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2695 exit,
2696 max_complete_transaction_status_slot,
2697 max_complete_rewards_slot,
2698 bank_forks,
2699 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2700 optimistically_confirmed_bank,
2701 ));
2702 let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2703 let sub_id = rpc.root_subscribe().unwrap();
2704
2705 subscriptions
2706 .control
2707 .assert_subscribed(&SubscriptionParams::Root);
2708
2709 subscriptions.notify_roots(vec![2, 1, 3]);
2710
2711 for expected_root in 1..=3 {
2712 let response = receiver.recv();
2713
2714 let expected_res_str =
2715 serde_json::to_string(&serde_json::to_value(expected_root).unwrap()).unwrap();
2716 let expected = format!(
2717 r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
2718 );
2719 assert_eq!(expected, response);
2720 }
2721
2722 rpc.root_unsubscribe(sub_id).unwrap();
2723 subscriptions
2724 .control
2725 .assert_unsubscribed(&SubscriptionParams::Root);
2726 }
2727
2728 #[test]
2729 #[serial]
2730 fn test_gossip_separate_account_notifications() {
2731 let GenesisConfigInfo {
2732 genesis_config,
2733 mint_keypair,
2734 ..
2735 } = create_genesis_config(100);
2736 let bank = Bank::new_for_tests(&genesis_config);
2737 let blockhash = bank.last_blockhash();
2738 let bank_forks = BankForks::new_rw_arc(bank);
2739 let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2740 let bank1 = Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 1);
2741 bank_forks.write().unwrap().insert(bank1);
2742 let bank2 = Bank::new_from_parent(bank0, &Pubkey::default(), 2);
2743 bank_forks.write().unwrap().insert(bank2);
2744
2745 let alice = Keypair::from_base58_string("sfLnS4rZ5a8gXke3aGxCgM6usFAVPxLUaBSRdssGY9uS5eoiEWQ41CqDcpXbcekpKsie8Lyy3LNFdhEvjUE1wd9");
2747
2748 let optimistically_confirmed_bank =
2749 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2750 let mut pending_optimistically_confirmed_banks = HashSet::new();
2751
2752 let exit = Arc::new(AtomicBool::new(false));
2753 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2754 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2755 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2756 exit,
2757 max_complete_transaction_status_slot,
2758 max_complete_rewards_slot,
2759 bank_forks.clone(),
2760 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2761 1, 1,
2762 ))),
2763 optimistically_confirmed_bank.clone(),
2764 ));
2765 let (rpc0, mut receiver0) = rpc_pubsub_service::test_connection(&subscriptions);
2766 let (rpc1, mut receiver1) = rpc_pubsub_service::test_connection(&subscriptions);
2767 let sub_id0 = rpc0
2768 .account_subscribe(
2769 alice.pubkey().to_string(),
2770 Some(RpcAccountInfoConfig {
2771 commitment: Some(CommitmentConfig::confirmed()),
2772 encoding: None,
2773 data_slice: None,
2774 min_context_slot: None,
2775 }),
2776 )
2777 .unwrap();
2778
2779 assert!(subscriptions.control.account_subscribed(&alice.pubkey()));
2780 rpc0.block_until_processed(&subscriptions);
2781
2782 let tx = system_transaction::create_account(
2783 &mint_keypair,
2784 &alice,
2785 blockhash,
2786 1,
2787 16,
2788 &stake::program::id(),
2789 );
2790
2791 let bank1 = bank_forks.write().unwrap().get(1).unwrap();
2793 bank1.process_transaction(&tx).unwrap();
2794 bank1.freeze();
2795
2796 bank_forks
2798 .read()
2799 .unwrap()
2800 .get(2)
2801 .unwrap()
2802 .process_transaction(&tx)
2803 .unwrap();
2804
2805 let mut highest_confirmed_slot: Slot = 0;
2807 let mut highest_root_slot: Slot = 0;
2808 let mut last_notified_confirmed_slot: Slot = 0;
2809 OptimisticallyConfirmedBankTracker::process_notification(
2810 BankNotification::OptimisticallyConfirmed(2),
2811 &bank_forks,
2812 &optimistically_confirmed_bank,
2813 &subscriptions,
2814 &mut pending_optimistically_confirmed_banks,
2815 &mut last_notified_confirmed_slot,
2816 &mut highest_confirmed_slot,
2817 &mut highest_root_slot,
2818 &None,
2819 &PrioritizationFeeCache::default(),
2820 );
2821
2822 highest_confirmed_slot = 0;
2824 OptimisticallyConfirmedBankTracker::process_notification(
2825 BankNotification::OptimisticallyConfirmed(1),
2826 &bank_forks,
2827 &optimistically_confirmed_bank,
2828 &subscriptions,
2829 &mut pending_optimistically_confirmed_banks,
2830 &mut last_notified_confirmed_slot,
2831 &mut highest_confirmed_slot,
2832 &mut highest_root_slot,
2833 &None,
2834 &PrioritizationFeeCache::default(),
2835 );
2836
2837 let response = receiver0.recv();
2838 let expected = json!({
2839 "jsonrpc": "2.0",
2840 "method": "accountNotification",
2841 "params": {
2842 "result": {
2843 "context": { "slot": 1 },
2844 "value": {
2845 "data": "1111111111111111",
2846 "executable": false,
2847 "lamports": 1,
2848 "owner": "Stake11111111111111111111111111111111111111",
2849 "rentEpoch": u64::MAX,
2850 "space": 16,
2851 },
2852 },
2853 "subscription": 0,
2854 }
2855 });
2856 assert_eq!(
2857 expected,
2858 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2859 );
2860 rpc0.account_unsubscribe(sub_id0).unwrap();
2861 rpc0.block_until_processed(&subscriptions);
2862
2863 let sub_id1 = rpc1
2864 .account_subscribe(
2865 alice.pubkey().to_string(),
2866 Some(RpcAccountInfoConfig {
2867 commitment: Some(CommitmentConfig::confirmed()),
2868 encoding: None,
2869 data_slice: None,
2870 min_context_slot: None,
2871 }),
2872 )
2873 .unwrap();
2874 rpc1.block_until_processed(&subscriptions);
2875
2876 let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2877 bank2.freeze();
2878 highest_confirmed_slot = 0;
2879 OptimisticallyConfirmedBankTracker::process_notification(
2880 BankNotification::Frozen(bank2),
2881 &bank_forks,
2882 &optimistically_confirmed_bank,
2883 &subscriptions,
2884 &mut pending_optimistically_confirmed_banks,
2885 &mut last_notified_confirmed_slot,
2886 &mut highest_confirmed_slot,
2887 &mut highest_root_slot,
2888 &None,
2889 &PrioritizationFeeCache::default(),
2890 );
2891 let response = receiver1.recv();
2892 let expected = json!({
2893 "jsonrpc": "2.0",
2894 "method": "accountNotification",
2895 "params": {
2896 "result": {
2897 "context": { "slot": 2 },
2898 "value": {
2899 "data": "1111111111111111",
2900 "executable": false,
2901 "lamports": 1,
2902 "owner": "Stake11111111111111111111111111111111111111",
2903 "rentEpoch": u64::MAX,
2904 "space": 16,
2905 },
2906 },
2907 "subscription": 3,
2908 }
2909 });
2910 assert_eq!(
2911 expected,
2912 serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2913 );
2914 rpc1.account_unsubscribe(sub_id1).unwrap();
2915
2916 assert!(!subscriptions.control.account_subscribed(&alice.pubkey()));
2917 }
2918
2919 fn make_logs_result(signature: &str, subscription_id: u64) -> serde_json::Value {
2920 json!({
2921 "jsonrpc": "2.0",
2922 "method": "logsNotification",
2923 "params": {
2924 "result": {
2925 "context": {
2926 "slot": 0
2927 },
2928 "value": {
2929 "signature": signature,
2930 "err": null,
2931 "logs": [
2932 "Program 11111111111111111111111111111111 invoke [1]",
2933 "Program 11111111111111111111111111111111 success"
2934 ]
2935 }
2936 },
2937 "subscription": subscription_id
2938 }
2939 })
2940 }
2941
2942 #[test]
2943 #[serial]
2944 fn test_logs_subscribe() {
2945 let GenesisConfigInfo {
2946 genesis_config,
2947 mint_keypair,
2948 ..
2949 } = create_genesis_config(100);
2950 let bank = Bank::new_for_tests(&genesis_config);
2951 let blockhash = bank.last_blockhash();
2952 let bank_forks = BankForks::new_rw_arc(bank);
2953
2954 let alice = Keypair::new();
2955
2956 let exit = Arc::new(AtomicBool::new(false));
2957 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2958 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2959 let optimistically_confirmed_bank =
2960 OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2961 let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2962 exit,
2963 max_complete_transaction_status_slot,
2964 max_complete_rewards_slot,
2965 bank_forks.clone(),
2966 Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2967 optimistically_confirmed_bank,
2968 ));
2969
2970 let sub_config = RpcTransactionLogsConfig {
2971 commitment: Some(CommitmentConfig::processed()),
2972 };
2973
2974 let (rpc_all, mut receiver_all) = rpc_pubsub_service::test_connection(&subscriptions);
2975 let sub_id_for_all = rpc_all
2976 .logs_subscribe(RpcTransactionLogsFilter::All, Some(sub_config.clone()))
2977 .unwrap();
2978 assert!(subscriptions.control.logs_subscribed(None));
2979
2980 let (rpc_alice, mut receiver_alice) = rpc_pubsub_service::test_connection(&subscriptions);
2981 let sub_id_for_alice = rpc_alice
2982 .logs_subscribe(
2983 RpcTransactionLogsFilter::Mentions(vec![alice.pubkey().to_string()]),
2984 Some(sub_config),
2985 )
2986 .unwrap();
2987 assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
2988 rpc_alice.block_until_processed(&subscriptions);
2989
2990 let tx = system_transaction::create_account(
2991 &mint_keypair,
2992 &alice,
2993 blockhash,
2994 1,
2995 0,
2996 &system_program::id(),
2997 );
2998
2999 assert!(bank_forks
3000 .read()
3001 .unwrap()
3002 .get(0)
3003 .unwrap()
3004 .process_transaction_with_metadata(tx.clone())
3005 .is_ok());
3006
3007 subscriptions.notify_subscribers(CommitmentSlots::new_from_slot(0));
3008
3009 let expected_response_all =
3010 make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_all));
3011 let response_all = receiver_all.recv();
3012 assert_eq!(
3013 expected_response_all,
3014 serde_json::from_str::<serde_json::Value>(&response_all).unwrap(),
3015 );
3016 let expected_response_alice =
3017 make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_alice));
3018 let response_alice = receiver_alice.recv();
3019 assert_eq!(
3020 expected_response_alice,
3021 serde_json::from_str::<serde_json::Value>(&response_alice).unwrap(),
3022 );
3023
3024 rpc_all.logs_unsubscribe(sub_id_for_all).unwrap();
3025 assert!(!subscriptions.control.logs_subscribed(None));
3026 rpc_alice.logs_unsubscribe(sub_id_for_alice).unwrap();
3027 assert!(!subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
3028 }
3029
3030 #[test]
3031 fn test_total_subscriptions() {
3032 let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
3033 let bank = Bank::new_for_tests(&genesis_config);
3034 let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
3035 let max_complete_rewards_slot = Arc::new(AtomicU64::default());
3036 let bank_forks = BankForks::new_rw_arc(bank);
3037 let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
3038 max_complete_transaction_status_slot,
3039 max_complete_rewards_slot,
3040 bank_forks,
3041 ));
3042
3043 let (rpc1, _receiver1) = rpc_pubsub_service::test_connection(&subscriptions);
3044 let sub_id1 = rpc1
3045 .account_subscribe(Pubkey::default().to_string(), None)
3046 .unwrap();
3047
3048 assert_eq!(subscriptions.total(), 1);
3049
3050 let (rpc2, _receiver2) = rpc_pubsub_service::test_connection(&subscriptions);
3051 let sub_id2 = rpc2
3052 .program_subscribe(Pubkey::default().to_string(), None)
3053 .unwrap();
3054
3055 assert_eq!(subscriptions.total(), 2);
3056
3057 let (rpc3, _receiver3) = rpc_pubsub_service::test_connection(&subscriptions);
3058 let sub_id3 = rpc3
3059 .logs_subscribe(RpcTransactionLogsFilter::All, None)
3060 .unwrap();
3061 assert_eq!(subscriptions.total(), 3);
3062
3063 let (rpc4, _receiver4) = rpc_pubsub_service::test_connection(&subscriptions);
3064 let sub_id4 = rpc4
3065 .signature_subscribe(Signature::default().to_string(), None)
3066 .unwrap();
3067
3068 assert_eq!(subscriptions.total(), 4);
3069
3070 let (rpc5, _receiver5) = rpc_pubsub_service::test_connection(&subscriptions);
3071 let sub_id5 = rpc5.slot_subscribe().unwrap();
3072
3073 assert_eq!(subscriptions.total(), 5);
3074
3075 let (rpc6, _receiver6) = rpc_pubsub_service::test_connection(&subscriptions);
3076 let sub_id6 = rpc6.vote_subscribe().unwrap();
3077
3078 assert_eq!(subscriptions.total(), 6);
3079
3080 let (rpc7, _receiver7) = rpc_pubsub_service::test_connection(&subscriptions);
3081 let sub_id7 = rpc7.root_subscribe().unwrap();
3082
3083 assert_eq!(subscriptions.total(), 7);
3084
3085 let (rpc8, _receiver8) = rpc_pubsub_service::test_connection(&subscriptions);
3087 let sub_id8 = rpc8
3088 .account_subscribe(Pubkey::default().to_string(), None)
3089 .unwrap();
3090 assert_eq!(subscriptions.total(), 7);
3091
3092 rpc1.account_unsubscribe(sub_id1).unwrap();
3093 assert_eq!(subscriptions.total(), 7);
3094
3095 rpc8.account_unsubscribe(sub_id8).unwrap();
3096 assert_eq!(subscriptions.total(), 6);
3097
3098 rpc2.program_unsubscribe(sub_id2).unwrap();
3099 assert_eq!(subscriptions.total(), 5);
3100
3101 rpc3.logs_unsubscribe(sub_id3).unwrap();
3102 assert_eq!(subscriptions.total(), 4);
3103
3104 rpc4.signature_unsubscribe(sub_id4).unwrap();
3105 assert_eq!(subscriptions.total(), 3);
3106
3107 rpc5.slot_unsubscribe(sub_id5).unwrap();
3108 assert_eq!(subscriptions.total(), 2);
3109
3110 rpc6.vote_unsubscribe(sub_id6).unwrap();
3111 assert_eq!(subscriptions.total(), 1);
3112
3113 rpc7.root_unsubscribe(sub_id7).unwrap();
3114 assert_eq!(subscriptions.total(), 0);
3115 }
3116}