solana_rpc/
rpc_subscriptions.rs

1//! The `pubsub` module implements a threaded subscription service on client RPC request
2
3use {
4    crate::{
5        filter::filter_allows,
6        optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
7        parsed_token_accounts::{get_parsed_token_account, get_parsed_token_accounts},
8        rpc_pubsub_service::PubSubConfig,
9        rpc_subscription_tracker::{
10            AccountSubscriptionParams, BlockSubscriptionKind, BlockSubscriptionParams,
11            LogsSubscriptionKind, LogsSubscriptionParams, ProgramSubscriptionParams,
12            SignatureSubscriptionParams, SubscriptionControl, SubscriptionId, SubscriptionInfo,
13            SubscriptionParams, SubscriptionsTracker,
14        },
15    },
16    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
17    itertools::Either,
18    rayon::prelude::*,
19    serde::Serialize,
20    solana_account_decoder::{
21        encode_ui_account, parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding,
22    },
23    solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
24    solana_measure::measure::Measure,
25    solana_rpc_client_api::response::{
26        ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate,
27        RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext,
28        RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
29    },
30    solana_runtime::{
31        bank::{Bank, TransactionLogInfo},
32        bank_forks::BankForks,
33        commitment::{BlockCommitmentCache, CommitmentSlots},
34    },
35    solana_sdk::{
36        account::{AccountSharedData, ReadableAccount},
37        clock::Slot,
38        pubkey::Pubkey,
39        signature::Signature,
40        timing::timestamp,
41        transaction,
42    },
43    solana_transaction_status::{
44        BlockEncodingOptions, ConfirmedBlock, EncodeError, VersionedConfirmedBlock,
45    },
46    solana_vote::vote_transaction::VoteTransaction,
47    std::{
48        cell::RefCell,
49        collections::{HashMap, VecDeque},
50        io::Cursor,
51        str,
52        sync::{
53            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
54            Arc, Mutex, RwLock, Weak,
55        },
56        thread::{Builder, JoinHandle},
57        time::{Duration, Instant},
58    },
59    tokio::sync::broadcast,
60};
61
/// Timeout, in milliseconds, passed to `recv_timeout` each time
/// `RpcSubscriptions::process_notifications` waits for the next queued entry.
/// That loop checks its exit flag before every wait.
const RECEIVE_DELAY_MILLIS: u64 = 100;
63
64fn get_transaction_logs(
65    bank: &Bank,
66    params: &LogsSubscriptionParams,
67) -> Option<Vec<TransactionLogInfo>> {
68    let pubkey = match &params.kind {
69        LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
70        LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
71    };
72    let mut logs = bank.get_transaction_logs(pubkey);
73    if matches!(params.kind, LogsSubscriptionKind::All) {
74        // Filter out votes if the subscriber doesn't want them
75        if let Some(logs) = &mut logs {
76            logs.retain(|log| !log.is_vote);
77        }
78    }
79    logs
80}
81#[derive(Debug)]
82pub struct TimestampedNotificationEntry {
83    pub entry: NotificationEntry,
84    pub queued_at: Instant,
85}
86
87impl From<NotificationEntry> for TimestampedNotificationEntry {
88    fn from(entry: NotificationEntry) -> Self {
89        TimestampedNotificationEntry {
90            entry,
91            queued_at: Instant::now(),
92        }
93    }
94}
95
/// Work items sent over the notification channel to the thread running
/// `RpcSubscriptions::process_notifications`.
pub enum NotificationEntry {
    /// Slot/parent/root triple, enqueued by `notify_slot`.
    Slot(SlotInfo),
    /// Slot update, enqueued by `notify_slot_update`, `notify_slot` and `notify_roots`.
    SlotUpdate(SlotUpdate),
    /// Vote account pubkey, vote transaction and signature, enqueued by `notify_vote`.
    Vote((Pubkey, VoteTransaction, Signature)),
    /// A rooted slot, enqueued once per slot by `notify_roots`.
    Root(Slot),
    /// Commitment slots, enqueued by `notify_subscribers`.
    Bank(CommitmentSlots),
    /// Slot enqueued by `notify_gossip_subscribers`.
    Gossip(Slot),
    /// Slot together with signatures, enqueued by `notify_signatures_received`.
    SignaturesReceived((Slot, Vec<Signature>)),
    /// Handled by calling `SubscriptionsTracker::subscribe` with these values.
    Subscribed(SubscriptionParams, SubscriptionId),
    /// Handled by calling `SubscriptionsTracker::unsubscribe` with these values.
    Unsubscribed(SubscriptionParams, SubscriptionId),
}
107
108impl std::fmt::Debug for NotificationEntry {
109    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
110        match self {
111            NotificationEntry::Root(root) => write!(f, "Root({root})"),
112            NotificationEntry::Vote(vote) => write!(f, "Vote({vote:?})"),
113            NotificationEntry::Slot(slot_info) => write!(f, "Slot({slot_info:?})"),
114            NotificationEntry::SlotUpdate(slot_update) => {
115                write!(f, "SlotUpdate({slot_update:?})")
116            }
117            NotificationEntry::Bank(commitment_slots) => {
118                write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot)
119            }
120            NotificationEntry::SignaturesReceived(slot_signatures) => {
121                write!(f, "SignaturesReceived({slot_signatures:?})")
122            }
123            NotificationEntry::Gossip(slot) => write!(f, "Gossip({slot:?})"),
124            NotificationEntry::Subscribed(params, id) => {
125                write!(f, "Subscribed({params:?}, {id:?})")
126            }
127            NotificationEntry::Unsubscribed(params, id) => {
128                write!(f, "Unsubscribed({params:?}, {id:?})")
129            }
130        }
131    }
132}
133
134#[allow(clippy::type_complexity)]
135fn check_commitment_and_notify<P, S, B, F, X, I>(
136    params: &P,
137    subscription: &SubscriptionInfo,
138    bank_forks: &Arc<RwLock<BankForks>>,
139    slot: Slot,
140    bank_method: B,
141    filter_results: F,
142    notifier: &RpcNotifier,
143    is_final: bool,
144) -> bool
145where
146    S: Clone + Serialize,
147    B: Fn(&Bank, &P) -> X,
148    F: Fn(X, &P, Slot, Arc<Bank>) -> (I, Slot),
149    X: Clone + Default,
150    I: IntoIterator<Item = S>,
151{
152    let mut notified = false;
153    let bank = bank_forks.read().unwrap().get(slot);
154    if let Some(bank) = bank {
155        let results = bank_method(&bank, params);
156        let mut w_last_notified_slot = subscription.last_notified_slot.write().unwrap();
157        let (filter_results, result_slot) =
158            filter_results(results, params, *w_last_notified_slot, bank);
159        for result in filter_results {
160            notifier.notify(
161                RpcResponse::from(RpcNotificationResponse {
162                    context: RpcNotificationContext { slot },
163                    value: result,
164                }),
165                subscription,
166                is_final,
167            );
168            *w_last_notified_slot = result_slot;
169            notified = true;
170        }
171    }
172
173    notified
174}
175
/// A serialized notification as sent on the broadcast channel by `RpcNotifier::notify`.
#[derive(Debug, Clone)]
pub struct RpcNotification {
    /// Id of the subscription this notification is addressed to.
    pub subscription_id: SubscriptionId,
    /// Flag passed through unchanged from the caller of `RpcNotifier::notify`.
    pub is_final: bool,
    /// Weak handle to the JSON text; `RpcNotifier::notify` stores the strong `Arc`
    /// in its `RecentItems` queue.
    pub json: Weak<String>,
    /// Instant at which the notification was built.
    pub created_at: Instant,
}
183
184#[derive(Debug, Clone, PartialEq)]
185struct RpcNotificationResponse<T> {
186    context: RpcNotificationContext,
187    value: T,
188}
189
190impl<T> From<RpcNotificationResponse<T>> for RpcResponse<T> {
191    fn from(notification: RpcNotificationResponse<T>) -> Self {
192        let RpcNotificationResponse {
193            context: RpcNotificationContext { slot },
194            value,
195        } = notification;
196        Self {
197            context: RpcResponseContext {
198                slot,
199                api_version: None,
200            },
201            value,
202        }
203    }
204}
205
/// Context attached to a notification; carries only the slot.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RpcNotificationContext {
    /// Slot the notification refers to.
    slot: Slot,
}
210
/// Minimum time between metrics datapoints emitted by `RecentItems::push` and
/// `PubsubNotificationStats::maybe_submit`. Despite the `_MS` suffix this is a
/// `Duration` (2000 ms), not a raw millisecond count.
const RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS: Duration = Duration::from_millis(2_000);
212
213struct RecentItems {
214    queue: VecDeque<Arc<String>>,
215    total_bytes: usize,
216    max_len: usize,
217    max_total_bytes: usize,
218    last_metrics_submission: Instant,
219}
220
221impl RecentItems {
222    fn new(max_len: usize, max_total_bytes: usize) -> Self {
223        Self {
224            queue: VecDeque::new(),
225            total_bytes: 0,
226            max_len,
227            max_total_bytes,
228            last_metrics_submission: Instant::now(),
229        }
230    }
231
232    fn push(&mut self, item: Arc<String>) {
233        self.total_bytes = self
234            .total_bytes
235            .checked_add(item.len())
236            .expect("total bytes overflow");
237        self.queue.push_back(item);
238
239        while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len {
240            let item = self.queue.pop_front().expect("can't be empty");
241            self.total_bytes = self
242                .total_bytes
243                .checked_sub(item.len())
244                .expect("total bytes underflow");
245        }
246
247        let now = Instant::now();
248        let last_metrics_ago = now.duration_since(self.last_metrics_submission);
249        if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS {
250            datapoint_info!(
251                "rpc_subscriptions_recent_items",
252                ("num", self.queue.len(), i64),
253                ("total_bytes", self.total_bytes, i64),
254            );
255            self.last_metrics_submission = now;
256        } else {
257            trace!(
258                "rpc_subscriptions_recent_items num={} total_bytes={}",
259                self.queue.len(),
260                self.total_bytes,
261            );
262        }
263    }
264}
265
/// Serializes notifications and publishes them on the broadcast channel.
struct RpcNotifier {
    /// Broadcast side on which every `RpcNotification` is sent.
    sender: broadcast::Sender<RpcNotification>,
    /// Holds the strong `Arc`s to the JSON text that sent notifications refer to
    /// through their `Weak` handle.
    recent_items: Mutex<RecentItems>,
}
270
// Per-thread scratch buffer that `RpcNotifier::notify` clears and serializes into,
// so the byte buffer itself is reused across calls on the same thread.
thread_local! {
    static RPC_NOTIFIER_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
}
274
/// The `params` object of a serialized notification.
#[derive(Debug, Serialize)]
struct NotificationParams<T> {
    /// The notification payload.
    result: T,
    /// Id of the subscription the payload belongs to.
    subscription: SubscriptionId,
}
280
/// Top-level envelope that `RpcNotifier::notify` serializes to JSON.
#[derive(Debug, Serialize)]
struct Notification<T> {
    /// JSON-RPC version marker; `notify` always sets it to `Some(V2)`.
    jsonrpc: Option<jsonrpc_core::Version>,
    /// Method name, taken from `SubscriptionInfo::method`.
    method: &'static str,
    /// Payload and subscription id.
    params: NotificationParams<T>,
}
287
288impl RpcNotifier {
289    fn notify<T>(&self, value: T, subscription: &SubscriptionInfo, is_final: bool)
290    where
291        T: serde::Serialize,
292    {
293        let buf_arc = RPC_NOTIFIER_BUF.with(|buf| {
294            let mut buf = buf.borrow_mut();
295            buf.clear();
296            let notification = Notification {
297                jsonrpc: Some(jsonrpc_core::Version::V2),
298                method: subscription.method(),
299                params: NotificationParams {
300                    result: value,
301                    subscription: subscription.id(),
302                },
303            };
304            serde_json::to_writer(Cursor::new(&mut *buf), &notification)
305                .expect("serialization never fails");
306            let buf_str = str::from_utf8(&buf).expect("json is always utf-8");
307            Arc::new(String::from(buf_str))
308        });
309
310        let notification = RpcNotification {
311            subscription_id: subscription.id(),
312            json: Arc::downgrade(&buf_arc),
313            is_final,
314            created_at: Instant::now(),
315        };
316        // There is an unlikely case where this can fail: if the last subscription is closed
317        // just as the notifier generates a notification for it.
318        let _ = self.sender.send(notification);
319
320        inc_new_counter_info!("rpc-pubsub-messages", 1);
321        inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());
322
323        self.recent_items.lock().unwrap().push(buf_arc);
324    }
325}
326
327fn filter_block_result_txs(
328    mut block: VersionedConfirmedBlock,
329    last_modified_slot: Slot,
330    params: &BlockSubscriptionParams,
331) -> Result<Option<RpcBlockUpdate>, RpcBlockUpdateError> {
332    block.transactions = match params.kind {
333        BlockSubscriptionKind::All => block.transactions,
334        BlockSubscriptionKind::MentionsAccountOrProgram(pk) => block
335            .transactions
336            .into_iter()
337            .filter(|tx| tx.account_keys().iter().any(|key| key == &pk))
338            .collect(),
339    };
340
341    if block.transactions.is_empty() {
342        if let BlockSubscriptionKind::MentionsAccountOrProgram(_) = params.kind {
343            return Ok(None);
344        }
345    }
346
347    let block = ConfirmedBlock::from(block)
348        .encode_with_options(
349            params.encoding,
350            BlockEncodingOptions {
351                transaction_details: params.transaction_details,
352                show_rewards: params.show_rewards,
353                max_supported_transaction_version: params.max_supported_transaction_version,
354            },
355        )
356        .map_err(|err| match err {
357            EncodeError::UnsupportedTransactionVersion(version) => {
358                RpcBlockUpdateError::UnsupportedTransactionVersion(version)
359            }
360        })?;
361
362    // If last_modified_slot < last_notified_slot, then the last notif was for a fork.
363    // That's the risk clients take when subscribing to non-finalized commitments.
364    // This code lets the logic for dealing with forks live on the client side.
365    Ok(Some(RpcBlockUpdate {
366        slot: last_modified_slot,
367        block: Some(block),
368        err: None,
369    }))
370}
371
372fn filter_account_result(
373    result: Option<(AccountSharedData, Slot)>,
374    params: &AccountSubscriptionParams,
375    last_notified_slot: Slot,
376    bank: Arc<Bank>,
377) -> (Option<UiAccount>, Slot) {
378    // If the account is not found, `last_modified_slot` will default to zero and
379    // we will notify clients that the account no longer exists if we haven't already
380    let (account, last_modified_slot) = result.unwrap_or_default();
381
382    // If last_modified_slot < last_notified_slot this means that we last notified for a fork
383    // and should notify that the account state has been reverted.
384    let account = (last_modified_slot != last_notified_slot).then(|| {
385        if is_known_spl_token_id(account.owner())
386            && params.encoding == UiAccountEncoding::JsonParsed
387        {
388            get_parsed_token_account(&bank, &params.pubkey, account, None)
389        } else {
390            encode_ui_account(&params.pubkey, &account, params.encoding, None, None)
391        }
392    });
393    (account, last_modified_slot)
394}
395
396fn filter_signature_result(
397    result: Option<transaction::Result<()>>,
398    _params: &SignatureSubscriptionParams,
399    last_notified_slot: Slot,
400    _bank: Arc<Bank>,
401) -> (Option<RpcSignatureResult>, Slot) {
402    (
403        result.map(|result| {
404            RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: result.err() })
405        }),
406        last_notified_slot,
407    )
408}
409
410fn filter_program_results(
411    accounts: Vec<(Pubkey, AccountSharedData)>,
412    params: &ProgramSubscriptionParams,
413    last_notified_slot: Slot,
414    bank: Arc<Bank>,
415) -> (impl Iterator<Item = RpcKeyedAccount>, Slot) {
416    let accounts_is_empty = accounts.is_empty();
417    let encoding = params.encoding;
418    let filters = params.filters.clone();
419    let keyed_accounts = accounts.into_iter().filter(move |(_, account)| {
420        filters
421            .iter()
422            .all(|filter_type| filter_allows(filter_type, account))
423    });
424    let accounts = if is_known_spl_token_id(&params.pubkey)
425        && params.encoding == UiAccountEncoding::JsonParsed
426        && !accounts_is_empty
427    {
428        let accounts = get_parsed_token_accounts(bank, keyed_accounts);
429        Either::Left(accounts)
430    } else {
431        let accounts = keyed_accounts.map(move |(pubkey, account)| RpcKeyedAccount {
432            pubkey: pubkey.to_string(),
433            account: encode_ui_account(&pubkey, &account, encoding, None, None),
434        });
435        Either::Right(accounts)
436    };
437    (accounts, last_notified_slot)
438}
439
440fn filter_logs_results(
441    logs: Option<Vec<TransactionLogInfo>>,
442    _params: &LogsSubscriptionParams,
443    last_notified_slot: Slot,
444    _bank: Arc<Bank>,
445) -> (impl Iterator<Item = RpcLogsResponse>, Slot) {
446    let responses = logs.into_iter().flatten().map(|log| RpcLogsResponse {
447        signature: log.signature.to_string(),
448        err: log.result.err(),
449        logs: log.log_messages,
450    });
451    (responses, last_notified_slot)
452}
453
454fn initial_last_notified_slot(
455    params: &SubscriptionParams,
456    bank_forks: &RwLock<BankForks>,
457    block_commitment_cache: &RwLock<BlockCommitmentCache>,
458    optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
459) -> Option<Slot> {
460    match params {
461        SubscriptionParams::Account(params) => {
462            let slot = if params.commitment.is_finalized() {
463                block_commitment_cache
464                    .read()
465                    .unwrap()
466                    .highest_super_majority_root()
467            } else if params.commitment.is_confirmed() {
468                optimistically_confirmed_bank.read().unwrap().bank.slot()
469            } else {
470                block_commitment_cache.read().unwrap().slot()
471            };
472
473            let bank = bank_forks.read().unwrap().get(slot)?;
474            Some(bank.get_account_modified_slot(&params.pubkey)?.1)
475        }
476        _ => None,
477    }
478}
479
480#[derive(Default)]
481struct PubsubNotificationStats {
482    since: Option<Instant>,
483    notification_entry_processing_count: u64,
484    notification_entry_processing_time_us: u64,
485}
486
487impl PubsubNotificationStats {
488    fn maybe_submit(&mut self) {
489        const SUBMIT_CADENCE: Duration = RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS;
490        let elapsed = self.since.as_ref().map(Instant::elapsed);
491        if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
492            return;
493        }
494        datapoint_info!(
495            "pubsub_notification_entries",
496            (
497                "notification_entry_processing_count",
498                self.notification_entry_processing_count,
499                i64
500            ),
501            (
502                "notification_entry_processing_time_us",
503                self.notification_entry_processing_time_us,
504                i64
505            ),
506        );
507        *self = Self {
508            since: Some(Instant::now()),
509            ..Self::default()
510        };
511    }
512}
513
/// Handle to the subscription service: enqueues notification entries for the
/// notifier thread and exposes the subscription control.
pub struct RpcSubscriptions {
    /// Sending side of the notification queue; `None` when no notification
    /// threads are configured, in which case enqueueing is a no-op.
    notification_sender: Option<Sender<TimestampedNotificationEntry>>,
    /// Join handle of the `solRpcNotifier` thread, if one was spawned.
    t_cleanup: Option<JoinHandle<()>>,

    /// Exit flag handed in by the caller; the notifier loop checks it before
    /// each wait.
    exit: Arc<AtomicBool>,
    /// Subscription control returned by `control()`.
    control: SubscriptionControl,
}
521
522impl Drop for RpcSubscriptions {
523    fn drop(&mut self) {
524        self.shutdown().unwrap_or_else(|err| {
525            warn!("RPC Notification - shutdown error: {:?}", err);
526        });
527    }
528}
529
530impl RpcSubscriptions {
531    pub fn new(
532        exit: Arc<AtomicBool>,
533        max_complete_transaction_status_slot: Arc<AtomicU64>,
534        max_complete_rewards_slot: Arc<AtomicU64>,
535        blockstore: Arc<Blockstore>,
536        bank_forks: Arc<RwLock<BankForks>>,
537        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
538        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
539    ) -> Self {
540        Self::new_with_config(
541            exit,
542            max_complete_transaction_status_slot,
543            max_complete_rewards_slot,
544            blockstore,
545            bank_forks,
546            block_commitment_cache,
547            optimistically_confirmed_bank,
548            &PubSubConfig::default(),
549            None,
550        )
551    }
552
553    pub fn new_for_tests(
554        exit: Arc<AtomicBool>,
555        max_complete_transaction_status_slot: Arc<AtomicU64>,
556        max_complete_rewards_slot: Arc<AtomicU64>,
557        bank_forks: Arc<RwLock<BankForks>>,
558        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
559        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
560    ) -> Self {
561        let ledger_path = get_tmp_ledger_path!();
562        let blockstore = Blockstore::open(&ledger_path).unwrap();
563        let blockstore = Arc::new(blockstore);
564
565        Self::new_for_tests_with_blockstore(
566            exit,
567            max_complete_transaction_status_slot,
568            max_complete_rewards_slot,
569            blockstore,
570            bank_forks,
571            block_commitment_cache,
572            optimistically_confirmed_bank,
573        )
574    }
575
576    pub fn new_for_tests_with_blockstore(
577        exit: Arc<AtomicBool>,
578        max_complete_transaction_status_slot: Arc<AtomicU64>,
579        max_complete_rewards_slot: Arc<AtomicU64>,
580        blockstore: Arc<Blockstore>,
581        bank_forks: Arc<RwLock<BankForks>>,
582        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
583        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
584    ) -> Self {
585        let rpc_notifier_ready = Arc::new(AtomicBool::new(false));
586
587        let rpc_subscriptions = Self::new_with_config(
588            exit,
589            max_complete_transaction_status_slot,
590            max_complete_rewards_slot,
591            blockstore,
592            bank_forks,
593            block_commitment_cache,
594            optimistically_confirmed_bank,
595            &PubSubConfig::default_for_tests(),
596            Some(rpc_notifier_ready.clone()),
597        );
598
599        // Ensure RPC notifier is ready to receive notifications before proceeding
600        let start_time = Instant::now();
601        loop {
602            if rpc_notifier_ready.load(Ordering::Relaxed) {
603                break;
604            } else if (Instant::now() - start_time).as_millis() > 5000 {
605                panic!("RPC notifier thread setup took too long");
606            }
607        }
608
609        rpc_subscriptions
610    }
611
    /// Builds the subscription service from an explicit [`PubSubConfig`].
    ///
    /// When `config.notification_threads` is set, a `solRpcNotifier` thread is
    /// spawned that runs `process_notifications` inside a dedicated rayon pool;
    /// otherwise no thread is started and the returned handle has no
    /// notification sender.
    ///
    /// * `exit` - exit flag; a clone is given to the notifier thread.
    /// * `config` - queue capacities, notification thread count and the maximum
    ///   number of active subscriptions.
    /// * `rpc_notifier_ready` - optional flag set to `true` by the notifier
    ///   thread just before it starts processing notifications.
    ///
    /// The remaining arguments are moved into the notifier thread.
    ///
    /// Panics if the thread pool or the thread cannot be created.
    pub fn new_with_config(
        exit: Arc<AtomicBool>,
        max_complete_transaction_status_slot: Arc<AtomicU64>,
        max_complete_rewards_slot: Arc<AtomicU64>,
        blockstore: Arc<Blockstore>,
        bank_forks: Arc<RwLock<BankForks>>,
        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
        config: &PubSubConfig,
        rpc_notifier_ready: Option<Arc<AtomicBool>>,
    ) -> Self {
        // Unbounded queue carrying notification entries to the notifier thread.
        let (notification_sender, notification_receiver) = crossbeam_channel::unbounded();

        let subscriptions = SubscriptionsTracker::new(bank_forks.clone());

        // Only the sending half is kept here; the initial receiver is discarded.
        let (broadcast_sender, _) = broadcast::channel(config.queue_capacity_items);

        let notifier = RpcNotifier {
            sender: broadcast_sender.clone(),
            recent_items: Mutex::new(RecentItems::new(
                config.queue_capacity_items,
                config.queue_capacity_bytes,
            )),
        };

        // The notifier thread exists only when notification threads are configured.
        let t_cleanup = config.notification_threads.map(|notification_threads| {
            let exit = exit.clone();
            Builder::new()
                .name("solRpcNotifier".to_string())
                .spawn(move || {
                    let pool = rayon::ThreadPoolBuilder::new()
                        .num_threads(notification_threads.get())
                        .thread_name(|i| format!("solRpcNotify{i:02}"))
                        .build()
                        .unwrap();
                    pool.install(|| {
                        // Signal readiness right before the processing loop starts.
                        if let Some(rpc_notifier_ready) = rpc_notifier_ready {
                            rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
                        }
                        Self::process_notifications(
                            exit,
                            max_complete_transaction_status_slot,
                            max_complete_rewards_slot,
                            blockstore,
                            notifier,
                            notification_receiver,
                            subscriptions,
                            bank_forks,
                            block_commitment_cache,
                            optimistically_confirmed_bank,
                        )
                    });
                })
                .unwrap()
        });

        let control = SubscriptionControl::new(
            config.max_active_subscriptions,
            notification_sender.clone(),
            broadcast_sender,
        );

        Self {
            // Without notification threads the sender is dropped here, which makes
            // `enqueue_notification` a no-op.
            notification_sender: config.notification_threads.map(|_| notification_sender),
            t_cleanup,
            exit,
            control,
        }
    }
681
682    // For tests only...
683    pub fn default_with_bank_forks(
684        max_complete_transaction_status_slot: Arc<AtomicU64>,
685        max_complete_rewards_slot: Arc<AtomicU64>,
686        bank_forks: Arc<RwLock<BankForks>>,
687    ) -> Self {
688        let ledger_path = get_tmp_ledger_path!();
689        let blockstore = Blockstore::open(&ledger_path).unwrap();
690        let blockstore = Arc::new(blockstore);
691        let optimistically_confirmed_bank =
692            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
693        Self::new(
694            Arc::new(AtomicBool::new(false)),
695            max_complete_transaction_status_slot,
696            max_complete_rewards_slot,
697            blockstore,
698            bank_forks,
699            Arc::new(RwLock::new(BlockCommitmentCache::default())),
700            optimistically_confirmed_bank,
701        )
702    }
703
    /// Returns a reference to the subscription control built in `new_with_config`.
    pub fn control(&self) -> &SubscriptionControl {
        &self.control
    }
707
708    /// Notify subscribers of changes to any accounts or new signatures since
709    /// the bank's last checkpoint.
710    pub fn notify_subscribers(&self, commitment_slots: CommitmentSlots) {
711        self.enqueue_notification(NotificationEntry::Bank(commitment_slots));
712    }
713
714    /// Notify Confirmed commitment-level subscribers of changes to any accounts or new
715    /// signatures.
716    pub fn notify_gossip_subscribers(&self, slot: Slot) {
717        self.enqueue_notification(NotificationEntry::Gossip(slot));
718    }
719
720    pub fn notify_slot_update(&self, slot_update: SlotUpdate) {
721        self.enqueue_notification(NotificationEntry::SlotUpdate(slot_update));
722    }
723
724    pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
725        self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
726        self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::CreatedBank {
727            slot,
728            parent,
729            timestamp: timestamp(),
730        }));
731    }
732
733    pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec<Signature>)) {
734        self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures));
735    }
736
737    pub fn notify_vote(&self, vote_pubkey: Pubkey, vote: VoteTransaction, signature: Signature) {
738        self.enqueue_notification(NotificationEntry::Vote((vote_pubkey, vote, signature)));
739    }
740
741    pub fn notify_roots(&self, mut rooted_slots: Vec<Slot>) {
742        rooted_slots.sort_unstable();
743        rooted_slots.into_iter().for_each(|root| {
744            self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::Root {
745                slot: root,
746                timestamp: timestamp(),
747            }));
748            self.enqueue_notification(NotificationEntry::Root(root));
749        });
750    }
751
752    fn enqueue_notification(&self, notification_entry: NotificationEntry) {
753        if let Some(ref notification_sender) = self.notification_sender {
754            match notification_sender.send(notification_entry.into()) {
755                Ok(()) => (),
756                Err(SendError(notification)) => {
757                    warn!(
758                        "Dropped RPC Notification - receiver disconnected : {:?}",
759                        notification
760                    );
761                }
762            }
763        }
764    }
765
766    #[allow(clippy::too_many_arguments)]
767    fn process_notifications(
768        exit: Arc<AtomicBool>,
769        max_complete_transaction_status_slot: Arc<AtomicU64>,
770        max_complete_rewards_slot: Arc<AtomicU64>,
771        blockstore: Arc<Blockstore>,
772        notifier: RpcNotifier,
773        notification_receiver: Receiver<TimestampedNotificationEntry>,
774        mut subscriptions: SubscriptionsTracker,
775        bank_forks: Arc<RwLock<BankForks>>,
776        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
777        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
778    ) {
779        let mut stats = PubsubNotificationStats::default();
780
781        loop {
782            if exit.load(Ordering::Relaxed) {
783                break;
784            }
785            match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
786                Ok(notification_entry) => {
787                    let TimestampedNotificationEntry { entry, queued_at } = notification_entry;
788                    match entry {
789                        NotificationEntry::Subscribed(params, id) => {
790                            subscriptions.subscribe(params.clone(), id, || {
791                                initial_last_notified_slot(
792                                    &params,
793                                    &bank_forks,
794                                    &block_commitment_cache,
795                                    &optimistically_confirmed_bank,
796                                )
797                                .unwrap_or(0)
798                            });
799                        }
800                        NotificationEntry::Unsubscribed(params, id) => {
801                            subscriptions.unsubscribe(params, id);
802                        }
803                        NotificationEntry::Slot(slot_info) => {
804                            if let Some(sub) = subscriptions
805                                .node_progress_watchers()
806                                .get(&SubscriptionParams::Slot)
807                            {
808                                debug!("slot notify: {:?}", slot_info);
809                                inc_new_counter_info!("rpc-subscription-notify-slot", 1);
810                                notifier.notify(slot_info, sub, false);
811                            }
812                        }
813                        NotificationEntry::SlotUpdate(slot_update) => {
814                            if let Some(sub) = subscriptions
815                                .node_progress_watchers()
816                                .get(&SubscriptionParams::SlotsUpdates)
817                            {
818                                inc_new_counter_info!("rpc-subscription-notify-slots-updates", 1);
819                                notifier.notify(slot_update, sub, false);
820                            }
821                        }
822                        // These notifications are only triggered by votes observed on gossip,
823                        // unlike `NotificationEntry::Gossip`, which also accounts for slots seen
824                        // in VoteState's from bank states built in ReplayStage.
825                        NotificationEntry::Vote((vote_pubkey, ref vote_info, signature)) => {
826                            if let Some(sub) = subscriptions
827                                .node_progress_watchers()
828                                .get(&SubscriptionParams::Vote)
829                            {
830                                let rpc_vote = RpcVote {
831                                    vote_pubkey: vote_pubkey.to_string(),
832                                    slots: vote_info.slots(),
833                                    hash: bs58::encode(vote_info.hash()).into_string(),
834                                    timestamp: vote_info.timestamp(),
835                                    signature: signature.to_string(),
836                                };
837                                debug!("vote notify: {:?}", vote_info);
838                                inc_new_counter_info!("rpc-subscription-notify-vote", 1);
839                                notifier.notify(&rpc_vote, sub, false);
840                            }
841                        }
842                        NotificationEntry::Root(root) => {
843                            if let Some(sub) = subscriptions
844                                .node_progress_watchers()
845                                .get(&SubscriptionParams::Root)
846                            {
847                                debug!("root notify: {:?}", root);
848                                inc_new_counter_info!("rpc-subscription-notify-root", 1);
849                                notifier.notify(root, sub, false);
850                            }
851                        }
852                        NotificationEntry::Bank(commitment_slots) => {
853                            const SOURCE: &str = "bank";
854                            RpcSubscriptions::notify_watchers(
855                                max_complete_transaction_status_slot.clone(),
856                                max_complete_rewards_slot.clone(),
857                                subscriptions.commitment_watchers(),
858                                &bank_forks,
859                                &blockstore,
860                                &commitment_slots,
861                                &notifier,
862                                SOURCE,
863                            );
864                        }
865                        NotificationEntry::Gossip(slot) => {
866                            let commitment_slots = CommitmentSlots {
867                                highest_confirmed_slot: slot,
868                                ..CommitmentSlots::default()
869                            };
870                            const SOURCE: &str = "gossip";
871                            RpcSubscriptions::notify_watchers(
872                                max_complete_transaction_status_slot.clone(),
873                                max_complete_rewards_slot.clone(),
874                                subscriptions.gossip_watchers(),
875                                &bank_forks,
876                                &blockstore,
877                                &commitment_slots,
878                                &notifier,
879                                SOURCE,
880                            );
881                        }
882                        NotificationEntry::SignaturesReceived((slot, slot_signatures)) => {
883                            for slot_signature in &slot_signatures {
884                                if let Some(subs) = subscriptions.by_signature().get(slot_signature)
885                                {
886                                    for subscription in subs.values() {
887                                        if let SubscriptionParams::Signature(params) =
888                                            subscription.params()
889                                        {
890                                            if params.enable_received_notification {
891                                                notifier.notify(
892                                                    RpcResponse::from(RpcNotificationResponse {
893                                                        context: RpcNotificationContext { slot },
894                                                        value: RpcSignatureResult::ReceivedSignature(
895                                                            ReceivedSignatureResult::ReceivedSignature,
896                                                        ),
897                                                    }),
898                                                    subscription,
899                                                    false,
900                                                );
901                                            }
902                                        } else {
903                                            error!("invalid params type in visit_by_signature");
904                                        }
905                                    }
906                                }
907                            }
908                        }
909                    }
910                    stats.notification_entry_processing_time_us +=
911                        queued_at.elapsed().as_micros() as u64;
912                    stats.notification_entry_processing_count += 1;
913                }
914                Err(RecvTimeoutError::Timeout) => {
915                    // not a problem - try reading again
916                }
917                Err(RecvTimeoutError::Disconnected) => {
918                    warn!("RPC Notification thread - sender disconnected");
919                    break;
920                }
921            }
922            stats.maybe_submit();
923        }
924    }
925
926    fn notify_watchers(
927        max_complete_transaction_status_slot: Arc<AtomicU64>,
928        max_complete_rewards_slot: Arc<AtomicU64>,
929        subscriptions: &HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
930        bank_forks: &Arc<RwLock<BankForks>>,
931        blockstore: &Blockstore,
932        commitment_slots: &CommitmentSlots,
933        notifier: &RpcNotifier,
934        source: &'static str,
935    ) {
936        let mut total_time = Measure::start("notify_watchers");
937
938        let num_accounts_found = AtomicUsize::new(0);
939        let num_accounts_notified = AtomicUsize::new(0);
940
941        let num_blocks_found = AtomicUsize::new(0);
942        let num_blocks_notified = AtomicUsize::new(0);
943
944        let num_logs_found = AtomicUsize::new(0);
945        let num_logs_notified = AtomicUsize::new(0);
946
947        let num_programs_found = AtomicUsize::new(0);
948        let num_programs_notified = AtomicUsize::new(0);
949
950        let num_signatures_found = AtomicUsize::new(0);
951        let num_signatures_notified = AtomicUsize::new(0);
952
953        let subscriptions = subscriptions.into_par_iter();
954        subscriptions.for_each(|(_id, subscription)| {
955            let slot = if let Some(commitment) = subscription.commitment() {
956                if commitment.is_finalized() {
957                    Some(commitment_slots.highest_super_majority_root)
958                } else if commitment.is_confirmed() {
959                    Some(commitment_slots.highest_confirmed_slot)
960                } else {
961                    Some(commitment_slots.slot)
962                }
963            } else {
964                error!("missing commitment in notify_watchers");
965                None
966            };
967            match subscription.params() {
968                SubscriptionParams::Account(params) => {
969                    num_accounts_found.fetch_add(1, Ordering::Relaxed);
970                    if let Some(slot) = slot {
971                        let notified = check_commitment_and_notify(
972                            params,
973                            subscription,
974                            bank_forks,
975                            slot,
976                            |bank, params| bank.get_account_modified_slot(&params.pubkey),
977                            filter_account_result,
978                            notifier,
979                            false,
980                        );
981
982                        if notified {
983                            num_accounts_notified.fetch_add(1, Ordering::Relaxed);
984                        }
985                    }
986                }
987                SubscriptionParams::Block(params) => {
988                    num_blocks_found.fetch_add(1, Ordering::Relaxed);
989                    if let Some(slot) = slot {
990                        let bank = bank_forks.read().unwrap().get(slot);
991                        if let Some(bank) = bank {
992                            // We're calling it unnotified in this context
993                            // because, logically, it gets set to `last_notified_slot + 1`
994                            // on the final iteration of the loop down below.
995                            // This is used to notify blocks for slots that were
996                            // potentially missed due to upstream transient errors
997                            // that led to this notification not being triggered for
998                            // a slot.
999                            //
1000                            // e.g.
1001                            // notify_watchers is triggered for Slot 1
1002                            // some time passes
1003                            // notify_watchers is triggered for Slot 4
1004                            // this will try to fetch blocks for slots 2, 3, and 4
1005                            // as long as they are ancestors of `slot`
1006                            let mut w_last_unnotified_slot =
1007                                subscription.last_notified_slot.write().unwrap();
1008                            // would mean it's the first notification for this subscription connection
1009                            if *w_last_unnotified_slot == 0 {
1010                                *w_last_unnotified_slot = slot;
1011                            }
1012                            let mut slots_to_notify: Vec<_> =
1013                                (*w_last_unnotified_slot..slot).collect();
1014                            let ancestors = bank.proper_ancestors_set();
1015                            slots_to_notify.retain(|slot| ancestors.contains(slot));
1016                            slots_to_notify.push(slot);
1017                            for s in slots_to_notify {
1018                                // To avoid skipping a slot that fails this condition,
1019                                // caused by non-deterministic concurrency accesses, we
1020                                // break out of the loop. Besides if the current `s` is
1021                                // greater, then any `s + K` is also greater.
1022                                if s > max_complete_transaction_status_slot.load(Ordering::SeqCst)
1023                                    || s > max_complete_rewards_slot.load(Ordering::SeqCst)
1024                                {
1025                                    break;
1026                                }
1027
1028                                let block_update_result = blockstore
1029                                    .get_complete_block(s, false)
1030                                    .map_err(|e| {
1031                                        error!("get_complete_block error: {}", e);
1032                                        RpcBlockUpdateError::BlockStoreError
1033                                    })
1034                                    .and_then(|block| filter_block_result_txs(block, s, params));
1035
1036                                match block_update_result {
1037                                    Ok(block_update) => {
1038                                        if let Some(block_update) = block_update {
1039                                            notifier.notify(
1040                                                RpcResponse::from(RpcNotificationResponse {
1041                                                    context: RpcNotificationContext { slot: s },
1042                                                    value: block_update,
1043                                                }),
1044                                                subscription,
1045                                                false,
1046                                            );
1047                                            num_blocks_notified.fetch_add(1, Ordering::Relaxed);
1048                                            // the next time this subscription is notified it will
1049                                            // try to fetch all slots between (s + 1) to `slot`, inclusively
1050                                            *w_last_unnotified_slot = s + 1;
1051                                        }
1052                                    }
1053                                    Err(err) => {
1054                                        // we don't advance `w_last_unnotified_slot` so that
1055                                        // it'll retry on the next notification trigger
1056                                        notifier.notify(
1057                                            RpcResponse::from(RpcNotificationResponse {
1058                                                context: RpcNotificationContext { slot: s },
1059                                                value: RpcBlockUpdate {
1060                                                    slot,
1061                                                    block: None,
1062                                                    err: Some(err),
1063                                                },
1064                                            }),
1065                                            subscription,
1066                                            false,
1067                                        );
1068                                    }
1069                                }
1070                            }
1071                        }
1072                    }
1073                }
1074                SubscriptionParams::Logs(params) => {
1075                    num_logs_found.fetch_add(1, Ordering::Relaxed);
1076                    if let Some(slot) = slot {
1077                        let notified = check_commitment_and_notify(
1078                            params,
1079                            subscription,
1080                            bank_forks,
1081                            slot,
1082                            get_transaction_logs,
1083                            filter_logs_results,
1084                            notifier,
1085                            false,
1086                        );
1087
1088                        if notified {
1089                            num_logs_notified.fetch_add(1, Ordering::Relaxed);
1090                        }
1091                    }
1092                }
1093                SubscriptionParams::Program(params) => {
1094                    num_programs_found.fetch_add(1, Ordering::Relaxed);
1095                    if let Some(slot) = slot {
1096                        let notified = check_commitment_and_notify(
1097                            params,
1098                            subscription,
1099                            bank_forks,
1100                            slot,
1101                            |bank, params| {
1102                                bank.get_program_accounts_modified_since_parent(&params.pubkey)
1103                            },
1104                            filter_program_results,
1105                            notifier,
1106                            false,
1107                        );
1108
1109                        if notified {
1110                            num_programs_notified.fetch_add(1, Ordering::Relaxed);
1111                        }
1112                    }
1113                }
1114                SubscriptionParams::Signature(params) => {
1115                    num_signatures_found.fetch_add(1, Ordering::Relaxed);
1116                    if let Some(slot) = slot {
1117                        let notified = check_commitment_and_notify(
1118                            params,
1119                            subscription,
1120                            bank_forks,
1121                            slot,
1122                            |bank, params| {
1123                                bank.get_signature_status_processed_since_parent(&params.signature)
1124                            },
1125                            filter_signature_result,
1126                            notifier,
1127                            true, // Unsubscribe.
1128                        );
1129
1130                        if notified {
1131                            num_signatures_notified.fetch_add(1, Ordering::Relaxed);
1132                        }
1133                    }
1134                }
1135                _ => error!("wrong subscription type in alps map"),
1136            }
1137        });
1138
1139        total_time.stop();
1140
1141        let total_notified = num_accounts_notified.load(Ordering::Relaxed)
1142            + num_logs_notified.load(Ordering::Relaxed)
1143            + num_programs_notified.load(Ordering::Relaxed)
1144            + num_signatures_notified.load(Ordering::Relaxed);
1145        let total_ms = total_time.as_ms();
1146        if total_notified > 0 || total_ms > 10 {
1147            debug!(
1148                "notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / {}",
1149                source,
1150                num_accounts_found.load(Ordering::Relaxed),
1151                num_accounts_notified.load(Ordering::Relaxed),
1152                num_logs_found.load(Ordering::Relaxed),
1153                num_logs_notified.load(Ordering::Relaxed),
1154                num_programs_found.load(Ordering::Relaxed),
1155                num_programs_notified.load(Ordering::Relaxed),
1156                num_signatures_found.load(Ordering::Relaxed),
1157                num_signatures_notified.load(Ordering::Relaxed),
1158            );
1159            datapoint_info!(
1160                "rpc_subscriptions",
1161                ("source", source, String),
1162                (
1163                    "num_account_subscriptions",
1164                    num_accounts_found.load(Ordering::Relaxed),
1165                    i64
1166                ),
1167                (
1168                    "num_account_pubkeys_notified",
1169                    num_accounts_notified.load(Ordering::Relaxed),
1170                    i64
1171                ),
1172                (
1173                    "num_logs_subscriptions",
1174                    num_logs_found.load(Ordering::Relaxed),
1175                    i64
1176                ),
1177                (
1178                    "num_logs_notified",
1179                    num_logs_notified.load(Ordering::Relaxed),
1180                    i64
1181                ),
1182                (
1183                    "num_program_subscriptions",
1184                    num_programs_found.load(Ordering::Relaxed),
1185                    i64
1186                ),
1187                (
1188                    "num_programs_notified",
1189                    num_programs_notified.load(Ordering::Relaxed),
1190                    i64
1191                ),
1192                (
1193                    "num_signature_subscriptions",
1194                    num_signatures_found.load(Ordering::Relaxed),
1195                    i64
1196                ),
1197                (
1198                    "num_signatures_notified",
1199                    num_signatures_notified.load(Ordering::Relaxed),
1200                    i64
1201                ),
1202                ("notifications_time", total_time.as_us() as i64, i64),
1203            );
1204        }
1205    }
1206
1207    fn shutdown(&mut self) -> std::thread::Result<()> {
1208        if self.t_cleanup.is_some() {
1209            info!("RPC Notification thread - shutting down");
1210            self.exit.store(true, Ordering::Relaxed);
1211            let x = self.t_cleanup.take().unwrap().join();
1212            info!("RPC Notification thread - shut down.");
1213            x
1214        } else {
1215            warn!("RPC Notification thread - already shut down.");
1216            Ok(())
1217        }
1218    }
1219
    /// Test-only accessor that returns whatever `self.control.total()` reports.
    #[cfg(test)]
    fn total(&self) -> usize {
        self.control.total()
    }
1224}
1225
1226#[cfg(test)]
1227pub(crate) mod tests {
1228    use {
1229        super::*,
1230        crate::{
1231            optimistically_confirmed_bank_tracker::{
1232                BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
1233            },
1234            rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
1235            rpc_pubsub::RpcSolPubSubInternal,
1236            rpc_pubsub_service,
1237        },
1238        serial_test::serial,
1239        solana_ledger::get_tmp_ledger_path_auto_delete,
1240        solana_rpc_client_api::config::{
1241            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
1242            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
1243            RpcTransactionLogsFilter,
1244        },
1245        solana_runtime::{
1246            commitment::BlockCommitment,
1247            genesis_utils::{create_genesis_config, GenesisConfigInfo},
1248            prioritization_fee_cache::PrioritizationFeeCache,
1249        },
1250        solana_sdk::{
1251            commitment_config::CommitmentConfig,
1252            message::Message,
1253            signature::{Keypair, Signer},
1254            stake, system_instruction, system_program, system_transaction,
1255            transaction::Transaction,
1256        },
1257        solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
1258        std::{
1259            collections::HashSet,
1260            sync::atomic::{AtomicU64, Ordering::Relaxed},
1261        },
1262    };
1263
    /// Per-case values that `make_account_result` splices into the expected
    /// `accountNotification` JSON message.
    struct AccountResult {
        /// Emitted as the `lamports` field of the account value.
        lamports: u64,
        /// Emitted as `params.subscription`.
        subscription: u64,
        /// Emitted as the `data` field of the account value.
        data: &'static str,
        /// Emitted as the `space` field of the account value.
        space: usize,
    }
1270
1271    fn make_account_result(
1272        non_default_account: bool,
1273        account_result: AccountResult,
1274    ) -> serde_json::Value {
1275        json!({
1276           "jsonrpc": "2.0",
1277           "method": "accountNotification",
1278           "params": {
1279               "result": {
1280                   "context": { "slot": 1 },
1281                   "value": {
1282                       "data": account_result.data,
1283                       "executable": false,
1284                       "lamports": account_result.lamports,
1285                       "owner": "11111111111111111111111111111111",
1286                       "rentEpoch": if non_default_account {u64::MAX} else {0},
1287                       "space": account_result.space,
1288                    },
1289               },
1290               "subscription": account_result.subscription,
1291           }
1292        })
1293    }
1294
    // Subscribes to one account at `processed` commitment three times in a row, applies a
    // different transaction to the slot-1 bank each time, and checks that the notification
    // received after `notify_subscribers` matches the expected JSON for that case.
    #[test]
    #[serial]
    fn test_check_account_subscribe() {
        let GenesisConfigInfo {
            genesis_config,
            mint_keypair,
            ..
        } = create_genesis_config(100);
        let bank = Bank::new_for_tests(&genesis_config);
        let blockhash = bank.last_blockhash();
        let bank_forks = BankForks::new_rw_arc(bank);
        // Add a slot-1 bank as a child of slot 0; all transactions below are applied to it.
        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
        bank_forks.write().unwrap().insert(bank1);
        let alice = Keypair::new();

        let exit = Arc::new(AtomicBool::new(false));
        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
            exit,
            max_complete_transaction_status_slot,
            max_complete_rewards_slot,
            bank_forks.clone(),
            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
                1, 1,
            ))),
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
        ));

        // Case 0: create `alice` with 1 lamport and no data.
        let tx0 = system_transaction::create_account(
            &mint_keypair,
            &alice,
            blockhash,
            1,
            0,
            &system_program::id(),
        );
        let expected0 = make_account_result(
            true,
            AccountResult {
                lamports: 1,
                subscription: 0,
                space: 0,
                data: "",
            },
        );

        // Case 1: transfer the single lamport from `alice` back to the mint, so the expected
        // notification shows 0 lamports and a `rentEpoch` of 0.
        let tx1 = {
            let instruction =
                system_instruction::transfer(&alice.pubkey(), &mint_keypair.pubkey(), 1);
            let message = Message::new(&[instruction], Some(&mint_keypair.pubkey()));
            Transaction::new(&[&alice, &mint_keypair], message, blockhash)
        };
        let expected1 = make_account_result(
            false,
            AccountResult {
                lamports: 0,
                subscription: 2,
                space: 0,
                data: "",
            },
        );

        // Case 2: create `alice` again, this time with 1024 bytes of space. The expected
        // `data` field is the error string instead of encoded account data.
        let tx2 = system_transaction::create_account(
            &mint_keypair,
            &alice,
            blockhash,
            1,
            1024,
            &system_program::id(),
        );
        let expected2 = make_account_result(
            true,
            AccountResult {
                lamports: 1,
                subscription: 4,
                space: 1024,
                data: "error: data too large for bs58 encoding",
            },
        );

        // The cases run in order against the same slot-1 bank, so each one builds on the
        // account state left by the previous one.
        // NOTE(review): the expected subscription ids are 0, 2 and 4 - presumably an id is
        // consumed between cases; confirm against the subscription tracker.
        let subscribe_cases = vec![
            (alice.pubkey(), tx0, expected0),
            (alice.pubkey(), tx1, expected1),
            (alice.pubkey(), tx2, expected2),
        ];

        for (pubkey, tx, expected) in subscribe_cases {
            // Each case uses a fresh test connection and its own subscription.
            let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);

            let sub_id = rpc
                .account_subscribe(
                    pubkey.to_string(),
                    Some(RpcAccountInfoConfig {
                        commitment: Some(CommitmentConfig::processed()),
                        encoding: None,
                        data_slice: None,
                        min_context_slot: None,
                    }),
                )
                .unwrap();

            // With `encoding: None` in the request, the registered params are expected to
            // carry `UiAccountEncoding::Binary`.
            subscriptions
                .control
                .assert_subscribed(&SubscriptionParams::Account(AccountSubscriptionParams {
                    pubkey,
                    commitment: CommitmentConfig::processed(),
                    data_slice: None,
                    encoding: UiAccountEncoding::Binary,
                }));

            // NOTE(review): presumably waits until the subscribe request has been handled
            // by the notification thread - confirm.
            rpc.block_until_processed(&subscriptions);

            // Apply this case's transaction to the slot-1 bank.
            bank_forks
                .read()
                .unwrap()
                .get(1)
                .unwrap()
                .process_transaction(&tx)
                .unwrap();
            // Only `slot` is set; the remaining commitment slots keep their defaults.
            let commitment_slots = CommitmentSlots {
                slot: 1,
                ..CommitmentSlots::default()
            };
            subscriptions.notify_subscribers(commitment_slots);
            let response = receiver.recv();

            assert_eq!(
                expected,
                serde_json::from_str::<serde_json::Value>(&response).unwrap(),
            );
            rpc.account_unsubscribe(sub_id).unwrap();

            // After unsubscribing, the same params must no longer be registered.
            subscriptions
                .control
                .assert_unsubscribed(&SubscriptionParams::Account(AccountSubscriptionParams {
                    pubkey,
                    commitment: CommitmentConfig::processed(),
                    data_slice: None,
                    encoding: UiAccountEncoding::Binary,
                }));
        }
    }
1439
1440    #[test]
1441    #[serial]
1442    fn test_check_confirmed_block_subscribe() {
1443        let exit = Arc::new(AtomicBool::new(false));
1444        let GenesisConfigInfo {
1445            genesis_config,
1446            mint_keypair,
1447            ..
1448        } = create_genesis_config(10_000);
1449        let bank = Bank::new_for_tests(&genesis_config);
1450        let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1451        let bank_forks = BankForks::new_rw_arc(bank);
1452        let optimistically_confirmed_bank =
1453            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1454        let ledger_path = get_tmp_ledger_path_auto_delete!();
1455        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1456        let blockstore = Arc::new(blockstore);
1457        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1458        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1459        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1460            exit,
1461            max_complete_transaction_status_slot,
1462            max_complete_rewards_slot,
1463            blockstore.clone(),
1464            bank_forks.clone(),
1465            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1466            optimistically_confirmed_bank,
1467        ));
1468        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1469        let filter = RpcBlockSubscribeFilter::All;
1470        let config = RpcBlockSubscribeConfig {
1471            commitment: Some(CommitmentConfig::confirmed()),
1472            encoding: Some(UiTransactionEncoding::Json),
1473            transaction_details: Some(TransactionDetails::Signatures),
1474            show_rewards: None,
1475            max_supported_transaction_version: None,
1476        };
1477        let params = BlockSubscriptionParams {
1478            kind: BlockSubscriptionKind::All,
1479            commitment: config.commitment.unwrap(),
1480            encoding: config.encoding.unwrap(),
1481            transaction_details: config.transaction_details.unwrap(),
1482            show_rewards: config.show_rewards.unwrap_or_default(),
1483            max_supported_transaction_version: config.max_supported_transaction_version,
1484        };
1485        let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1486
1487        subscriptions
1488            .control
1489            .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1490
1491        let bank = bank_forks.read().unwrap().working_bank();
1492        let keypair1 = Keypair::new();
1493        let keypair2 = Keypair::new();
1494        let keypair3 = Keypair::new();
1495        let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1496        bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1497            .unwrap();
1498        populate_blockstore_for_tests(
1499            create_test_transaction_entries(
1500                vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1501                bank.clone(),
1502            )
1503            .0,
1504            bank,
1505            blockstore.clone(),
1506            max_complete_transaction_status_slot,
1507        );
1508
1509        let slot = 0;
1510        subscriptions.notify_gossip_subscribers(slot);
1511        let actual_resp = receiver.recv();
1512        let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1513
1514        let confirmed_block =
1515            ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1516        let block = confirmed_block
1517            .encode_with_options(
1518                params.encoding,
1519                BlockEncodingOptions {
1520                    transaction_details: params.transaction_details,
1521                    show_rewards: false,
1522                    max_supported_transaction_version: None,
1523                },
1524            )
1525            .unwrap();
1526        let expected_resp = RpcBlockUpdate {
1527            slot,
1528            block: Some(block),
1529            err: None,
1530        };
1531        let expected_resp = json!({
1532           "jsonrpc": "2.0",
1533           "method": "blockNotification",
1534           "params": {
1535               "result": {
1536                   "context": { "slot": slot },
1537                   "value": expected_resp,
1538               },
1539               "subscription": 0,
1540           }
1541        });
1542        assert_eq!(expected_resp, actual_resp);
1543
1544        // should not trigger since commitment NOT set to finalized
1545        subscriptions.notify_subscribers(CommitmentSlots {
1546            slot,
1547            root: slot,
1548            highest_confirmed_slot: slot,
1549            highest_super_majority_root: slot,
1550        });
1551        let should_err = receiver.recv_timeout(Duration::from_millis(300));
1552        assert!(should_err.is_err());
1553
1554        rpc.slot_unsubscribe(sub_id).unwrap();
1555        subscriptions
1556            .control
1557            .assert_unsubscribed(&SubscriptionParams::Block(params));
1558    }
1559
1560    #[test]
1561    #[serial]
1562    fn test_check_confirmed_block_subscribe_with_mentions() {
1563        let exit = Arc::new(AtomicBool::new(false));
1564        let GenesisConfigInfo {
1565            genesis_config,
1566            mint_keypair,
1567            ..
1568        } = create_genesis_config(10_000);
1569        let bank = Bank::new_for_tests(&genesis_config);
1570        let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1571        let bank_forks = BankForks::new_rw_arc(bank);
1572        let optimistically_confirmed_bank =
1573            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1574        let ledger_path = get_tmp_ledger_path_auto_delete!();
1575        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1576        let blockstore = Arc::new(blockstore);
1577        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1578        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1579        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1580            exit,
1581            max_complete_transaction_status_slot,
1582            max_complete_rewards_slot,
1583            blockstore.clone(),
1584            bank_forks.clone(),
1585            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1586            optimistically_confirmed_bank,
1587        ));
1588        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1589        let keypair1 = Keypair::new();
1590        let filter =
1591            RpcBlockSubscribeFilter::MentionsAccountOrProgram(keypair1.pubkey().to_string());
1592        let config = RpcBlockSubscribeConfig {
1593            commitment: Some(CommitmentConfig::confirmed()),
1594            encoding: Some(UiTransactionEncoding::Json),
1595            transaction_details: Some(TransactionDetails::Signatures),
1596            show_rewards: None,
1597            max_supported_transaction_version: None,
1598        };
1599        let params = BlockSubscriptionParams {
1600            kind: BlockSubscriptionKind::MentionsAccountOrProgram(keypair1.pubkey()),
1601            commitment: config.commitment.unwrap(),
1602            encoding: config.encoding.unwrap(),
1603            transaction_details: config.transaction_details.unwrap(),
1604            show_rewards: config.show_rewards.unwrap_or_default(),
1605            max_supported_transaction_version: config.max_supported_transaction_version,
1606        };
1607        let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1608
1609        subscriptions
1610            .control
1611            .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1612
1613        let bank = bank_forks.read().unwrap().working_bank();
1614        let keypair2 = Keypair::new();
1615        let keypair3 = Keypair::new();
1616        let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1617        bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1618            .unwrap();
1619        populate_blockstore_for_tests(
1620            create_test_transaction_entries(
1621                vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1622                bank.clone(),
1623            )
1624            .0,
1625            bank,
1626            blockstore.clone(),
1627            max_complete_transaction_status_slot,
1628        );
1629
1630        let slot = 0;
1631        subscriptions.notify_gossip_subscribers(slot);
1632        let actual_resp = receiver.recv();
1633        let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1634
1635        // make sure it filtered out the other keypairs
1636        let mut confirmed_block =
1637            ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1638        confirmed_block.transactions.retain(|tx_with_meta| {
1639            tx_with_meta
1640                .account_keys()
1641                .iter()
1642                .any(|key| key == &keypair1.pubkey())
1643        });
1644        let block = confirmed_block
1645            .encode_with_options(
1646                params.encoding,
1647                BlockEncodingOptions {
1648                    transaction_details: params.transaction_details,
1649                    show_rewards: false,
1650                    max_supported_transaction_version: None,
1651                },
1652            )
1653            .unwrap();
1654        let expected_resp = RpcBlockUpdate {
1655            slot,
1656            block: Some(block),
1657            err: None,
1658        };
1659        let expected_resp = json!({
1660           "jsonrpc": "2.0",
1661           "method": "blockNotification",
1662           "params": {
1663               "result": {
1664                   "context": { "slot": slot },
1665                   "value": expected_resp,
1666               },
1667               "subscription": 0,
1668           }
1669        });
1670        assert_eq!(expected_resp, actual_resp);
1671
1672        rpc.slot_unsubscribe(sub_id).unwrap();
1673        subscriptions
1674            .control
1675            .assert_unsubscribed(&SubscriptionParams::Block(params));
1676    }
1677
1678    #[test]
1679    #[serial]
1680    fn test_check_finalized_block_subscribe() {
1681        let exit = Arc::new(AtomicBool::new(false));
1682        let GenesisConfigInfo {
1683            genesis_config,
1684            mint_keypair,
1685            ..
1686        } = create_genesis_config(10_000);
1687        let bank = Bank::new_for_tests(&genesis_config);
1688        let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1689        let bank_forks = BankForks::new_rw_arc(bank);
1690        let optimistically_confirmed_bank =
1691            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1692        let ledger_path = get_tmp_ledger_path_auto_delete!();
1693        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1694        let blockstore = Arc::new(blockstore);
1695        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1696        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1697        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1698            exit,
1699            max_complete_transaction_status_slot,
1700            max_complete_rewards_slot,
1701            blockstore.clone(),
1702            bank_forks.clone(),
1703            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1704            optimistically_confirmed_bank,
1705        ));
1706        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1707        let filter = RpcBlockSubscribeFilter::All;
1708        let config = RpcBlockSubscribeConfig {
1709            commitment: Some(CommitmentConfig::finalized()),
1710            encoding: Some(UiTransactionEncoding::Json),
1711            transaction_details: Some(TransactionDetails::Signatures),
1712            show_rewards: None,
1713            max_supported_transaction_version: None,
1714        };
1715        let params = BlockSubscriptionParams {
1716            kind: BlockSubscriptionKind::All,
1717            commitment: config.commitment.unwrap(),
1718            encoding: config.encoding.unwrap(),
1719            transaction_details: config.transaction_details.unwrap(),
1720            show_rewards: config.show_rewards.unwrap_or_default(),
1721            max_supported_transaction_version: config.max_supported_transaction_version,
1722        };
1723        let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1724        subscriptions
1725            .control
1726            .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1727
1728        let bank = bank_forks.read().unwrap().working_bank();
1729        let keypair1 = Keypair::new();
1730        let keypair2 = Keypair::new();
1731        let keypair3 = Keypair::new();
1732        let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1733        bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1734            .unwrap();
1735        populate_blockstore_for_tests(
1736            create_test_transaction_entries(
1737                vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1738                bank.clone(),
1739            )
1740            .0,
1741            bank,
1742            blockstore.clone(),
1743            max_complete_transaction_status_slot,
1744        );
1745
1746        let slot = 0;
1747        subscriptions.notify_subscribers(CommitmentSlots {
1748            slot,
1749            root: slot,
1750            highest_confirmed_slot: slot,
1751            highest_super_majority_root: slot,
1752        });
1753        let actual_resp = receiver.recv();
1754        let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1755
1756        let confirmed_block =
1757            ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1758        let block = confirmed_block
1759            .encode_with_options(
1760                params.encoding,
1761                BlockEncodingOptions {
1762                    transaction_details: params.transaction_details,
1763                    show_rewards: false,
1764                    max_supported_transaction_version: None,
1765                },
1766            )
1767            .unwrap();
1768        let expected_resp = RpcBlockUpdate {
1769            slot,
1770            block: Some(block),
1771            err: None,
1772        };
1773        let expected_resp = json!({
1774           "jsonrpc": "2.0",
1775           "method": "blockNotification",
1776           "params": {
1777               "result": {
1778                   "context": { "slot": slot },
1779                   "value": expected_resp,
1780               },
1781               "subscription": 0,
1782           }
1783        });
1784        assert_eq!(expected_resp, actual_resp);
1785
1786        // should not trigger since commitment set to finalized
1787        subscriptions.notify_gossip_subscribers(slot);
1788        let should_err = receiver.recv_timeout(Duration::from_millis(300));
1789        assert!(should_err.is_err());
1790
1791        rpc.slot_unsubscribe(sub_id).unwrap();
1792        subscriptions
1793            .control
1794            .assert_unsubscribed(&SubscriptionParams::Block(params));
1795    }
1796
1797    #[test]
1798    #[serial]
1799    fn test_check_program_subscribe() {
1800        let GenesisConfigInfo {
1801            genesis_config,
1802            mint_keypair,
1803            ..
1804        } = create_genesis_config(100);
1805        let bank = Bank::new_for_tests(&genesis_config);
1806        let blockhash = bank.last_blockhash();
1807        let bank_forks = BankForks::new_rw_arc(bank);
1808        let alice = Keypair::new();
1809        let tx = system_transaction::create_account(
1810            &mint_keypair,
1811            &alice,
1812            blockhash,
1813            1,
1814            16,
1815            &stake::program::id(),
1816        );
1817        bank_forks
1818            .read()
1819            .unwrap()
1820            .get(0)
1821            .unwrap()
1822            .process_transaction(&tx)
1823            .unwrap();
1824
1825        let exit = Arc::new(AtomicBool::new(false));
1826        let optimistically_confirmed_bank =
1827            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1828        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1829        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1830        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1831            exit,
1832            max_complete_transaction_status_slot,
1833            max_complete_rewards_slot,
1834            bank_forks,
1835            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1836            optimistically_confirmed_bank,
1837        ));
1838        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1839        let sub_id = rpc
1840            .program_subscribe(
1841                stake::program::id().to_string(),
1842                Some(RpcProgramAccountsConfig {
1843                    account_config: RpcAccountInfoConfig {
1844                        commitment: Some(CommitmentConfig::processed()),
1845                        ..RpcAccountInfoConfig::default()
1846                    },
1847                    ..RpcProgramAccountsConfig::default()
1848                }),
1849            )
1850            .unwrap();
1851
1852        subscriptions
1853            .control
1854            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
1855                pubkey: stake::program::id(),
1856                filters: Vec::new(),
1857                commitment: CommitmentConfig::processed(),
1858                data_slice: None,
1859                encoding: UiAccountEncoding::Binary,
1860                with_context: false,
1861            }));
1862
1863        subscriptions.notify_subscribers(CommitmentSlots::default());
1864        let response = receiver.recv();
1865        let expected = json!({
1866           "jsonrpc": "2.0",
1867           "method": "programNotification",
1868           "params": {
1869               "result": {
1870                   "context": { "slot": 0 },
1871                   "value": {
1872                       "account": {
1873                          "data": "1111111111111111",
1874                          "executable": false,
1875                          "lamports": 1,
1876                          "owner": "Stake11111111111111111111111111111111111111",
1877                          "rentEpoch": u64::MAX,
1878                          "space": 16,
1879                       },
1880                       "pubkey": alice.pubkey().to_string(),
1881                    },
1882               },
1883               "subscription": 0,
1884           }
1885        });
1886        assert_eq!(
1887            expected,
1888            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
1889        );
1890
1891        rpc.program_unsubscribe(sub_id).unwrap();
1892        subscriptions
1893            .control
1894            .assert_unsubscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
1895                pubkey: stake::program::id(),
1896                filters: Vec::new(),
1897                commitment: CommitmentConfig::processed(),
1898                data_slice: None,
1899                encoding: UiAccountEncoding::Binary,
1900                with_context: false,
1901            }));
1902    }
1903
1904    #[test]
1905    #[serial]
1906    fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot() {
1907        // Testing if we can get the pubsub notification if a slot does not
1908        // receive OptimisticallyConfirmed but its descendant slot get the confirmed
1909        // notification.
1910        let GenesisConfigInfo {
1911            genesis_config,
1912            mint_keypair,
1913            ..
1914        } = create_genesis_config(100);
1915        let bank = Bank::new_for_tests(&genesis_config);
1916        bank.lazy_rent_collection.store(true, Relaxed);
1917
1918        let blockhash = bank.last_blockhash();
1919        let bank_forks = BankForks::new_rw_arc(bank);
1920
1921        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
1922        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
1923        bank_forks.write().unwrap().insert(bank1);
1924        let bank1 = bank_forks.read().unwrap().get(1).unwrap();
1925
1926        // add account for alice and process the transaction at bank1
1927        let alice = Keypair::new();
1928        let tx = system_transaction::create_account(
1929            &mint_keypair,
1930            &alice,
1931            blockhash,
1932            1,
1933            16,
1934            &stake::program::id(),
1935        );
1936
1937        bank1.process_transaction(&tx).unwrap();
1938
1939        let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
1940        bank_forks.write().unwrap().insert(bank2);
1941
1942        // add account for bob and process the transaction at bank2
1943        let bob = Keypair::new();
1944        let tx = system_transaction::create_account(
1945            &mint_keypair,
1946            &bob,
1947            blockhash,
1948            2,
1949            16,
1950            &stake::program::id(),
1951        );
1952        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
1953
1954        bank2.process_transaction(&tx).unwrap();
1955
1956        let bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
1957        bank_forks.write().unwrap().insert(bank3);
1958
1959        // add account for joe and process the transaction at bank3
1960        let joe = Keypair::new();
1961        let tx = system_transaction::create_account(
1962            &mint_keypair,
1963            &joe,
1964            blockhash,
1965            3,
1966            16,
1967            &stake::program::id(),
1968        );
1969        let bank3 = bank_forks.read().unwrap().get(3).unwrap();
1970
1971        bank3.process_transaction(&tx).unwrap();
1972
1973        // now add programSubscribe at the "confirmed" commitment level
1974        let exit = Arc::new(AtomicBool::new(false));
1975        let optimistically_confirmed_bank =
1976            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1977        let mut pending_optimistically_confirmed_banks = HashSet::new();
1978        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1979        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
1980        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1981            exit,
1982            max_complete_transaction_status_slot,
1983            max_complete_rewards_slot,
1984            bank_forks.clone(),
1985            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1986                1, 1,
1987            ))),
1988            optimistically_confirmed_bank.clone(),
1989        ));
1990
1991        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1992
1993        let sub_id = rpc
1994            .program_subscribe(
1995                stake::program::id().to_string(),
1996                Some(RpcProgramAccountsConfig {
1997                    account_config: RpcAccountInfoConfig {
1998                        commitment: Some(CommitmentConfig::confirmed()),
1999                        ..RpcAccountInfoConfig::default()
2000                    },
2001                    ..RpcProgramAccountsConfig::default()
2002                }),
2003            )
2004            .unwrap();
2005
2006        subscriptions
2007            .control
2008            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2009                pubkey: stake::program::id(),
2010                filters: Vec::new(),
2011                encoding: UiAccountEncoding::Binary,
2012                data_slice: None,
2013                commitment: CommitmentConfig::confirmed(),
2014                with_context: false,
2015            }));
2016
2017        let mut highest_confirmed_slot: Slot = 0;
2018        let mut highest_root_slot: Slot = 0;
2019        let mut last_notified_confirmed_slot: Slot = 0;
2020        // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is unfrozen, we expect
2021        // to see transaction for alice and bob to be notified in order.
2022        OptimisticallyConfirmedBankTracker::process_notification(
2023            BankNotification::OptimisticallyConfirmed(3),
2024            &bank_forks,
2025            &optimistically_confirmed_bank,
2026            &subscriptions,
2027            &mut pending_optimistically_confirmed_banks,
2028            &mut last_notified_confirmed_slot,
2029            &mut highest_confirmed_slot,
2030            &mut highest_root_slot,
2031            &None,
2032            &PrioritizationFeeCache::default(),
2033        );
2034
2035        // a closure to reduce code duplications in building expected responses:
2036        let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
2037            json!({
2038               "jsonrpc": "2.0",
2039               "method": "programNotification",
2040               "params": {
2041                   "result": {
2042                       "context": { "slot": slot },
2043                       "value": {
2044                           "account": {
2045                              "data": "1111111111111111",
2046                              "executable": false,
2047                              "lamports": lamports,
2048                              "owner": "Stake11111111111111111111111111111111111111",
2049                              "rentEpoch": u64::MAX,
2050                              "space": 16,
2051                           },
2052                           "pubkey": pubkey,
2053                        },
2054                   },
2055                   "subscription": subscription,
2056               }
2057            })
2058        };
2059
2060        let response = receiver.recv();
2061        let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
2062        assert_eq!(
2063            expected,
2064            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2065        );
2066
2067        let response = receiver.recv();
2068        let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
2069        assert_eq!(
2070            expected,
2071            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2072        );
2073
2074        bank3.freeze();
2075        OptimisticallyConfirmedBankTracker::process_notification(
2076            BankNotification::Frozen(bank3),
2077            &bank_forks,
2078            &optimistically_confirmed_bank,
2079            &subscriptions,
2080            &mut pending_optimistically_confirmed_banks,
2081            &mut last_notified_confirmed_slot,
2082            &mut highest_confirmed_slot,
2083            &mut highest_root_slot,
2084            &None,
2085            &PrioritizationFeeCache::default(),
2086        );
2087
2088        let response = receiver.recv();
2089        let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
2090        assert_eq!(
2091            expected,
2092            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2093        );
2094        rpc.program_unsubscribe(sub_id).unwrap();
2095    }
2096
2097    #[test]
2098    #[serial]
2099    #[should_panic]
2100    fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks_no_notifications(
2101    ) {
2102        // Testing if we can get the pubsub notification if a slot does not
2103        // receive OptimisticallyConfirmed but its descendant slot get the confirmed
2104        // notification with a bank in the BankForks. We are not expecting to receive any notifications -- should panic.
2105        let GenesisConfigInfo {
2106            genesis_config,
2107            mint_keypair,
2108            ..
2109        } = create_genesis_config(100);
2110        let bank = Bank::new_for_tests(&genesis_config);
2111        bank.lazy_rent_collection.store(true, Relaxed);
2112
2113        let blockhash = bank.last_blockhash();
2114        let bank_forks = BankForks::new_rw_arc(bank);
2115
2116        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2117        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
2118        bank_forks.write().unwrap().insert(bank1);
2119        let bank1 = bank_forks.read().unwrap().get(1).unwrap();
2120
2121        // add account for alice and process the transaction at bank1
2122        let alice = Keypair::new();
2123        let tx = system_transaction::create_account(
2124            &mint_keypair,
2125            &alice,
2126            blockhash,
2127            1,
2128            16,
2129            &stake::program::id(),
2130        );
2131
2132        bank1.process_transaction(&tx).unwrap();
2133
2134        let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
2135        bank_forks.write().unwrap().insert(bank2);
2136
2137        // add account for bob and process the transaction at bank2
2138        let bob = Keypair::new();
2139        let tx = system_transaction::create_account(
2140            &mint_keypair,
2141            &bob,
2142            blockhash,
2143            2,
2144            16,
2145            &stake::program::id(),
2146        );
2147        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2148
2149        bank2.process_transaction(&tx).unwrap();
2150
2151        // now add programSubscribe at the "confirmed" commitment level
2152        let exit = Arc::new(AtomicBool::new(false));
2153        let optimistically_confirmed_bank =
2154            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2155        let mut pending_optimistically_confirmed_banks = HashSet::new();
2156        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2157        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2158        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2159            exit,
2160            max_complete_transaction_status_slot,
2161            max_complete_rewards_slot,
2162            bank_forks.clone(),
2163            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2164                1, 1,
2165            ))),
2166            optimistically_confirmed_bank.clone(),
2167        ));
2168        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2169        rpc.program_subscribe(
2170            stake::program::id().to_string(),
2171            Some(RpcProgramAccountsConfig {
2172                account_config: RpcAccountInfoConfig {
2173                    commitment: Some(CommitmentConfig::confirmed()),
2174                    ..RpcAccountInfoConfig::default()
2175                },
2176                ..RpcProgramAccountsConfig::default()
2177            }),
2178        )
2179        .unwrap();
2180
2181        subscriptions
2182            .control
2183            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2184                pubkey: stake::program::id(),
2185                filters: Vec::new(),
2186                encoding: UiAccountEncoding::Binary,
2187                data_slice: None,
2188                commitment: CommitmentConfig::confirmed(),
2189                with_context: false,
2190            }));
2191
2192        let mut highest_confirmed_slot: Slot = 0;
2193        let mut highest_root_slot: Slot = 0;
2194        let mut last_notified_confirmed_slot: Slot = 0;
2195        // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we do not
2196        // expect to see any RPC notifications.
2197        OptimisticallyConfirmedBankTracker::process_notification(
2198            BankNotification::OptimisticallyConfirmed(3),
2199            &bank_forks,
2200            &optimistically_confirmed_bank,
2201            &subscriptions,
2202            &mut pending_optimistically_confirmed_banks,
2203            &mut last_notified_confirmed_slot,
2204            &mut highest_confirmed_slot,
2205            &mut highest_root_slot,
2206            &None,
2207            &PrioritizationFeeCache::default(),
2208        );
2209
2210        // The following should panic
2211        let _response = receiver.recv();
2212    }
2213
2214    #[test]
2215    #[serial]
2216    fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks() {
2217        // Testing if we can get the pubsub notification if a slot does not
2218        // receive OptimisticallyConfirmed but its descendant slot get the confirmed
2219        // notification. It differs from the test_check_program_subscribe_for_missing_optimistically_confirmed_slot
2220        // test in that when the descendant get confirmed, the descendant does not have a bank yet.
2221        let GenesisConfigInfo {
2222            genesis_config,
2223            mint_keypair,
2224            ..
2225        } = create_genesis_config(100);
2226        let bank = Bank::new_for_tests(&genesis_config);
2227        bank.lazy_rent_collection.store(true, Relaxed);
2228
2229        let blockhash = bank.last_blockhash();
2230        let bank_forks = BankForks::new_rw_arc(bank);
2231
2232        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2233        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
2234        bank_forks.write().unwrap().insert(bank1);
2235        let bank1 = bank_forks.read().unwrap().get(1).unwrap();
2236
2237        // add account for alice and process the transaction at bank1
2238        let alice = Keypair::new();
2239        let tx = system_transaction::create_account(
2240            &mint_keypair,
2241            &alice,
2242            blockhash,
2243            1,
2244            16,
2245            &stake::program::id(),
2246        );
2247
2248        bank1.process_transaction(&tx).unwrap();
2249
2250        let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
2251        bank_forks.write().unwrap().insert(bank2);
2252
2253        // add account for bob and process the transaction at bank2
2254        let bob = Keypair::new();
2255        let tx = system_transaction::create_account(
2256            &mint_keypair,
2257            &bob,
2258            blockhash,
2259            2,
2260            16,
2261            &stake::program::id(),
2262        );
2263        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2264
2265        bank2.process_transaction(&tx).unwrap();
2266
2267        // now add programSubscribe at the "confirmed" commitment level
2268        let exit = Arc::new(AtomicBool::new(false));
2269        let optimistically_confirmed_bank =
2270            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2271        let mut pending_optimistically_confirmed_banks = HashSet::new();
2272        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2273        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2274        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2275            exit,
2276            max_complete_transaction_status_slot,
2277            max_complete_rewards_slot,
2278            bank_forks.clone(),
2279            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2280                1, 1,
2281            ))),
2282            optimistically_confirmed_bank.clone(),
2283        ));
2284        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2285        let sub_id = rpc
2286            .program_subscribe(
2287                stake::program::id().to_string(),
2288                Some(RpcProgramAccountsConfig {
2289                    account_config: RpcAccountInfoConfig {
2290                        commitment: Some(CommitmentConfig::confirmed()),
2291                        ..RpcAccountInfoConfig::default()
2292                    },
2293                    ..RpcProgramAccountsConfig::default()
2294                }),
2295            )
2296            .unwrap();
2297
2298        subscriptions
2299            .control
2300            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2301                pubkey: stake::program::id(),
2302                filters: Vec::new(),
2303                encoding: UiAccountEncoding::Binary,
2304                data_slice: None,
2305                commitment: CommitmentConfig::confirmed(),
2306                with_context: false,
2307            }));
2308
2309        let mut highest_confirmed_slot: Slot = 0;
2310        let mut highest_root_slot: Slot = 0;
2311        let mut last_notified_confirmed_slot: Slot = 0;
2312        // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we expect
2313        // to see transaction for alice and bob to be notified only when bank3 is added to the fork and
2314        // frozen. The notifications should be in the increasing order of the slot.
2315        OptimisticallyConfirmedBankTracker::process_notification(
2316            BankNotification::OptimisticallyConfirmed(3),
2317            &bank_forks,
2318            &optimistically_confirmed_bank,
2319            &subscriptions,
2320            &mut pending_optimistically_confirmed_banks,
2321            &mut last_notified_confirmed_slot,
2322            &mut highest_confirmed_slot,
2323            &mut highest_root_slot,
2324            &None,
2325            &PrioritizationFeeCache::default(),
2326        );
2327
2328        // a closure to reduce code duplications in building expected responses:
2329        let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
2330            json!({
2331               "jsonrpc": "2.0",
2332               "method": "programNotification",
2333               "params": {
2334                   "result": {
2335                       "context": { "slot": slot },
2336                       "value": {
2337                           "account": {
2338                              "data": "1111111111111111",
2339                              "executable": false,
2340                              "lamports": lamports,
2341                              "owner": "Stake11111111111111111111111111111111111111",
2342                              "rentEpoch": u64::MAX,
2343                              "space": 16,
2344                           },
2345                           "pubkey": pubkey,
2346                        },
2347                   },
2348                   "subscription": subscription,
2349               }
2350            })
2351        };
2352
2353        let bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
2354        bank_forks.write().unwrap().insert(bank3);
2355
2356        // add account for joe and process the transaction at bank3
2357        let joe = Keypair::new();
2358        let tx = system_transaction::create_account(
2359            &mint_keypair,
2360            &joe,
2361            blockhash,
2362            3,
2363            16,
2364            &stake::program::id(),
2365        );
2366        let bank3 = bank_forks.read().unwrap().get(3).unwrap();
2367
2368        bank3.process_transaction(&tx).unwrap();
2369        bank3.freeze();
2370        OptimisticallyConfirmedBankTracker::process_notification(
2371            BankNotification::Frozen(bank3),
2372            &bank_forks,
2373            &optimistically_confirmed_bank,
2374            &subscriptions,
2375            &mut pending_optimistically_confirmed_banks,
2376            &mut last_notified_confirmed_slot,
2377            &mut highest_confirmed_slot,
2378            &mut highest_root_slot,
2379            &None,
2380            &PrioritizationFeeCache::default(),
2381        );
2382
2383        let response = receiver.recv();
2384        let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
2385        assert_eq!(
2386            expected,
2387            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2388        );
2389
2390        let response = receiver.recv();
2391        let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
2392        assert_eq!(
2393            expected,
2394            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2395        );
2396
2397        let response = receiver.recv();
2398        let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
2399        assert_eq!(
2400            expected,
2401            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2402        );
2403        rpc.program_unsubscribe(sub_id).unwrap();
2404    }
2405
2406    #[test]
2407    #[serial]
2408    fn test_check_signature_subscribe() {
2409        let GenesisConfigInfo {
2410            genesis_config,
2411            mint_keypair,
2412            ..
2413        } = create_genesis_config(100);
2414        let bank = Bank::new_for_tests(&genesis_config);
2415        let blockhash = bank.last_blockhash();
2416        let bank_forks = BankForks::new_rw_arc(bank);
2417        let alice = Keypair::new();
2418
2419        let past_bank_tx =
2420            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash);
2421        let unprocessed_tx =
2422            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash);
2423        let processed_tx =
2424            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash);
2425
2426        bank_forks
2427            .read()
2428            .unwrap()
2429            .get(0)
2430            .unwrap()
2431            .process_transaction(&past_bank_tx)
2432            .unwrap();
2433
2434        let next_bank = Bank::new_from_parent(
2435            bank_forks.read().unwrap().get(0).unwrap(),
2436            &solana_pubkey::new_rand(),
2437            1,
2438        );
2439        bank_forks.write().unwrap().insert(next_bank);
2440
2441        bank_forks
2442            .read()
2443            .unwrap()
2444            .get(1)
2445            .unwrap()
2446            .process_transaction(&processed_tx)
2447            .unwrap();
2448        let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
2449
2450        let mut cache0 = BlockCommitment::default();
2451        cache0.increase_confirmation_stake(1, 10);
2452        let cache1 = BlockCommitment::default();
2453
2454        let mut block_commitment = HashMap::new();
2455        block_commitment.entry(0).or_insert(cache0);
2456        block_commitment.entry(1).or_insert(cache1);
2457        let block_commitment_cache = BlockCommitmentCache::new(
2458            block_commitment,
2459            10,
2460            CommitmentSlots {
2461                slot: bank1.slot(),
2462                ..CommitmentSlots::default()
2463            },
2464        );
2465
2466        let exit = Arc::new(AtomicBool::new(false));
2467        let optimistically_confirmed_bank =
2468            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2469        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2470        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2471        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2472            exit,
2473            max_complete_transaction_status_slot,
2474            max_complete_rewards_slot,
2475            bank_forks,
2476            Arc::new(RwLock::new(block_commitment_cache)),
2477            optimistically_confirmed_bank,
2478        ));
2479
2480        let (past_bank_rpc1, mut past_bank_receiver1) =
2481            rpc_pubsub_service::test_connection(&subscriptions);
2482        let (past_bank_rpc2, mut past_bank_receiver2) =
2483            rpc_pubsub_service::test_connection(&subscriptions);
2484        let (processed_rpc, mut processed_receiver) =
2485            rpc_pubsub_service::test_connection(&subscriptions);
2486        let (another_rpc, _another_receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2487        let (processed_rpc3, mut processed_receiver3) =
2488            rpc_pubsub_service::test_connection(&subscriptions);
2489
2490        let past_bank_sub_id1 = past_bank_rpc1
2491            .signature_subscribe(
2492                past_bank_tx.signatures[0].to_string(),
2493                Some(RpcSignatureSubscribeConfig {
2494                    commitment: Some(CommitmentConfig::processed()),
2495                    enable_received_notification: Some(false),
2496                }),
2497            )
2498            .unwrap();
2499        let past_bank_sub_id2 = past_bank_rpc2
2500            .signature_subscribe(
2501                past_bank_tx.signatures[0].to_string(),
2502                Some(RpcSignatureSubscribeConfig {
2503                    commitment: Some(CommitmentConfig::finalized()),
2504                    enable_received_notification: Some(false),
2505                }),
2506            )
2507            .unwrap();
2508        let processed_sub_id = processed_rpc
2509            .signature_subscribe(
2510                processed_tx.signatures[0].to_string(),
2511                Some(RpcSignatureSubscribeConfig {
2512                    commitment: Some(CommitmentConfig::processed()),
2513                    enable_received_notification: Some(false),
2514                }),
2515            )
2516            .unwrap();
2517        another_rpc
2518            .signature_subscribe(
2519                unprocessed_tx.signatures[0].to_string(),
2520                Some(RpcSignatureSubscribeConfig {
2521                    commitment: Some(CommitmentConfig::processed()),
2522                    enable_received_notification: Some(false),
2523                }),
2524            )
2525            .unwrap();
2526
2527        // Add a subscription that gets `received` notifications
2528        let processed_sub_id3 = processed_rpc3
2529            .signature_subscribe(
2530                unprocessed_tx.signatures[0].to_string(),
2531                Some(RpcSignatureSubscribeConfig {
2532                    commitment: Some(CommitmentConfig::processed()),
2533                    enable_received_notification: Some(true),
2534                }),
2535            )
2536            .unwrap();
2537
2538        assert!(subscriptions
2539            .control
2540            .signature_subscribed(&unprocessed_tx.signatures[0]));
2541        assert!(subscriptions
2542            .control
2543            .signature_subscribed(&processed_tx.signatures[0]));
2544
2545        let mut commitment_slots = CommitmentSlots::default();
2546        let received_slot = 1;
2547        commitment_slots.slot = received_slot;
2548        subscriptions
2549            .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
2550        subscriptions.notify_subscribers(commitment_slots);
2551        let expected_res =
2552            RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
2553        let received_expected_res =
2554            RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);
2555        struct Notification {
2556            slot: Slot,
2557            id: u64,
2558        }
2559
2560        let expected_notification =
2561            |exp: Notification, expected_res: &RpcSignatureResult| -> String {
2562                let json = json!({
2563                    "jsonrpc": "2.0",
2564                    "method": "signatureNotification",
2565                    "params": {
2566                        "result": {
2567                            "context": { "slot": exp.slot },
2568                            "value": expected_res,
2569                        },
2570                        "subscription": exp.id,
2571                    }
2572                });
2573                serde_json::to_string(&json).unwrap()
2574            };
2575
2576        // Expect to receive a notification from bank 1 because this subscription is
2577        // looking for 0 confirmations and so checks the current bank
2578        let expected = expected_notification(
2579            Notification {
2580                slot: 1,
2581                id: past_bank_sub_id1.into(),
2582            },
2583            &expected_res,
2584        );
2585        let response = past_bank_receiver1.recv();
2586        assert_eq!(expected, response);
2587
2588        // Expect to receive a notification from bank 0 because this subscription is
2589        // looking for 1 confirmation and so checks the past bank
2590        let expected = expected_notification(
2591            Notification {
2592                slot: 0,
2593                id: past_bank_sub_id2.into(),
2594            },
2595            &expected_res,
2596        );
2597        let response = past_bank_receiver2.recv();
2598        assert_eq!(expected, response);
2599
2600        let expected = expected_notification(
2601            Notification {
2602                slot: 1,
2603                id: processed_sub_id.into(),
2604            },
2605            &expected_res,
2606        );
2607        let response = processed_receiver.recv();
2608        assert_eq!(expected, response);
2609
2610        // Expect a "received" notification
2611        let expected = expected_notification(
2612            Notification {
2613                slot: received_slot,
2614                id: processed_sub_id3.into(),
2615            },
2616            &received_expected_res,
2617        );
2618        let response = processed_receiver3.recv();
2619        assert_eq!(expected, response);
2620
2621        // Subscription should be automatically removed after notification
2622
2623        assert!(!subscriptions
2624            .control
2625            .signature_subscribed(&processed_tx.signatures[0]));
2626        assert!(!subscriptions
2627            .control
2628            .signature_subscribed(&past_bank_tx.signatures[0]));
2629
2630        // Unprocessed signature subscription should not be removed
2631        assert!(subscriptions
2632            .control
2633            .signature_subscribed(&unprocessed_tx.signatures[0]));
2634    }
2635
2636    #[test]
2637    #[serial]
2638    fn test_check_slot_subscribe() {
2639        let exit = Arc::new(AtomicBool::new(false));
2640        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
2641        let bank = Bank::new_for_tests(&genesis_config);
2642        let bank_forks = BankForks::new_rw_arc(bank);
2643        let optimistically_confirmed_bank =
2644            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2645        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2646        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2647        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2648            exit,
2649            max_complete_transaction_status_slot,
2650            max_complete_rewards_slot,
2651            bank_forks,
2652            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2653            optimistically_confirmed_bank,
2654        ));
2655        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2656        let sub_id = rpc.slot_subscribe().unwrap();
2657
2658        subscriptions
2659            .control
2660            .assert_subscribed(&SubscriptionParams::Slot);
2661
2662        subscriptions.notify_slot(0, 0, 0);
2663        let response = receiver.recv();
2664
2665        let expected_res = SlotInfo {
2666            parent: 0,
2667            slot: 0,
2668            root: 0,
2669        };
2670        let expected_res_str = serde_json::to_string(&expected_res).unwrap();
2671
2672        let expected = format!(
2673            r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
2674        );
2675        assert_eq!(expected, response);
2676
2677        rpc.slot_unsubscribe(sub_id).unwrap();
2678        subscriptions
2679            .control
2680            .assert_unsubscribed(&SubscriptionParams::Slot);
2681    }
2682
2683    #[test]
2684    #[serial]
2685    fn test_check_root_subscribe() {
2686        let exit = Arc::new(AtomicBool::new(false));
2687        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
2688        let bank = Bank::new_for_tests(&genesis_config);
2689        let bank_forks = BankForks::new_rw_arc(bank);
2690        let optimistically_confirmed_bank =
2691            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2692        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2693        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2694        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2695            exit,
2696            max_complete_transaction_status_slot,
2697            max_complete_rewards_slot,
2698            bank_forks,
2699            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2700            optimistically_confirmed_bank,
2701        ));
2702        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2703        let sub_id = rpc.root_subscribe().unwrap();
2704
2705        subscriptions
2706            .control
2707            .assert_subscribed(&SubscriptionParams::Root);
2708
2709        subscriptions.notify_roots(vec![2, 1, 3]);
2710
2711        for expected_root in 1..=3 {
2712            let response = receiver.recv();
2713
2714            let expected_res_str =
2715                serde_json::to_string(&serde_json::to_value(expected_root).unwrap()).unwrap();
2716            let expected = format!(
2717                r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
2718            );
2719            assert_eq!(expected, response);
2720        }
2721
2722        rpc.root_unsubscribe(sub_id).unwrap();
2723        subscriptions
2724            .control
2725            .assert_unsubscribed(&SubscriptionParams::Root);
2726    }
2727
2728    #[test]
2729    #[serial]
2730    fn test_gossip_separate_account_notifications() {
2731        let GenesisConfigInfo {
2732            genesis_config,
2733            mint_keypair,
2734            ..
2735        } = create_genesis_config(100);
2736        let bank = Bank::new_for_tests(&genesis_config);
2737        let blockhash = bank.last_blockhash();
2738        let bank_forks = BankForks::new_rw_arc(bank);
2739        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2740        let bank1 = Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 1);
2741        bank_forks.write().unwrap().insert(bank1);
2742        let bank2 = Bank::new_from_parent(bank0, &Pubkey::default(), 2);
2743        bank_forks.write().unwrap().insert(bank2);
2744
2745        // we need a pubkey that will pass its rent collection slot so rent_epoch gets updated to max since this account is exempt
2746        let alice = Keypair::from_base58_string("sfLnS4rZ5a8gXke3aGxCgM6usFAVPxLUaBSRdssGY9uS5eoiEWQ41CqDcpXbcekpKsie8Lyy3LNFdhEvjUE1wd9");
2747
2748        let optimistically_confirmed_bank =
2749            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2750        let mut pending_optimistically_confirmed_banks = HashSet::new();
2751
2752        let exit = Arc::new(AtomicBool::new(false));
2753        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2754        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2755        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2756            exit,
2757            max_complete_transaction_status_slot,
2758            max_complete_rewards_slot,
2759            bank_forks.clone(),
2760            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2761                1, 1,
2762            ))),
2763            optimistically_confirmed_bank.clone(),
2764        ));
2765        let (rpc0, mut receiver0) = rpc_pubsub_service::test_connection(&subscriptions);
2766        let (rpc1, mut receiver1) = rpc_pubsub_service::test_connection(&subscriptions);
2767        let sub_id0 = rpc0
2768            .account_subscribe(
2769                alice.pubkey().to_string(),
2770                Some(RpcAccountInfoConfig {
2771                    commitment: Some(CommitmentConfig::confirmed()),
2772                    encoding: None,
2773                    data_slice: None,
2774                    min_context_slot: None,
2775                }),
2776            )
2777            .unwrap();
2778
2779        assert!(subscriptions.control.account_subscribed(&alice.pubkey()));
2780        rpc0.block_until_processed(&subscriptions);
2781
2782        let tx = system_transaction::create_account(
2783            &mint_keypair,
2784            &alice,
2785            blockhash,
2786            1,
2787            16,
2788            &stake::program::id(),
2789        );
2790
2791        // Add the transaction to the 1st bank and then freeze the bank
2792        let bank1 = bank_forks.write().unwrap().get(1).unwrap();
2793        bank1.process_transaction(&tx).unwrap();
2794        bank1.freeze();
2795
2796        // Add the same transaction to the unfrozen 2nd bank
2797        bank_forks
2798            .read()
2799            .unwrap()
2800            .get(2)
2801            .unwrap()
2802            .process_transaction(&tx)
2803            .unwrap();
2804
2805        // First, notify the unfrozen bank first to queue pending notification
2806        let mut highest_confirmed_slot: Slot = 0;
2807        let mut highest_root_slot: Slot = 0;
2808        let mut last_notified_confirmed_slot: Slot = 0;
2809        OptimisticallyConfirmedBankTracker::process_notification(
2810            BankNotification::OptimisticallyConfirmed(2),
2811            &bank_forks,
2812            &optimistically_confirmed_bank,
2813            &subscriptions,
2814            &mut pending_optimistically_confirmed_banks,
2815            &mut last_notified_confirmed_slot,
2816            &mut highest_confirmed_slot,
2817            &mut highest_root_slot,
2818            &None,
2819            &PrioritizationFeeCache::default(),
2820        );
2821
2822        // Now, notify the frozen bank and ensure its notifications are processed
2823        highest_confirmed_slot = 0;
2824        OptimisticallyConfirmedBankTracker::process_notification(
2825            BankNotification::OptimisticallyConfirmed(1),
2826            &bank_forks,
2827            &optimistically_confirmed_bank,
2828            &subscriptions,
2829            &mut pending_optimistically_confirmed_banks,
2830            &mut last_notified_confirmed_slot,
2831            &mut highest_confirmed_slot,
2832            &mut highest_root_slot,
2833            &None,
2834            &PrioritizationFeeCache::default(),
2835        );
2836
2837        let response = receiver0.recv();
2838        let expected = json!({
2839           "jsonrpc": "2.0",
2840           "method": "accountNotification",
2841           "params": {
2842               "result": {
2843                   "context": { "slot": 1 },
2844                   "value": {
2845                       "data": "1111111111111111",
2846                       "executable": false,
2847                       "lamports": 1,
2848                       "owner": "Stake11111111111111111111111111111111111111",
2849                       "rentEpoch": u64::MAX,
2850                       "space": 16,
2851                    },
2852               },
2853               "subscription": 0,
2854           }
2855        });
2856        assert_eq!(
2857            expected,
2858            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2859        );
2860        rpc0.account_unsubscribe(sub_id0).unwrap();
2861        rpc0.block_until_processed(&subscriptions);
2862
2863        let sub_id1 = rpc1
2864            .account_subscribe(
2865                alice.pubkey().to_string(),
2866                Some(RpcAccountInfoConfig {
2867                    commitment: Some(CommitmentConfig::confirmed()),
2868                    encoding: None,
2869                    data_slice: None,
2870                    min_context_slot: None,
2871                }),
2872            )
2873            .unwrap();
2874        rpc1.block_until_processed(&subscriptions);
2875
2876        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2877        bank2.freeze();
2878        highest_confirmed_slot = 0;
2879        OptimisticallyConfirmedBankTracker::process_notification(
2880            BankNotification::Frozen(bank2),
2881            &bank_forks,
2882            &optimistically_confirmed_bank,
2883            &subscriptions,
2884            &mut pending_optimistically_confirmed_banks,
2885            &mut last_notified_confirmed_slot,
2886            &mut highest_confirmed_slot,
2887            &mut highest_root_slot,
2888            &None,
2889            &PrioritizationFeeCache::default(),
2890        );
2891        let response = receiver1.recv();
2892        let expected = json!({
2893           "jsonrpc": "2.0",
2894           "method": "accountNotification",
2895           "params": {
2896               "result": {
2897                   "context": { "slot": 2 },
2898                   "value": {
2899                       "data": "1111111111111111",
2900                       "executable": false,
2901                       "lamports": 1,
2902                       "owner": "Stake11111111111111111111111111111111111111",
2903                       "rentEpoch": u64::MAX,
2904                       "space": 16,
2905                    },
2906               },
2907               "subscription": 3,
2908           }
2909        });
2910        assert_eq!(
2911            expected,
2912            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2913        );
2914        rpc1.account_unsubscribe(sub_id1).unwrap();
2915
2916        assert!(!subscriptions.control.account_subscribed(&alice.pubkey()));
2917    }
2918
2919    fn make_logs_result(signature: &str, subscription_id: u64) -> serde_json::Value {
2920        json!({
2921            "jsonrpc": "2.0",
2922            "method": "logsNotification",
2923            "params": {
2924                "result": {
2925                    "context": {
2926                        "slot": 0
2927                    },
2928                    "value": {
2929                        "signature": signature,
2930                        "err": null,
2931                        "logs": [
2932                            "Program 11111111111111111111111111111111 invoke [1]",
2933                            "Program 11111111111111111111111111111111 success"
2934                        ]
2935                    }
2936                },
2937                "subscription": subscription_id
2938            }
2939        })
2940    }
2941
2942    #[test]
2943    #[serial]
2944    fn test_logs_subscribe() {
2945        let GenesisConfigInfo {
2946            genesis_config,
2947            mint_keypair,
2948            ..
2949        } = create_genesis_config(100);
2950        let bank = Bank::new_for_tests(&genesis_config);
2951        let blockhash = bank.last_blockhash();
2952        let bank_forks = BankForks::new_rw_arc(bank);
2953
2954        let alice = Keypair::new();
2955
2956        let exit = Arc::new(AtomicBool::new(false));
2957        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2958        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
2959        let optimistically_confirmed_bank =
2960            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2961        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2962            exit,
2963            max_complete_transaction_status_slot,
2964            max_complete_rewards_slot,
2965            bank_forks.clone(),
2966            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2967            optimistically_confirmed_bank,
2968        ));
2969
2970        let sub_config = RpcTransactionLogsConfig {
2971            commitment: Some(CommitmentConfig::processed()),
2972        };
2973
2974        let (rpc_all, mut receiver_all) = rpc_pubsub_service::test_connection(&subscriptions);
2975        let sub_id_for_all = rpc_all
2976            .logs_subscribe(RpcTransactionLogsFilter::All, Some(sub_config.clone()))
2977            .unwrap();
2978        assert!(subscriptions.control.logs_subscribed(None));
2979
2980        let (rpc_alice, mut receiver_alice) = rpc_pubsub_service::test_connection(&subscriptions);
2981        let sub_id_for_alice = rpc_alice
2982            .logs_subscribe(
2983                RpcTransactionLogsFilter::Mentions(vec![alice.pubkey().to_string()]),
2984                Some(sub_config),
2985            )
2986            .unwrap();
2987        assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
2988        rpc_alice.block_until_processed(&subscriptions);
2989
2990        let tx = system_transaction::create_account(
2991            &mint_keypair,
2992            &alice,
2993            blockhash,
2994            1,
2995            0,
2996            &system_program::id(),
2997        );
2998
2999        assert!(bank_forks
3000            .read()
3001            .unwrap()
3002            .get(0)
3003            .unwrap()
3004            .process_transaction_with_metadata(tx.clone())
3005            .is_ok());
3006
3007        subscriptions.notify_subscribers(CommitmentSlots::new_from_slot(0));
3008
3009        let expected_response_all =
3010            make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_all));
3011        let response_all = receiver_all.recv();
3012        assert_eq!(
3013            expected_response_all,
3014            serde_json::from_str::<serde_json::Value>(&response_all).unwrap(),
3015        );
3016        let expected_response_alice =
3017            make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_alice));
3018        let response_alice = receiver_alice.recv();
3019        assert_eq!(
3020            expected_response_alice,
3021            serde_json::from_str::<serde_json::Value>(&response_alice).unwrap(),
3022        );
3023
3024        rpc_all.logs_unsubscribe(sub_id_for_all).unwrap();
3025        assert!(!subscriptions.control.logs_subscribed(None));
3026        rpc_alice.logs_unsubscribe(sub_id_for_alice).unwrap();
3027        assert!(!subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
3028    }
3029
3030    #[test]
3031    fn test_total_subscriptions() {
3032        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
3033        let bank = Bank::new_for_tests(&genesis_config);
3034        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
3035        let max_complete_rewards_slot = Arc::new(AtomicU64::default());
3036        let bank_forks = BankForks::new_rw_arc(bank);
3037        let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
3038            max_complete_transaction_status_slot,
3039            max_complete_rewards_slot,
3040            bank_forks,
3041        ));
3042
3043        let (rpc1, _receiver1) = rpc_pubsub_service::test_connection(&subscriptions);
3044        let sub_id1 = rpc1
3045            .account_subscribe(Pubkey::default().to_string(), None)
3046            .unwrap();
3047
3048        assert_eq!(subscriptions.total(), 1);
3049
3050        let (rpc2, _receiver2) = rpc_pubsub_service::test_connection(&subscriptions);
3051        let sub_id2 = rpc2
3052            .program_subscribe(Pubkey::default().to_string(), None)
3053            .unwrap();
3054
3055        assert_eq!(subscriptions.total(), 2);
3056
3057        let (rpc3, _receiver3) = rpc_pubsub_service::test_connection(&subscriptions);
3058        let sub_id3 = rpc3
3059            .logs_subscribe(RpcTransactionLogsFilter::All, None)
3060            .unwrap();
3061        assert_eq!(subscriptions.total(), 3);
3062
3063        let (rpc4, _receiver4) = rpc_pubsub_service::test_connection(&subscriptions);
3064        let sub_id4 = rpc4
3065            .signature_subscribe(Signature::default().to_string(), None)
3066            .unwrap();
3067
3068        assert_eq!(subscriptions.total(), 4);
3069
3070        let (rpc5, _receiver5) = rpc_pubsub_service::test_connection(&subscriptions);
3071        let sub_id5 = rpc5.slot_subscribe().unwrap();
3072
3073        assert_eq!(subscriptions.total(), 5);
3074
3075        let (rpc6, _receiver6) = rpc_pubsub_service::test_connection(&subscriptions);
3076        let sub_id6 = rpc6.vote_subscribe().unwrap();
3077
3078        assert_eq!(subscriptions.total(), 6);
3079
3080        let (rpc7, _receiver7) = rpc_pubsub_service::test_connection(&subscriptions);
3081        let sub_id7 = rpc7.root_subscribe().unwrap();
3082
3083        assert_eq!(subscriptions.total(), 7);
3084
3085        // Add duplicate account subscription, but it shouldn't increment the count.
3086        let (rpc8, _receiver8) = rpc_pubsub_service::test_connection(&subscriptions);
3087        let sub_id8 = rpc8
3088            .account_subscribe(Pubkey::default().to_string(), None)
3089            .unwrap();
3090        assert_eq!(subscriptions.total(), 7);
3091
3092        rpc1.account_unsubscribe(sub_id1).unwrap();
3093        assert_eq!(subscriptions.total(), 7);
3094
3095        rpc8.account_unsubscribe(sub_id8).unwrap();
3096        assert_eq!(subscriptions.total(), 6);
3097
3098        rpc2.program_unsubscribe(sub_id2).unwrap();
3099        assert_eq!(subscriptions.total(), 5);
3100
3101        rpc3.logs_unsubscribe(sub_id3).unwrap();
3102        assert_eq!(subscriptions.total(), 4);
3103
3104        rpc4.signature_unsubscribe(sub_id4).unwrap();
3105        assert_eq!(subscriptions.total(), 3);
3106
3107        rpc5.slot_unsubscribe(sub_id5).unwrap();
3108        assert_eq!(subscriptions.total(), 2);
3109
3110        rpc6.vote_unsubscribe(sub_id6).unwrap();
3111        assert_eq!(subscriptions.total(), 1);
3112
3113        rpc7.root_unsubscribe(sub_id7).unwrap();
3114        assert_eq!(subscriptions.total(), 0);
3115    }
3116}