solana_rpc/
rpc_subscriptions.rs

1//! The `pubsub` module implements a threaded subscription service on client RPC request
2
3use {
4    crate::{
5        filter::filter_allows,
6        optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
7        parsed_token_accounts::{get_parsed_token_account, get_parsed_token_accounts},
8        rpc_pubsub_service::PubSubConfig,
9        rpc_subscription_tracker::{
10            AccountSubscriptionParams, BlockSubscriptionKind, BlockSubscriptionParams,
11            LogsSubscriptionKind, LogsSubscriptionParams, ProgramSubscriptionParams,
12            SignatureSubscriptionParams, SubscriptionControl, SubscriptionId, SubscriptionInfo,
13            SubscriptionParams, SubscriptionsTracker,
14        },
15    },
16    crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
17    itertools::Either,
18    rayon::prelude::*,
19    serde::Serialize,
20    solana_account::{AccountSharedData, ReadableAccount},
21    solana_account_decoder::{
22        encode_ui_account, parse_token::is_known_spl_token_id, UiAccount, UiAccountEncoding,
23    },
24    solana_clock::Slot,
25    solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
26    solana_measure::measure::Measure,
27    solana_pubkey::Pubkey,
28    solana_rpc_client_api::response::{
29        ProcessedSignatureResult, ReceivedSignatureResult, Response as RpcResponse, RpcBlockUpdate,
30        RpcBlockUpdateError, RpcKeyedAccount, RpcLogsResponse, RpcResponseContext,
31        RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
32    },
33    solana_runtime::{
34        bank::{Bank, TransactionLogInfo},
35        bank_forks::BankForks,
36        commitment::{BlockCommitmentCache, CommitmentSlots},
37    },
38    solana_signature::Signature,
39    solana_time_utils::timestamp,
40    solana_transaction_status::{
41        BlockEncodingOptions, ConfirmedBlock, EncodeError, VersionedConfirmedBlock,
42    },
43    solana_vote::vote_transaction::VoteTransaction,
44    std::{
45        cell::RefCell,
46        collections::{HashMap, VecDeque},
47        io::Cursor,
48        str,
49        sync::{
50            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
51            Arc, Mutex, RwLock, Weak,
52        },
53        thread::{Builder, JoinHandle},
54        time::{Duration, Instant},
55    },
56    tokio::sync::broadcast,
57};
58
59mod transaction {
60    pub use solana_transaction_error::TransactionResult as Result;
61}
62
63const RECEIVE_DELAY_MILLIS: u64 = 100;
64
65fn get_transaction_logs(
66    bank: &Bank,
67    params: &LogsSubscriptionParams,
68) -> Option<Vec<TransactionLogInfo>> {
69    let pubkey = match &params.kind {
70        LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
71        LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
72    };
73    let mut logs = bank.get_transaction_logs(pubkey);
74    if matches!(params.kind, LogsSubscriptionKind::All) {
75        // Filter out votes if the subscriber doesn't want them
76        if let Some(logs) = &mut logs {
77            logs.retain(|log| !log.is_vote);
78        }
79    }
80    logs
81}
82#[derive(Debug)]
83pub struct TimestampedNotificationEntry {
84    pub entry: NotificationEntry,
85    pub queued_at: Instant,
86}
87
88impl From<NotificationEntry> for TimestampedNotificationEntry {
89    fn from(entry: NotificationEntry) -> Self {
90        TimestampedNotificationEntry {
91            entry,
92            queued_at: Instant::now(),
93        }
94    }
95}
96
97pub enum NotificationEntry {
98    Slot(SlotInfo),
99    SlotUpdate(SlotUpdate),
100    Vote((Pubkey, VoteTransaction, Signature)),
101    Root(Slot),
102    Bank(CommitmentSlots),
103    Gossip(Slot),
104    SignaturesReceived((Slot, Vec<Signature>)),
105    Subscribed(SubscriptionParams, SubscriptionId),
106    Unsubscribed(SubscriptionParams, SubscriptionId),
107}
108
109impl std::fmt::Debug for NotificationEntry {
110    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
111        match self {
112            NotificationEntry::Root(root) => write!(f, "Root({root})"),
113            NotificationEntry::Vote(vote) => write!(f, "Vote({vote:?})"),
114            NotificationEntry::Slot(slot_info) => write!(f, "Slot({slot_info:?})"),
115            NotificationEntry::SlotUpdate(slot_update) => {
116                write!(f, "SlotUpdate({slot_update:?})")
117            }
118            NotificationEntry::Bank(commitment_slots) => {
119                write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot)
120            }
121            NotificationEntry::SignaturesReceived(slot_signatures) => {
122                write!(f, "SignaturesReceived({slot_signatures:?})")
123            }
124            NotificationEntry::Gossip(slot) => write!(f, "Gossip({slot:?})"),
125            NotificationEntry::Subscribed(params, id) => {
126                write!(f, "Subscribed({params:?}, {id:?})")
127            }
128            NotificationEntry::Unsubscribed(params, id) => {
129                write!(f, "Unsubscribed({params:?}, {id:?})")
130            }
131        }
132    }
133}
134
135#[allow(clippy::type_complexity)]
136fn check_commitment_and_notify<P, S, B, F, X, I>(
137    params: &P,
138    subscription: &SubscriptionInfo,
139    bank_forks: &Arc<RwLock<BankForks>>,
140    slot: Slot,
141    bank_method: B,
142    filter_results: F,
143    notifier: &RpcNotifier,
144    is_final: bool,
145) -> bool
146where
147    S: Clone + Serialize,
148    B: Fn(&Bank, &P) -> X,
149    F: Fn(X, &P, Slot, Arc<Bank>) -> (I, Slot),
150    X: Clone + Default,
151    I: IntoIterator<Item = S>,
152{
153    let mut notified = false;
154    let bank = bank_forks.read().unwrap().get(slot);
155    if let Some(bank) = bank {
156        let results = bank_method(&bank, params);
157        let mut w_last_notified_slot = subscription.last_notified_slot.write().unwrap();
158        let (filter_results, result_slot) =
159            filter_results(results, params, *w_last_notified_slot, bank);
160        for result in filter_results {
161            notifier.notify(
162                RpcResponse::from(RpcNotificationResponse {
163                    context: RpcNotificationContext { slot },
164                    value: result,
165                }),
166                subscription,
167                is_final,
168            );
169            *w_last_notified_slot = result_slot;
170            notified = true;
171        }
172    }
173
174    notified
175}
176
177#[derive(Debug, Clone)]
178pub struct RpcNotification {
179    pub subscription_id: SubscriptionId,
180    pub is_final: bool,
181    pub json: Weak<String>,
182    pub created_at: Instant,
183}
184
185#[derive(Debug, Clone, PartialEq)]
186struct RpcNotificationResponse<T> {
187    context: RpcNotificationContext,
188    value: T,
189}
190
191impl<T> From<RpcNotificationResponse<T>> for RpcResponse<T> {
192    fn from(notification: RpcNotificationResponse<T>) -> Self {
193        let RpcNotificationResponse {
194            context: RpcNotificationContext { slot },
195            value,
196        } = notification;
197        Self {
198            context: RpcResponseContext {
199                slot,
200                api_version: None,
201            },
202            value,
203        }
204    }
205}
206
207#[derive(Debug, Clone, PartialEq, Eq)]
208struct RpcNotificationContext {
209    slot: Slot,
210}
211
212const RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS: Duration = Duration::from_millis(2_000);
213
214struct RecentItems {
215    queue: VecDeque<Arc<String>>,
216    total_bytes: usize,
217    max_len: usize,
218    max_total_bytes: usize,
219    last_metrics_submission: Instant,
220}
221
222impl RecentItems {
223    fn new(max_len: usize, max_total_bytes: usize) -> Self {
224        Self {
225            queue: VecDeque::new(),
226            total_bytes: 0,
227            max_len,
228            max_total_bytes,
229            last_metrics_submission: Instant::now(),
230        }
231    }
232
233    fn push(&mut self, item: Arc<String>) {
234        self.total_bytes = self
235            .total_bytes
236            .checked_add(item.len())
237            .expect("total bytes overflow");
238        self.queue.push_back(item);
239
240        while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len {
241            let item = self.queue.pop_front().expect("can't be empty");
242            self.total_bytes = self
243                .total_bytes
244                .checked_sub(item.len())
245                .expect("total bytes underflow");
246        }
247
248        let now = Instant::now();
249        let last_metrics_ago = now.duration_since(self.last_metrics_submission);
250        if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS {
251            datapoint_info!(
252                "rpc_subscriptions_recent_items",
253                ("num", self.queue.len(), i64),
254                ("total_bytes", self.total_bytes, i64),
255            );
256            self.last_metrics_submission = now;
257        } else {
258            trace!(
259                "rpc_subscriptions_recent_items num={} total_bytes={}",
260                self.queue.len(),
261                self.total_bytes,
262            );
263        }
264    }
265}
266
267struct RpcNotifier {
268    sender: broadcast::Sender<RpcNotification>,
269    recent_items: Mutex<RecentItems>,
270}
271
272thread_local! {
273    static RPC_NOTIFIER_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
274}
275
276#[derive(Debug, Serialize)]
277struct NotificationParams<T> {
278    result: T,
279    subscription: SubscriptionId,
280}
281
282#[derive(Debug, Serialize)]
283struct Notification<T> {
284    jsonrpc: Option<jsonrpc_core::Version>,
285    method: &'static str,
286    params: NotificationParams<T>,
287}
288
289impl RpcNotifier {
290    fn notify<T>(&self, value: T, subscription: &SubscriptionInfo, is_final: bool)
291    where
292        T: serde::Serialize,
293    {
294        let buf_arc = RPC_NOTIFIER_BUF.with(|buf| {
295            let mut buf = buf.borrow_mut();
296            buf.clear();
297            let notification = Notification {
298                jsonrpc: Some(jsonrpc_core::Version::V2),
299                method: subscription.method(),
300                params: NotificationParams {
301                    result: value,
302                    subscription: subscription.id(),
303                },
304            };
305            serde_json::to_writer(Cursor::new(&mut *buf), &notification)
306                .expect("serialization never fails");
307            let buf_str = str::from_utf8(&buf).expect("json is always utf-8");
308            Arc::new(String::from(buf_str))
309        });
310
311        let notification = RpcNotification {
312            subscription_id: subscription.id(),
313            json: Arc::downgrade(&buf_arc),
314            is_final,
315            created_at: Instant::now(),
316        };
317        // There is an unlikely case where this can fail: if the last subscription is closed
318        // just as the notifier generates a notification for it.
319        let _ = self.sender.send(notification);
320
321        inc_new_counter_info!("rpc-pubsub-messages", 1);
322        inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len());
323
324        self.recent_items.lock().unwrap().push(buf_arc);
325    }
326}
327
328fn filter_block_result_txs(
329    mut block: VersionedConfirmedBlock,
330    last_modified_slot: Slot,
331    params: &BlockSubscriptionParams,
332) -> Result<Option<RpcBlockUpdate>, RpcBlockUpdateError> {
333    block.transactions = match params.kind {
334        BlockSubscriptionKind::All => block.transactions,
335        BlockSubscriptionKind::MentionsAccountOrProgram(pk) => block
336            .transactions
337            .into_iter()
338            .filter(|tx| tx.account_keys().iter().any(|key| key == &pk))
339            .collect(),
340    };
341
342    if block.transactions.is_empty() {
343        if let BlockSubscriptionKind::MentionsAccountOrProgram(_) = params.kind {
344            return Ok(None);
345        }
346    }
347
348    let block = ConfirmedBlock::from(block)
349        .encode_with_options(
350            params.encoding,
351            BlockEncodingOptions {
352                transaction_details: params.transaction_details,
353                show_rewards: params.show_rewards,
354                max_supported_transaction_version: params.max_supported_transaction_version,
355            },
356        )
357        .map_err(|err| match err {
358            EncodeError::UnsupportedTransactionVersion(version) => {
359                RpcBlockUpdateError::UnsupportedTransactionVersion(version)
360            }
361        })?;
362
363    // If last_modified_slot < last_notified_slot, then the last notif was for a fork.
364    // That's the risk clients take when subscribing to non-finalized commitments.
365    // This code lets the logic for dealing with forks live on the client side.
366    Ok(Some(RpcBlockUpdate {
367        slot: last_modified_slot,
368        block: Some(block),
369        err: None,
370    }))
371}
372
373fn filter_account_result(
374    result: Option<(AccountSharedData, Slot)>,
375    params: &AccountSubscriptionParams,
376    last_notified_slot: Slot,
377    bank: Arc<Bank>,
378) -> (Option<UiAccount>, Slot) {
379    // If the account is not found, `last_modified_slot` will default to zero and
380    // we will notify clients that the account no longer exists if we haven't already
381    let (account, last_modified_slot) = result.unwrap_or_default();
382
383    // If last_modified_slot < last_notified_slot this means that we last notified for a fork
384    // and should notify that the account state has been reverted.
385    let account = (last_modified_slot != last_notified_slot).then(|| {
386        if is_known_spl_token_id(account.owner())
387            && params.encoding == UiAccountEncoding::JsonParsed
388        {
389            get_parsed_token_account(&bank, &params.pubkey, account, None)
390        } else {
391            encode_ui_account(&params.pubkey, &account, params.encoding, None, None)
392        }
393    });
394    (account, last_modified_slot)
395}
396
397fn filter_signature_result(
398    result: Option<transaction::Result<()>>,
399    _params: &SignatureSubscriptionParams,
400    last_notified_slot: Slot,
401    _bank: Arc<Bank>,
402) -> (Option<RpcSignatureResult>, Slot) {
403    (
404        result.map(|result| {
405            RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult {
406                err: result.err().map(Into::into),
407            })
408        }),
409        last_notified_slot,
410    )
411}
412
413fn filter_program_results(
414    accounts: Vec<(Pubkey, AccountSharedData)>,
415    params: &ProgramSubscriptionParams,
416    last_notified_slot: Slot,
417    bank: Arc<Bank>,
418) -> (impl Iterator<Item = RpcKeyedAccount>, Slot) {
419    let accounts_is_empty = accounts.is_empty();
420    let encoding = params.encoding;
421    let filters = params.filters.clone();
422    let keyed_accounts = accounts.into_iter().filter(move |(_, account)| {
423        filters
424            .iter()
425            .all(|filter_type| filter_allows(filter_type, account))
426    });
427    let accounts = if is_known_spl_token_id(&params.pubkey)
428        && params.encoding == UiAccountEncoding::JsonParsed
429        && !accounts_is_empty
430    {
431        let accounts = get_parsed_token_accounts(bank, keyed_accounts);
432        Either::Left(accounts)
433    } else {
434        let accounts = keyed_accounts.map(move |(pubkey, account)| RpcKeyedAccount {
435            pubkey: pubkey.to_string(),
436            account: encode_ui_account(&pubkey, &account, encoding, None, None),
437        });
438        Either::Right(accounts)
439    };
440    (accounts, last_notified_slot)
441}
442
443fn filter_logs_results(
444    logs: Option<Vec<TransactionLogInfo>>,
445    _params: &LogsSubscriptionParams,
446    last_notified_slot: Slot,
447    _bank: Arc<Bank>,
448) -> (impl Iterator<Item = RpcLogsResponse>, Slot) {
449    let responses = logs.into_iter().flatten().map(|log| RpcLogsResponse {
450        signature: log.signature.to_string(),
451        err: log.result.err().map(Into::into),
452        logs: log.log_messages,
453    });
454    (responses, last_notified_slot)
455}
456
457fn initial_last_notified_slot(
458    params: &SubscriptionParams,
459    bank_forks: &RwLock<BankForks>,
460    block_commitment_cache: &RwLock<BlockCommitmentCache>,
461    optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
462) -> Option<Slot> {
463    match params {
464        SubscriptionParams::Account(params) => {
465            let slot = if params.commitment.is_finalized() {
466                block_commitment_cache
467                    .read()
468                    .unwrap()
469                    .highest_super_majority_root()
470            } else if params.commitment.is_confirmed() {
471                optimistically_confirmed_bank.read().unwrap().bank.slot()
472            } else {
473                block_commitment_cache.read().unwrap().slot()
474            };
475
476            let bank = bank_forks.read().unwrap().get(slot)?;
477            Some(bank.get_account_modified_slot(&params.pubkey)?.1)
478        }
479        _ => None,
480    }
481}
482
483#[derive(Default)]
484struct PubsubNotificationStats {
485    since: Option<Instant>,
486    notification_entry_processing_count: u64,
487    notification_entry_processing_time_us: u64,
488}
489
490impl PubsubNotificationStats {
491    fn maybe_submit(&mut self) {
492        const SUBMIT_CADENCE: Duration = RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS;
493        let elapsed = self.since.as_ref().map(Instant::elapsed);
494        if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
495            return;
496        }
497        datapoint_info!(
498            "pubsub_notification_entries",
499            (
500                "notification_entry_processing_count",
501                self.notification_entry_processing_count,
502                i64
503            ),
504            (
505                "notification_entry_processing_time_us",
506                self.notification_entry_processing_time_us,
507                i64
508            ),
509        );
510        *self = Self {
511            since: Some(Instant::now()),
512            ..Self::default()
513        };
514    }
515}
516
517pub struct RpcSubscriptions {
518    notification_sender: Option<Sender<TimestampedNotificationEntry>>,
519    t_cleanup: Option<JoinHandle<()>>,
520
521    exit: Arc<AtomicBool>,
522    control: SubscriptionControl,
523}
524
525impl Drop for RpcSubscriptions {
526    fn drop(&mut self) {
527        self.shutdown().unwrap_or_else(|err| {
528            warn!("RPC Notification - shutdown error: {err:?}");
529        });
530    }
531}
532
533impl RpcSubscriptions {
534    pub fn new(
535        exit: Arc<AtomicBool>,
536        max_complete_transaction_status_slot: Arc<AtomicU64>,
537        blockstore: Arc<Blockstore>,
538        bank_forks: Arc<RwLock<BankForks>>,
539        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
540        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
541    ) -> Self {
542        Self::new_with_config(
543            exit,
544            max_complete_transaction_status_slot,
545            blockstore,
546            bank_forks,
547            block_commitment_cache,
548            optimistically_confirmed_bank,
549            &PubSubConfig::default(),
550            None,
551        )
552    }
553
554    pub fn new_for_tests(
555        exit: Arc<AtomicBool>,
556        max_complete_transaction_status_slot: Arc<AtomicU64>,
557        bank_forks: Arc<RwLock<BankForks>>,
558        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
559        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
560    ) -> Self {
561        let ledger_path = get_tmp_ledger_path!();
562        let blockstore = Blockstore::open(&ledger_path).unwrap();
563        let blockstore = Arc::new(blockstore);
564
565        Self::new_for_tests_with_blockstore(
566            exit,
567            max_complete_transaction_status_slot,
568            blockstore,
569            bank_forks,
570            block_commitment_cache,
571            optimistically_confirmed_bank,
572        )
573    }
574
575    pub fn new_for_tests_with_blockstore(
576        exit: Arc<AtomicBool>,
577        max_complete_transaction_status_slot: Arc<AtomicU64>,
578        blockstore: Arc<Blockstore>,
579        bank_forks: Arc<RwLock<BankForks>>,
580        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
581        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
582    ) -> Self {
583        let rpc_notifier_ready = Arc::new(AtomicBool::new(false));
584
585        let rpc_subscriptions = Self::new_with_config(
586            exit,
587            max_complete_transaction_status_slot,
588            blockstore,
589            bank_forks,
590            block_commitment_cache,
591            optimistically_confirmed_bank,
592            &PubSubConfig::default_for_tests(),
593            Some(rpc_notifier_ready.clone()),
594        );
595
596        // Ensure RPC notifier is ready to receive notifications before proceeding
597        let start_time = Instant::now();
598        loop {
599            if rpc_notifier_ready.load(Ordering::Relaxed) {
600                break;
601            } else if (Instant::now() - start_time).as_millis() > 5000 {
602                panic!("RPC notifier thread setup took too long");
603            }
604        }
605
606        rpc_subscriptions
607    }
608
609    pub fn new_with_config(
610        exit: Arc<AtomicBool>,
611        max_complete_transaction_status_slot: Arc<AtomicU64>,
612        blockstore: Arc<Blockstore>,
613        bank_forks: Arc<RwLock<BankForks>>,
614        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
615        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
616        config: &PubSubConfig,
617        rpc_notifier_ready: Option<Arc<AtomicBool>>,
618    ) -> Self {
619        let (notification_sender, notification_receiver) = crossbeam_channel::unbounded();
620
621        let subscriptions = SubscriptionsTracker::new(bank_forks.clone());
622
623        let (broadcast_sender, _) = broadcast::channel(config.queue_capacity_items);
624
625        let notifier = RpcNotifier {
626            sender: broadcast_sender.clone(),
627            recent_items: Mutex::new(RecentItems::new(
628                config.queue_capacity_items,
629                config.queue_capacity_bytes,
630            )),
631        };
632
633        let t_cleanup = config.notification_threads.map(|notification_threads| {
634            let exit = exit.clone();
635            Builder::new()
636                .name("solRpcNotifier".to_string())
637                .spawn(move || {
638                    let pool = rayon::ThreadPoolBuilder::new()
639                        .num_threads(notification_threads.get())
640                        .thread_name(|i| format!("solRpcNotify{i:02}"))
641                        .build()
642                        .unwrap();
643                    pool.install(|| {
644                        if let Some(rpc_notifier_ready) = rpc_notifier_ready {
645                            rpc_notifier_ready.fetch_or(true, Ordering::Relaxed);
646                        }
647                        Self::process_notifications(
648                            exit,
649                            max_complete_transaction_status_slot,
650                            blockstore,
651                            notifier,
652                            notification_receiver,
653                            subscriptions,
654                            bank_forks,
655                            block_commitment_cache,
656                            optimistically_confirmed_bank,
657                        )
658                    });
659                })
660                .unwrap()
661        });
662
663        let control = SubscriptionControl::new(
664            config.max_active_subscriptions,
665            notification_sender.clone(),
666            broadcast_sender,
667        );
668
669        Self {
670            notification_sender: config.notification_threads.map(|_| notification_sender),
671            t_cleanup,
672            exit,
673            control,
674        }
675    }
676
677    // For tests only...
678    pub fn default_with_bank_forks(
679        max_complete_transaction_status_slot: Arc<AtomicU64>,
680        bank_forks: Arc<RwLock<BankForks>>,
681    ) -> Self {
682        let ledger_path = get_tmp_ledger_path!();
683        let blockstore = Blockstore::open(&ledger_path).unwrap();
684        let blockstore = Arc::new(blockstore);
685        let optimistically_confirmed_bank =
686            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
687        Self::new(
688            Arc::new(AtomicBool::new(false)),
689            max_complete_transaction_status_slot,
690            blockstore,
691            bank_forks,
692            Arc::new(RwLock::new(BlockCommitmentCache::default())),
693            optimistically_confirmed_bank,
694        )
695    }
696
697    pub fn control(&self) -> &SubscriptionControl {
698        &self.control
699    }
700
701    /// Notify subscribers of changes to any accounts or new signatures since
702    /// the bank's last checkpoint.
703    pub fn notify_subscribers(&self, commitment_slots: CommitmentSlots) {
704        self.enqueue_notification(NotificationEntry::Bank(commitment_slots));
705    }
706
707    /// Notify Confirmed commitment-level subscribers of changes to any accounts or new
708    /// signatures.
709    pub fn notify_gossip_subscribers(&self, slot: Slot) {
710        self.enqueue_notification(NotificationEntry::Gossip(slot));
711    }
712
713    pub fn notify_slot_update(&self, slot_update: SlotUpdate) {
714        self.enqueue_notification(NotificationEntry::SlotUpdate(slot_update));
715    }
716
717    pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
718        self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
719        self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::CreatedBank {
720            slot,
721            parent,
722            timestamp: timestamp(),
723        }));
724    }
725
726    pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec<Signature>)) {
727        self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures));
728    }
729
730    pub fn notify_vote(&self, vote_pubkey: Pubkey, vote: VoteTransaction, signature: Signature) {
731        self.enqueue_notification(NotificationEntry::Vote((vote_pubkey, vote, signature)));
732    }
733
734    pub fn notify_roots(&self, mut rooted_slots: Vec<Slot>) {
735        rooted_slots.sort_unstable();
736        rooted_slots.into_iter().for_each(|root| {
737            self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::Root {
738                slot: root,
739                timestamp: timestamp(),
740            }));
741            self.enqueue_notification(NotificationEntry::Root(root));
742        });
743    }
744
745    fn enqueue_notification(&self, notification_entry: NotificationEntry) {
746        if let Some(ref notification_sender) = self.notification_sender {
747            match notification_sender.send(notification_entry.into()) {
748                Ok(()) => (),
749                Err(SendError(notification)) => {
750                    warn!("Dropped RPC Notification - receiver disconnected : {notification:?}");
751                }
752            }
753        }
754    }
755
756    #[allow(clippy::too_many_arguments)]
757    fn process_notifications(
758        exit: Arc<AtomicBool>,
759        max_complete_transaction_status_slot: Arc<AtomicU64>,
760        blockstore: Arc<Blockstore>,
761        notifier: RpcNotifier,
762        notification_receiver: Receiver<TimestampedNotificationEntry>,
763        mut subscriptions: SubscriptionsTracker,
764        bank_forks: Arc<RwLock<BankForks>>,
765        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
766        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
767    ) {
768        let mut stats = PubsubNotificationStats::default();
769
770        loop {
771            if exit.load(Ordering::Relaxed) {
772                break;
773            }
774            match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
775                Ok(notification_entry) => {
776                    let TimestampedNotificationEntry { entry, queued_at } = notification_entry;
777                    match entry {
778                        NotificationEntry::Subscribed(params, id) => {
779                            subscriptions.subscribe(params.clone(), id, || {
780                                initial_last_notified_slot(
781                                    &params,
782                                    &bank_forks,
783                                    &block_commitment_cache,
784                                    &optimistically_confirmed_bank,
785                                )
786                                .unwrap_or(0)
787                            });
788                        }
789                        NotificationEntry::Unsubscribed(params, id) => {
790                            subscriptions.unsubscribe(params, id);
791                        }
792                        NotificationEntry::Slot(slot_info) => {
793                            if let Some(sub) = subscriptions
794                                .node_progress_watchers()
795                                .get(&SubscriptionParams::Slot)
796                            {
797                                debug!("slot notify: {slot_info:?}");
798                                inc_new_counter_info!("rpc-subscription-notify-slot", 1);
799                                notifier.notify(slot_info, sub, false);
800                            }
801                        }
802                        NotificationEntry::SlotUpdate(slot_update) => {
803                            if let Some(sub) = subscriptions
804                                .node_progress_watchers()
805                                .get(&SubscriptionParams::SlotsUpdates)
806                            {
807                                inc_new_counter_info!("rpc-subscription-notify-slots-updates", 1);
808                                notifier.notify(slot_update, sub, false);
809                            }
810                        }
811                        // These notifications are only triggered by votes observed on gossip,
812                        // unlike `NotificationEntry::Gossip`, which also accounts for slots seen
813                        // in VoteState's from bank states built in ReplayStage.
814                        NotificationEntry::Vote((vote_pubkey, ref vote_info, signature)) => {
815                            if let Some(sub) = subscriptions
816                                .node_progress_watchers()
817                                .get(&SubscriptionParams::Vote)
818                            {
819                                let rpc_vote = RpcVote {
820                                    vote_pubkey: vote_pubkey.to_string(),
821                                    slots: vote_info.slots(),
822                                    hash: bs58::encode(vote_info.hash()).into_string(),
823                                    timestamp: vote_info.timestamp(),
824                                    signature: signature.to_string(),
825                                };
826                                debug!("vote notify: {vote_info:?}");
827                                inc_new_counter_info!("rpc-subscription-notify-vote", 1);
828                                notifier.notify(&rpc_vote, sub, false);
829                            }
830                        }
831                        NotificationEntry::Root(root) => {
832                            if let Some(sub) = subscriptions
833                                .node_progress_watchers()
834                                .get(&SubscriptionParams::Root)
835                            {
836                                debug!("root notify: {root:?}");
837                                inc_new_counter_info!("rpc-subscription-notify-root", 1);
838                                notifier.notify(root, sub, false);
839                            }
840                        }
841                        NotificationEntry::Bank(commitment_slots) => {
842                            const SOURCE: &str = "bank";
843                            RpcSubscriptions::notify_watchers(
844                                max_complete_transaction_status_slot.clone(),
845                                subscriptions.commitment_watchers(),
846                                &bank_forks,
847                                &blockstore,
848                                &commitment_slots,
849                                &notifier,
850                                SOURCE,
851                            );
852                        }
853                        NotificationEntry::Gossip(slot) => {
854                            let commitment_slots = CommitmentSlots {
855                                highest_confirmed_slot: slot,
856                                ..CommitmentSlots::default()
857                            };
858                            const SOURCE: &str = "gossip";
859                            RpcSubscriptions::notify_watchers(
860                                max_complete_transaction_status_slot.clone(),
861                                subscriptions.gossip_watchers(),
862                                &bank_forks,
863                                &blockstore,
864                                &commitment_slots,
865                                &notifier,
866                                SOURCE,
867                            );
868                        }
869                        NotificationEntry::SignaturesReceived((slot, slot_signatures)) => {
870                            for slot_signature in &slot_signatures {
871                                if let Some(subs) = subscriptions.by_signature().get(slot_signature)
872                                {
873                                    for subscription in subs.values() {
874                                        if let SubscriptionParams::Signature(params) =
875                                            subscription.params()
876                                        {
877                                            if params.enable_received_notification {
878                                                notifier.notify(
879                                                    RpcResponse::from(RpcNotificationResponse {
880                                                        context: RpcNotificationContext { slot },
881                                                        value: RpcSignatureResult::ReceivedSignature(
882                                                            ReceivedSignatureResult::ReceivedSignature,
883                                                        ),
884                                                    }),
885                                                    subscription,
886                                                    false,
887                                                );
888                                            }
889                                        } else {
890                                            error!("invalid params type in visit_by_signature");
891                                        }
892                                    }
893                                }
894                            }
895                        }
896                    }
897                    stats.notification_entry_processing_time_us +=
898                        queued_at.elapsed().as_micros() as u64;
899                    stats.notification_entry_processing_count += 1;
900                }
901                Err(RecvTimeoutError::Timeout) => {
902                    // not a problem - try reading again
903                }
904                Err(RecvTimeoutError::Disconnected) => {
905                    warn!("RPC Notification thread - sender disconnected");
906                    break;
907                }
908            }
909            stats.maybe_submit();
910        }
911    }
912
913    fn notify_watchers(
914        max_complete_transaction_status_slot: Arc<AtomicU64>,
915        subscriptions: &HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
916        bank_forks: &Arc<RwLock<BankForks>>,
917        blockstore: &Blockstore,
918        commitment_slots: &CommitmentSlots,
919        notifier: &RpcNotifier,
920        source: &'static str,
921    ) {
922        let mut total_time = Measure::start("notify_watchers");
923
924        let num_accounts_found = AtomicUsize::new(0);
925        let num_accounts_notified = AtomicUsize::new(0);
926
927        let num_blocks_found = AtomicUsize::new(0);
928        let num_blocks_notified = AtomicUsize::new(0);
929
930        let num_logs_found = AtomicUsize::new(0);
931        let num_logs_notified = AtomicUsize::new(0);
932
933        let num_programs_found = AtomicUsize::new(0);
934        let num_programs_notified = AtomicUsize::new(0);
935
936        let num_signatures_found = AtomicUsize::new(0);
937        let num_signatures_notified = AtomicUsize::new(0);
938
939        let subscriptions = subscriptions.into_par_iter();
940        subscriptions.for_each(|(_id, subscription)| {
941            let slot = if let Some(commitment) = subscription.commitment() {
942                if commitment.is_finalized() {
943                    Some(commitment_slots.highest_super_majority_root)
944                } else if commitment.is_confirmed() {
945                    Some(commitment_slots.highest_confirmed_slot)
946                } else {
947                    Some(commitment_slots.slot)
948                }
949            } else {
950                error!("missing commitment in notify_watchers");
951                None
952            };
953            match subscription.params() {
954                SubscriptionParams::Account(params) => {
955                    num_accounts_found.fetch_add(1, Ordering::Relaxed);
956                    if let Some(slot) = slot {
957                        let notified = check_commitment_and_notify(
958                            params,
959                            subscription,
960                            bank_forks,
961                            slot,
962                            |bank, params| bank.get_account_modified_slot(&params.pubkey),
963                            filter_account_result,
964                            notifier,
965                            false,
966                        );
967
968                        if notified {
969                            num_accounts_notified.fetch_add(1, Ordering::Relaxed);
970                        }
971                    }
972                }
973                SubscriptionParams::Block(params) => {
974                    num_blocks_found.fetch_add(1, Ordering::Relaxed);
975                    if let Some(slot) = slot {
976                        let bank = bank_forks.read().unwrap().get(slot);
977                        if let Some(bank) = bank {
978                            // We're calling it unnotified in this context
979                            // because, logically, it gets set to `last_notified_slot + 1`
980                            // on the final iteration of the loop down below.
981                            // This is used to notify blocks for slots that were
982                            // potentially missed due to upstream transient errors
983                            // that led to this notification not being triggered for
984                            // a slot.
985                            //
986                            // e.g.
987                            // notify_watchers is triggered for Slot 1
988                            // some time passes
989                            // notify_watchers is triggered for Slot 4
990                            // this will try to fetch blocks for slots 2, 3, and 4
991                            // as long as they are ancestors of `slot`
992                            let mut w_last_unnotified_slot =
993                                subscription.last_notified_slot.write().unwrap();
994                            // would mean it's the first notification for this subscription connection
995                            if *w_last_unnotified_slot == 0 {
996                                *w_last_unnotified_slot = slot;
997                            }
998                            let mut slots_to_notify: Vec<_> =
999                                (*w_last_unnotified_slot..slot).collect();
1000                            let ancestors = bank.proper_ancestors_set();
1001                            slots_to_notify.retain(|slot| ancestors.contains(slot));
1002                            slots_to_notify.push(slot);
1003                            for s in slots_to_notify {
1004                                // To avoid skipping a slot that fails this condition,
1005                                // caused by non-deterministic concurrency accesses, we
1006                                // break out of the loop. Besides if the current `s` is
1007                                // greater, then any `s + K` is also greater.
1008                                if s > max_complete_transaction_status_slot.load(Ordering::SeqCst) {
1009                                    break;
1010                                }
1011
1012                                let block_update_result = blockstore
1013                                    .get_complete_block(s, false)
1014                                    .map_err(|e| {
1015                                        error!("get_complete_block error: {e}");
1016                                        RpcBlockUpdateError::BlockStoreError
1017                                    })
1018                                    .and_then(|block| filter_block_result_txs(block, s, params));
1019
1020                                match block_update_result {
1021                                    Ok(block_update) => {
1022                                        if let Some(block_update) = block_update {
1023                                            notifier.notify(
1024                                                RpcResponse::from(RpcNotificationResponse {
1025                                                    context: RpcNotificationContext { slot: s },
1026                                                    value: block_update,
1027                                                }),
1028                                                subscription,
1029                                                false,
1030                                            );
1031                                            num_blocks_notified.fetch_add(1, Ordering::Relaxed);
1032                                            // the next time this subscription is notified it will
1033                                            // try to fetch all slots between (s + 1) to `slot`, inclusively
1034                                            *w_last_unnotified_slot = s + 1;
1035                                        }
1036                                    }
1037                                    Err(err) => {
1038                                        // we don't advance `w_last_unnotified_slot` so that
1039                                        // it'll retry on the next notification trigger
1040                                        notifier.notify(
1041                                            RpcResponse::from(RpcNotificationResponse {
1042                                                context: RpcNotificationContext { slot: s },
1043                                                value: RpcBlockUpdate {
1044                                                    slot,
1045                                                    block: None,
1046                                                    err: Some(err),
1047                                                },
1048                                            }),
1049                                            subscription,
1050                                            false,
1051                                        );
1052                                    }
1053                                }
1054                            }
1055                        }
1056                    }
1057                }
1058                SubscriptionParams::Logs(params) => {
1059                    num_logs_found.fetch_add(1, Ordering::Relaxed);
1060                    if let Some(slot) = slot {
1061                        let notified = check_commitment_and_notify(
1062                            params,
1063                            subscription,
1064                            bank_forks,
1065                            slot,
1066                            get_transaction_logs,
1067                            filter_logs_results,
1068                            notifier,
1069                            false,
1070                        );
1071
1072                        if notified {
1073                            num_logs_notified.fetch_add(1, Ordering::Relaxed);
1074                        }
1075                    }
1076                }
1077                SubscriptionParams::Program(params) => {
1078                    num_programs_found.fetch_add(1, Ordering::Relaxed);
1079                    if let Some(slot) = slot {
1080                        let notified = check_commitment_and_notify(
1081                            params,
1082                            subscription,
1083                            bank_forks,
1084                            slot,
1085                            |bank, params| {
1086                                bank.get_program_accounts_modified_since_parent(&params.pubkey)
1087                            },
1088                            filter_program_results,
1089                            notifier,
1090                            false,
1091                        );
1092
1093                        if notified {
1094                            num_programs_notified.fetch_add(1, Ordering::Relaxed);
1095                        }
1096                    }
1097                }
1098                SubscriptionParams::Signature(params) => {
1099                    num_signatures_found.fetch_add(1, Ordering::Relaxed);
1100                    if let Some(slot) = slot {
1101                        let notified = check_commitment_and_notify(
1102                            params,
1103                            subscription,
1104                            bank_forks,
1105                            slot,
1106                            |bank, params| {
1107                                bank.get_signature_status_processed_since_parent(&params.signature)
1108                            },
1109                            filter_signature_result,
1110                            notifier,
1111                            true, // Unsubscribe.
1112                        );
1113
1114                        if notified {
1115                            num_signatures_notified.fetch_add(1, Ordering::Relaxed);
1116                        }
1117                    }
1118                }
1119                _ => error!("wrong subscription type in alps map"),
1120            }
1121        });
1122
1123        total_time.stop();
1124
1125        let total_notified = num_accounts_notified.load(Ordering::Relaxed)
1126            + num_logs_notified.load(Ordering::Relaxed)
1127            + num_programs_notified.load(Ordering::Relaxed)
1128            + num_signatures_notified.load(Ordering::Relaxed);
1129        let total_ms = total_time.as_ms();
1130        if total_notified > 0 || total_ms > 10 {
1131            debug!(
1132                "notified({}): accounts: {} / {} logs: {} / {} programs: {} / {} signatures: {} / \
1133                 {}",
1134                source,
1135                num_accounts_found.load(Ordering::Relaxed),
1136                num_accounts_notified.load(Ordering::Relaxed),
1137                num_logs_found.load(Ordering::Relaxed),
1138                num_logs_notified.load(Ordering::Relaxed),
1139                num_programs_found.load(Ordering::Relaxed),
1140                num_programs_notified.load(Ordering::Relaxed),
1141                num_signatures_found.load(Ordering::Relaxed),
1142                num_signatures_notified.load(Ordering::Relaxed),
1143            );
1144            datapoint_info!(
1145                "rpc_subscriptions",
1146                ("source", source, String),
1147                (
1148                    "num_account_subscriptions",
1149                    num_accounts_found.load(Ordering::Relaxed),
1150                    i64
1151                ),
1152                (
1153                    "num_account_pubkeys_notified",
1154                    num_accounts_notified.load(Ordering::Relaxed),
1155                    i64
1156                ),
1157                (
1158                    "num_logs_subscriptions",
1159                    num_logs_found.load(Ordering::Relaxed),
1160                    i64
1161                ),
1162                (
1163                    "num_logs_notified",
1164                    num_logs_notified.load(Ordering::Relaxed),
1165                    i64
1166                ),
1167                (
1168                    "num_program_subscriptions",
1169                    num_programs_found.load(Ordering::Relaxed),
1170                    i64
1171                ),
1172                (
1173                    "num_programs_notified",
1174                    num_programs_notified.load(Ordering::Relaxed),
1175                    i64
1176                ),
1177                (
1178                    "num_signature_subscriptions",
1179                    num_signatures_found.load(Ordering::Relaxed),
1180                    i64
1181                ),
1182                (
1183                    "num_signatures_notified",
1184                    num_signatures_notified.load(Ordering::Relaxed),
1185                    i64
1186                ),
1187                ("notifications_time", total_time.as_us() as i64, i64),
1188            );
1189        }
1190    }
1191
1192    fn shutdown(&mut self) -> std::thread::Result<()> {
1193        if self.t_cleanup.is_some() {
1194            info!("RPC Notification thread - shutting down");
1195            self.exit.store(true, Ordering::Relaxed);
1196            let x = self.t_cleanup.take().unwrap().join();
1197            info!("RPC Notification thread - shut down.");
1198            x
1199        } else {
1200            warn!("RPC Notification thread - already shut down.");
1201            Ok(())
1202        }
1203    }
1204
1205    #[cfg(test)]
1206    fn total(&self) -> usize {
1207        self.control.total()
1208    }
1209}
1210
1211#[cfg(test)]
1212pub(crate) mod tests {
1213    use {
1214        super::*,
1215        crate::{
1216            optimistically_confirmed_bank_tracker::{
1217                BankNotification, OptimisticallyConfirmedBank, OptimisticallyConfirmedBankTracker,
1218            },
1219            rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
1220            rpc_pubsub::RpcSolPubSubInternal,
1221            rpc_pubsub_service,
1222        },
1223        serial_test::serial,
1224        solana_commitment_config::CommitmentConfig,
1225        solana_keypair::Keypair,
1226        solana_ledger::get_tmp_ledger_path_auto_delete,
1227        solana_message::Message,
1228        solana_rpc_client_api::config::{
1229            RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
1230            RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
1231            RpcTransactionLogsFilter,
1232        },
1233        solana_runtime::{
1234            commitment::BlockCommitment,
1235            genesis_utils::{create_genesis_config, GenesisConfigInfo},
1236            prioritization_fee_cache::PrioritizationFeeCache,
1237        },
1238        solana_signer::Signer,
1239        solana_stake_interface as stake,
1240        solana_system_interface::{instruction as system_instruction, program as system_program},
1241        solana_system_transaction as system_transaction,
1242        solana_transaction::Transaction,
1243        solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
1244        std::{collections::HashSet, sync::atomic::AtomicU64},
1245    };
1246
1247    struct AccountResult {
1248        lamports: u64,
1249        subscription: u64,
1250        data: &'static str,
1251        space: usize,
1252    }
1253
1254    fn make_account_result(
1255        non_default_account: bool,
1256        account_result: AccountResult,
1257    ) -> serde_json::Value {
1258        json!({
1259           "jsonrpc": "2.0",
1260           "method": "accountNotification",
1261           "params": {
1262               "result": {
1263                   "context": { "slot": 1 },
1264                   "value": {
1265                       "data": account_result.data,
1266                       "executable": false,
1267                       "lamports": account_result.lamports,
1268                       "owner": "11111111111111111111111111111111",
1269                       "rentEpoch": if non_default_account {u64::MAX} else {0},
1270                       "space": account_result.space,
1271                    },
1272               },
1273               "subscription": account_result.subscription,
1274           }
1275        })
1276    }
1277
1278    #[test]
1279    #[serial]
1280    fn test_check_account_subscribe() {
1281        let GenesisConfigInfo {
1282            genesis_config,
1283            mint_keypair,
1284            ..
1285        } = create_genesis_config(100);
1286        let bank = Bank::new_for_tests(&genesis_config);
1287        let blockhash = bank.last_blockhash();
1288        let bank_forks = BankForks::new_rw_arc(bank);
1289        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
1290        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
1291        bank_forks.write().unwrap().insert(bank1);
1292        let alice = Keypair::new();
1293
1294        let exit = Arc::new(AtomicBool::new(false));
1295        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1296        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1297            exit,
1298            max_complete_transaction_status_slot,
1299            bank_forks.clone(),
1300            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1301                1, 1,
1302            ))),
1303            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
1304        ));
1305
1306        let tx0 = system_transaction::create_account(
1307            &mint_keypair,
1308            &alice,
1309            blockhash,
1310            1,
1311            0,
1312            &system_program::id(),
1313        );
1314        let expected0 = make_account_result(
1315            true,
1316            AccountResult {
1317                lamports: 1,
1318                subscription: 0,
1319                space: 0,
1320                data: "",
1321            },
1322        );
1323
1324        let tx1 = {
1325            let instruction =
1326                system_instruction::transfer(&alice.pubkey(), &mint_keypair.pubkey(), 1);
1327            let message = Message::new(&[instruction], Some(&mint_keypair.pubkey()));
1328            Transaction::new(&[&alice, &mint_keypair], message, blockhash)
1329        };
1330        let expected1 = make_account_result(
1331            false,
1332            AccountResult {
1333                lamports: 0,
1334                subscription: 2,
1335                space: 0,
1336                data: "",
1337            },
1338        );
1339
1340        let tx2 = system_transaction::create_account(
1341            &mint_keypair,
1342            &alice,
1343            blockhash,
1344            1,
1345            1024,
1346            &system_program::id(),
1347        );
1348        let expected2 = make_account_result(
1349            true,
1350            AccountResult {
1351                lamports: 1,
1352                subscription: 4,
1353                space: 1024,
1354                data: "error: data too large for bs58 encoding",
1355            },
1356        );
1357
1358        let subscribe_cases = vec![
1359            (alice.pubkey(), tx0, expected0),
1360            (alice.pubkey(), tx1, expected1),
1361            (alice.pubkey(), tx2, expected2),
1362        ];
1363
1364        for (pubkey, tx, expected) in subscribe_cases {
1365            let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1366
1367            let sub_id = rpc
1368                .account_subscribe(
1369                    pubkey.to_string(),
1370                    Some(RpcAccountInfoConfig {
1371                        commitment: Some(CommitmentConfig::processed()),
1372                        encoding: None,
1373                        data_slice: None,
1374                        min_context_slot: None,
1375                    }),
1376                )
1377                .unwrap();
1378
1379            subscriptions
1380                .control
1381                .assert_subscribed(&SubscriptionParams::Account(AccountSubscriptionParams {
1382                    pubkey,
1383                    commitment: CommitmentConfig::processed(),
1384                    data_slice: None,
1385                    encoding: UiAccountEncoding::Binary,
1386                }));
1387
1388            rpc.block_until_processed(&subscriptions);
1389
1390            bank_forks
1391                .read()
1392                .unwrap()
1393                .get(1)
1394                .unwrap()
1395                .process_transaction(&tx)
1396                .unwrap();
1397            let commitment_slots = CommitmentSlots {
1398                slot: 1,
1399                ..CommitmentSlots::default()
1400            };
1401            subscriptions.notify_subscribers(commitment_slots);
1402            let response = receiver.recv();
1403
1404            assert_eq!(
1405                expected,
1406                serde_json::from_str::<serde_json::Value>(&response).unwrap(),
1407            );
1408            rpc.account_unsubscribe(sub_id).unwrap();
1409
1410            subscriptions
1411                .control
1412                .assert_unsubscribed(&SubscriptionParams::Account(AccountSubscriptionParams {
1413                    pubkey,
1414                    commitment: CommitmentConfig::processed(),
1415                    data_slice: None,
1416                    encoding: UiAccountEncoding::Binary,
1417                }));
1418        }
1419    }
1420
1421    #[test]
1422    #[serial]
1423    fn test_check_confirmed_block_subscribe() {
1424        let exit = Arc::new(AtomicBool::new(false));
1425        let GenesisConfigInfo {
1426            genesis_config,
1427            mint_keypair,
1428            ..
1429        } = create_genesis_config(10_000);
1430        let bank = Bank::new_for_tests(&genesis_config);
1431        let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1432        let bank_forks = BankForks::new_rw_arc(bank);
1433        let optimistically_confirmed_bank =
1434            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1435        let ledger_path = get_tmp_ledger_path_auto_delete!();
1436        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1437        let blockstore = Arc::new(blockstore);
1438        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1439        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1440            exit,
1441            max_complete_transaction_status_slot,
1442            blockstore.clone(),
1443            bank_forks.clone(),
1444            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1445            optimistically_confirmed_bank,
1446        ));
1447        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1448        let filter = RpcBlockSubscribeFilter::All;
1449        let config = RpcBlockSubscribeConfig {
1450            commitment: Some(CommitmentConfig::confirmed()),
1451            encoding: Some(UiTransactionEncoding::Json),
1452            transaction_details: Some(TransactionDetails::Signatures),
1453            show_rewards: None,
1454            max_supported_transaction_version: None,
1455        };
1456        let params = BlockSubscriptionParams {
1457            kind: BlockSubscriptionKind::All,
1458            commitment: config.commitment.unwrap(),
1459            encoding: config.encoding.unwrap(),
1460            transaction_details: config.transaction_details.unwrap(),
1461            show_rewards: config.show_rewards.unwrap_or_default(),
1462            max_supported_transaction_version: config.max_supported_transaction_version,
1463        };
1464        let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1465
1466        subscriptions
1467            .control
1468            .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1469
1470        let bank = bank_forks.read().unwrap().working_bank();
1471        let keypair1 = Keypair::new();
1472        let keypair2 = Keypair::new();
1473        let keypair3 = Keypair::new();
1474        let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1475        bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1476            .unwrap();
1477        populate_blockstore_for_tests(
1478            create_test_transaction_entries(
1479                vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1480                bank.clone(),
1481            )
1482            .0,
1483            bank,
1484            blockstore.clone(),
1485            max_complete_transaction_status_slot,
1486        );
1487
1488        let slot = 0;
1489        subscriptions.notify_gossip_subscribers(slot);
1490        let actual_resp = receiver.recv();
1491        let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1492
1493        let confirmed_block =
1494            ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1495        let block = confirmed_block
1496            .encode_with_options(
1497                params.encoding,
1498                BlockEncodingOptions {
1499                    transaction_details: params.transaction_details,
1500                    show_rewards: false,
1501                    max_supported_transaction_version: None,
1502                },
1503            )
1504            .unwrap();
1505        let expected_resp = RpcBlockUpdate {
1506            slot,
1507            block: Some(block),
1508            err: None,
1509        };
1510        let expected_resp = json!({
1511           "jsonrpc": "2.0",
1512           "method": "blockNotification",
1513           "params": {
1514               "result": {
1515                   "context": { "slot": slot },
1516                   "value": expected_resp,
1517               },
1518               "subscription": 0,
1519           }
1520        });
1521        assert_eq!(expected_resp, actual_resp);
1522
1523        // should not trigger since commitment NOT set to finalized
1524        subscriptions.notify_subscribers(CommitmentSlots {
1525            slot,
1526            root: slot,
1527            highest_confirmed_slot: slot,
1528            highest_super_majority_root: slot,
1529        });
1530        let should_err = receiver.recv_timeout(Duration::from_millis(300));
1531        assert!(should_err.is_err());
1532
1533        rpc.slot_unsubscribe(sub_id).unwrap();
1534        subscriptions
1535            .control
1536            .assert_unsubscribed(&SubscriptionParams::Block(params));
1537    }
1538
1539    #[test]
1540    #[serial]
1541    fn test_check_confirmed_block_subscribe_with_mentions() {
1542        let exit = Arc::new(AtomicBool::new(false));
1543        let GenesisConfigInfo {
1544            genesis_config,
1545            mint_keypair,
1546            ..
1547        } = create_genesis_config(10_000);
1548        let bank = Bank::new_for_tests(&genesis_config);
1549        let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1550        let bank_forks = BankForks::new_rw_arc(bank);
1551        let optimistically_confirmed_bank =
1552            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1553        let ledger_path = get_tmp_ledger_path_auto_delete!();
1554        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1555        let blockstore = Arc::new(blockstore);
1556        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1557        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1558            exit,
1559            max_complete_transaction_status_slot,
1560            blockstore.clone(),
1561            bank_forks.clone(),
1562            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1563            optimistically_confirmed_bank,
1564        ));
1565        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1566        let keypair1 = Keypair::new();
1567        let filter =
1568            RpcBlockSubscribeFilter::MentionsAccountOrProgram(keypair1.pubkey().to_string());
1569        let config = RpcBlockSubscribeConfig {
1570            commitment: Some(CommitmentConfig::confirmed()),
1571            encoding: Some(UiTransactionEncoding::Json),
1572            transaction_details: Some(TransactionDetails::Signatures),
1573            show_rewards: None,
1574            max_supported_transaction_version: None,
1575        };
1576        let params = BlockSubscriptionParams {
1577            kind: BlockSubscriptionKind::MentionsAccountOrProgram(keypair1.pubkey()),
1578            commitment: config.commitment.unwrap(),
1579            encoding: config.encoding.unwrap(),
1580            transaction_details: config.transaction_details.unwrap(),
1581            show_rewards: config.show_rewards.unwrap_or_default(),
1582            max_supported_transaction_version: config.max_supported_transaction_version,
1583        };
1584        let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1585
1586        subscriptions
1587            .control
1588            .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1589
1590        let bank = bank_forks.read().unwrap().working_bank();
1591        let keypair2 = Keypair::new();
1592        let keypair3 = Keypair::new();
1593        let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1594        bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1595            .unwrap();
1596        populate_blockstore_for_tests(
1597            create_test_transaction_entries(
1598                vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1599                bank.clone(),
1600            )
1601            .0,
1602            bank,
1603            blockstore.clone(),
1604            max_complete_transaction_status_slot,
1605        );
1606
1607        let slot = 0;
1608        subscriptions.notify_gossip_subscribers(slot);
1609        let actual_resp = receiver.recv();
1610        let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1611
1612        // make sure it filtered out the other keypairs
1613        let mut confirmed_block =
1614            ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1615        confirmed_block.transactions.retain(|tx_with_meta| {
1616            tx_with_meta
1617                .account_keys()
1618                .iter()
1619                .any(|key| key == &keypair1.pubkey())
1620        });
1621        let block = confirmed_block
1622            .encode_with_options(
1623                params.encoding,
1624                BlockEncodingOptions {
1625                    transaction_details: params.transaction_details,
1626                    show_rewards: false,
1627                    max_supported_transaction_version: None,
1628                },
1629            )
1630            .unwrap();
1631        let expected_resp = RpcBlockUpdate {
1632            slot,
1633            block: Some(block),
1634            err: None,
1635        };
1636        let expected_resp = json!({
1637           "jsonrpc": "2.0",
1638           "method": "blockNotification",
1639           "params": {
1640               "result": {
1641                   "context": { "slot": slot },
1642                   "value": expected_resp,
1643               },
1644               "subscription": 0,
1645           }
1646        });
1647        assert_eq!(expected_resp, actual_resp);
1648
1649        rpc.slot_unsubscribe(sub_id).unwrap();
1650        subscriptions
1651            .control
1652            .assert_unsubscribed(&SubscriptionParams::Block(params));
1653    }
1654
1655    #[test]
1656    #[serial]
1657    fn test_check_finalized_block_subscribe() {
1658        let exit = Arc::new(AtomicBool::new(false));
1659        let GenesisConfigInfo {
1660            genesis_config,
1661            mint_keypair,
1662            ..
1663        } = create_genesis_config(10_000);
1664        let bank = Bank::new_for_tests(&genesis_config);
1665        let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
1666        let bank_forks = BankForks::new_rw_arc(bank);
1667        let optimistically_confirmed_bank =
1668            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1669        let ledger_path = get_tmp_ledger_path_auto_delete!();
1670        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
1671        let blockstore = Arc::new(blockstore);
1672        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1673        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
1674            exit,
1675            max_complete_transaction_status_slot,
1676            blockstore.clone(),
1677            bank_forks.clone(),
1678            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1679            optimistically_confirmed_bank,
1680        ));
1681        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1682        let filter = RpcBlockSubscribeFilter::All;
1683        let config = RpcBlockSubscribeConfig {
1684            commitment: Some(CommitmentConfig::finalized()),
1685            encoding: Some(UiTransactionEncoding::Json),
1686            transaction_details: Some(TransactionDetails::Signatures),
1687            show_rewards: None,
1688            max_supported_transaction_version: None,
1689        };
1690        let params = BlockSubscriptionParams {
1691            kind: BlockSubscriptionKind::All,
1692            commitment: config.commitment.unwrap(),
1693            encoding: config.encoding.unwrap(),
1694            transaction_details: config.transaction_details.unwrap(),
1695            show_rewards: config.show_rewards.unwrap_or_default(),
1696            max_supported_transaction_version: config.max_supported_transaction_version,
1697        };
1698        let sub_id = rpc.block_subscribe(filter, Some(config)).unwrap();
1699        subscriptions
1700            .control
1701            .assert_subscribed(&SubscriptionParams::Block(params.clone()));
1702
1703        let bank = bank_forks.read().unwrap().working_bank();
1704        let keypair1 = Keypair::new();
1705        let keypair2 = Keypair::new();
1706        let keypair3 = Keypair::new();
1707        let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
1708        bank.transfer(rent_exempt_amount, &mint_keypair, &keypair2.pubkey())
1709            .unwrap();
1710        populate_blockstore_for_tests(
1711            create_test_transaction_entries(
1712                vec![&mint_keypair, &keypair1, &keypair2, &keypair3],
1713                bank.clone(),
1714            )
1715            .0,
1716            bank,
1717            blockstore.clone(),
1718            max_complete_transaction_status_slot,
1719        );
1720
1721        let slot = 0;
1722        subscriptions.notify_subscribers(CommitmentSlots {
1723            slot,
1724            root: slot,
1725            highest_confirmed_slot: slot,
1726            highest_super_majority_root: slot,
1727        });
1728        let actual_resp = receiver.recv();
1729        let actual_resp = serde_json::from_str::<serde_json::Value>(&actual_resp).unwrap();
1730
1731        let confirmed_block =
1732            ConfirmedBlock::from(blockstore.get_complete_block(slot, false).unwrap());
1733        let block = confirmed_block
1734            .encode_with_options(
1735                params.encoding,
1736                BlockEncodingOptions {
1737                    transaction_details: params.transaction_details,
1738                    show_rewards: false,
1739                    max_supported_transaction_version: None,
1740                },
1741            )
1742            .unwrap();
1743        let expected_resp = RpcBlockUpdate {
1744            slot,
1745            block: Some(block),
1746            err: None,
1747        };
1748        let expected_resp = json!({
1749           "jsonrpc": "2.0",
1750           "method": "blockNotification",
1751           "params": {
1752               "result": {
1753                   "context": { "slot": slot },
1754                   "value": expected_resp,
1755               },
1756               "subscription": 0,
1757           }
1758        });
1759        assert_eq!(expected_resp, actual_resp);
1760
1761        // should not trigger since commitment set to finalized
1762        subscriptions.notify_gossip_subscribers(slot);
1763        let should_err = receiver.recv_timeout(Duration::from_millis(300));
1764        assert!(should_err.is_err());
1765
1766        rpc.slot_unsubscribe(sub_id).unwrap();
1767        subscriptions
1768            .control
1769            .assert_unsubscribed(&SubscriptionParams::Block(params));
1770    }
1771
1772    #[test]
1773    #[serial]
1774    fn test_check_program_subscribe() {
1775        let GenesisConfigInfo {
1776            genesis_config,
1777            mint_keypair,
1778            ..
1779        } = create_genesis_config(100);
1780        let bank = Bank::new_for_tests(&genesis_config);
1781        let blockhash = bank.last_blockhash();
1782        let bank_forks = BankForks::new_rw_arc(bank);
1783        let alice = Keypair::new();
1784        let tx = system_transaction::create_account(
1785            &mint_keypair,
1786            &alice,
1787            blockhash,
1788            1,
1789            16,
1790            &stake::program::id(),
1791        );
1792        bank_forks
1793            .read()
1794            .unwrap()
1795            .get(0)
1796            .unwrap()
1797            .process_transaction(&tx)
1798            .unwrap();
1799
1800        let exit = Arc::new(AtomicBool::new(false));
1801        let optimistically_confirmed_bank =
1802            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1803        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1804        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1805            exit,
1806            max_complete_transaction_status_slot,
1807            bank_forks,
1808            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
1809            optimistically_confirmed_bank,
1810        ));
1811        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1812        let sub_id = rpc
1813            .program_subscribe(
1814                stake::program::id().to_string(),
1815                Some(RpcProgramAccountsConfig {
1816                    account_config: RpcAccountInfoConfig {
1817                        commitment: Some(CommitmentConfig::processed()),
1818                        ..RpcAccountInfoConfig::default()
1819                    },
1820                    ..RpcProgramAccountsConfig::default()
1821                }),
1822            )
1823            .unwrap();
1824
1825        subscriptions
1826            .control
1827            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
1828                pubkey: stake::program::id(),
1829                filters: Vec::new(),
1830                commitment: CommitmentConfig::processed(),
1831                data_slice: None,
1832                encoding: UiAccountEncoding::Binary,
1833                with_context: false,
1834            }));
1835
1836        subscriptions.notify_subscribers(CommitmentSlots::default());
1837        let response = receiver.recv();
1838        let expected = json!({
1839           "jsonrpc": "2.0",
1840           "method": "programNotification",
1841           "params": {
1842               "result": {
1843                   "context": { "slot": 0 },
1844                   "value": {
1845                       "account": {
1846                          "data": "1111111111111111",
1847                          "executable": false,
1848                          "lamports": 1,
1849                          "owner": "Stake11111111111111111111111111111111111111",
1850                          "rentEpoch": u64::MAX,
1851                          "space": 16,
1852                       },
1853                       "pubkey": alice.pubkey().to_string(),
1854                    },
1855               },
1856               "subscription": 0,
1857           }
1858        });
1859        assert_eq!(
1860            expected,
1861            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
1862        );
1863
1864        rpc.program_unsubscribe(sub_id).unwrap();
1865        subscriptions
1866            .control
1867            .assert_unsubscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
1868                pubkey: stake::program::id(),
1869                filters: Vec::new(),
1870                commitment: CommitmentConfig::processed(),
1871                data_slice: None,
1872                encoding: UiAccountEncoding::Binary,
1873                with_context: false,
1874            }));
1875    }
1876
1877    #[test]
1878    #[serial]
1879    fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot() {
1880        // Testing if we can get the pubsub notification if a slot does not
1881        // receive OptimisticallyConfirmed but its descendant slot get the confirmed
1882        // notification.
1883        let GenesisConfigInfo {
1884            genesis_config,
1885            mint_keypair,
1886            ..
1887        } = create_genesis_config(100);
1888        let bank = Bank::new_for_tests(&genesis_config);
1889
1890        let blockhash = bank.last_blockhash();
1891        let bank_forks = BankForks::new_rw_arc(bank);
1892
1893        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
1894        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
1895        bank_forks.write().unwrap().insert(bank1);
1896        let bank1 = bank_forks.read().unwrap().get(1).unwrap();
1897
1898        // add account for alice and process the transaction at bank1
1899        let alice = Keypair::new();
1900        let tx = system_transaction::create_account(
1901            &mint_keypair,
1902            &alice,
1903            blockhash,
1904            1,
1905            16,
1906            &stake::program::id(),
1907        );
1908
1909        bank1.process_transaction(&tx).unwrap();
1910
1911        let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
1912        bank_forks.write().unwrap().insert(bank2);
1913
1914        // add account for bob and process the transaction at bank2
1915        let bob = Keypair::new();
1916        let tx = system_transaction::create_account(
1917            &mint_keypair,
1918            &bob,
1919            blockhash,
1920            2,
1921            16,
1922            &stake::program::id(),
1923        );
1924        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
1925
1926        bank2.process_transaction(&tx).unwrap();
1927
1928        let bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
1929        bank_forks.write().unwrap().insert(bank3);
1930
1931        // add account for joe and process the transaction at bank3
1932        let joe = Keypair::new();
1933        let tx = system_transaction::create_account(
1934            &mint_keypair,
1935            &joe,
1936            blockhash,
1937            3,
1938            16,
1939            &stake::program::id(),
1940        );
1941        let bank3 = bank_forks.read().unwrap().get(3).unwrap();
1942
1943        bank3.process_transaction(&tx).unwrap();
1944
1945        // now add programSubscribe at the "confirmed" commitment level
1946        let exit = Arc::new(AtomicBool::new(false));
1947        let optimistically_confirmed_bank =
1948            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
1949        let mut pending_optimistically_confirmed_banks = HashSet::new();
1950        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
1951        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
1952            exit,
1953            max_complete_transaction_status_slot,
1954            bank_forks.clone(),
1955            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1956                1, 1,
1957            ))),
1958            optimistically_confirmed_bank.clone(),
1959        ));
1960
1961        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
1962
1963        let sub_id = rpc
1964            .program_subscribe(
1965                stake::program::id().to_string(),
1966                Some(RpcProgramAccountsConfig {
1967                    account_config: RpcAccountInfoConfig {
1968                        commitment: Some(CommitmentConfig::confirmed()),
1969                        ..RpcAccountInfoConfig::default()
1970                    },
1971                    ..RpcProgramAccountsConfig::default()
1972                }),
1973            )
1974            .unwrap();
1975
1976        subscriptions
1977            .control
1978            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
1979                pubkey: stake::program::id(),
1980                filters: Vec::new(),
1981                encoding: UiAccountEncoding::Binary,
1982                data_slice: None,
1983                commitment: CommitmentConfig::confirmed(),
1984                with_context: false,
1985            }));
1986
1987        let mut highest_confirmed_slot: Slot = 0;
1988        let mut highest_root_slot: Slot = 0;
1989        let mut last_notified_confirmed_slot: Slot = 0;
1990        // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is unfrozen, we expect
1991        // to see transaction for alice and bob to be notified in order.
1992        OptimisticallyConfirmedBankTracker::process_notification(
1993            (
1994                BankNotification::OptimisticallyConfirmed(3),
1995                None, /* no work sequence */
1996            ),
1997            &bank_forks,
1998            &optimistically_confirmed_bank,
1999            &subscriptions,
2000            &mut pending_optimistically_confirmed_banks,
2001            &mut last_notified_confirmed_slot,
2002            &mut highest_confirmed_slot,
2003            &mut highest_root_slot,
2004            &None,
2005            &PrioritizationFeeCache::default(),
2006            &None, // no dependency tracker
2007        );
2008
2009        // a closure to reduce code duplications in building expected responses:
2010        let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
2011            json!({
2012               "jsonrpc": "2.0",
2013               "method": "programNotification",
2014               "params": {
2015                   "result": {
2016                       "context": { "slot": slot },
2017                       "value": {
2018                           "account": {
2019                              "data": "1111111111111111",
2020                              "executable": false,
2021                              "lamports": lamports,
2022                              "owner": "Stake11111111111111111111111111111111111111",
2023                              "rentEpoch": u64::MAX,
2024                              "space": 16,
2025                           },
2026                           "pubkey": pubkey,
2027                        },
2028                   },
2029                   "subscription": subscription,
2030               }
2031            })
2032        };
2033
2034        let response = receiver.recv();
2035        let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
2036        assert_eq!(
2037            expected,
2038            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2039        );
2040
2041        let response = receiver.recv();
2042        let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
2043        assert_eq!(
2044            expected,
2045            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2046        );
2047
2048        bank3.freeze();
2049        OptimisticallyConfirmedBankTracker::process_notification(
2050            (
2051                BankNotification::Frozen(bank3),
2052                None, /* no work sequence */
2053            ),
2054            &bank_forks,
2055            &optimistically_confirmed_bank,
2056            &subscriptions,
2057            &mut pending_optimistically_confirmed_banks,
2058            &mut last_notified_confirmed_slot,
2059            &mut highest_confirmed_slot,
2060            &mut highest_root_slot,
2061            &None,
2062            &PrioritizationFeeCache::default(),
2063            &None, // no dependency tracker
2064        );
2065
2066        let response = receiver.recv();
2067        let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
2068        assert_eq!(
2069            expected,
2070            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2071        );
2072        rpc.program_unsubscribe(sub_id).unwrap();
2073    }
2074
2075    #[test]
2076    #[serial]
2077    #[should_panic]
2078    fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks_no_notifications(
2079    ) {
2080        // Testing if we can get the pubsub notification if a slot does not
2081        // receive OptimisticallyConfirmed but its descendant slot get the confirmed
2082        // notification with a bank in the BankForks. We are not expecting to receive any notifications -- should panic.
2083        let GenesisConfigInfo {
2084            genesis_config,
2085            mint_keypair,
2086            ..
2087        } = create_genesis_config(100);
2088        let bank = Bank::new_for_tests(&genesis_config);
2089
2090        let blockhash = bank.last_blockhash();
2091        let bank_forks = BankForks::new_rw_arc(bank);
2092
2093        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2094        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
2095        bank_forks.write().unwrap().insert(bank1);
2096        let bank1 = bank_forks.read().unwrap().get(1).unwrap();
2097
2098        // add account for alice and process the transaction at bank1
2099        let alice = Keypair::new();
2100        let tx = system_transaction::create_account(
2101            &mint_keypair,
2102            &alice,
2103            blockhash,
2104            1,
2105            16,
2106            &stake::program::id(),
2107        );
2108
2109        bank1.process_transaction(&tx).unwrap();
2110
2111        let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
2112        bank_forks.write().unwrap().insert(bank2);
2113
2114        // add account for bob and process the transaction at bank2
2115        let bob = Keypair::new();
2116        let tx = system_transaction::create_account(
2117            &mint_keypair,
2118            &bob,
2119            blockhash,
2120            2,
2121            16,
2122            &stake::program::id(),
2123        );
2124        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2125
2126        bank2.process_transaction(&tx).unwrap();
2127
2128        // now add programSubscribe at the "confirmed" commitment level
2129        let exit = Arc::new(AtomicBool::new(false));
2130        let optimistically_confirmed_bank =
2131            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2132        let mut pending_optimistically_confirmed_banks = HashSet::new();
2133        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2134        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2135            exit,
2136            max_complete_transaction_status_slot,
2137            bank_forks.clone(),
2138            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2139                1, 1,
2140            ))),
2141            optimistically_confirmed_bank.clone(),
2142        ));
2143        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2144        rpc.program_subscribe(
2145            stake::program::id().to_string(),
2146            Some(RpcProgramAccountsConfig {
2147                account_config: RpcAccountInfoConfig {
2148                    commitment: Some(CommitmentConfig::confirmed()),
2149                    ..RpcAccountInfoConfig::default()
2150                },
2151                ..RpcProgramAccountsConfig::default()
2152            }),
2153        )
2154        .unwrap();
2155
2156        subscriptions
2157            .control
2158            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2159                pubkey: stake::program::id(),
2160                filters: Vec::new(),
2161                encoding: UiAccountEncoding::Binary,
2162                data_slice: None,
2163                commitment: CommitmentConfig::confirmed(),
2164                with_context: false,
2165            }));
2166
2167        let mut highest_confirmed_slot: Slot = 0;
2168        let mut highest_root_slot: Slot = 0;
2169        let mut last_notified_confirmed_slot: Slot = 0;
2170        // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we do not
2171        // expect to see any RPC notifications.
2172        OptimisticallyConfirmedBankTracker::process_notification(
2173            (
2174                BankNotification::OptimisticallyConfirmed(3),
2175                None, /* no work sequence */
2176            ),
2177            &bank_forks,
2178            &optimistically_confirmed_bank,
2179            &subscriptions,
2180            &mut pending_optimistically_confirmed_banks,
2181            &mut last_notified_confirmed_slot,
2182            &mut highest_confirmed_slot,
2183            &mut highest_root_slot,
2184            &None,
2185            &PrioritizationFeeCache::default(),
2186            &None, // no dependency tracker
2187        );
2188
2189        // The following should panic
2190        let _response = receiver.recv();
2191    }
2192
2193    #[test]
2194    #[serial]
2195    fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks() {
2196        // Testing if we can get the pubsub notification if a slot does not
2197        // receive OptimisticallyConfirmed but its descendant slot get the confirmed
2198        // notification. It differs from the test_check_program_subscribe_for_missing_optimistically_confirmed_slot
2199        // test in that when the descendant get confirmed, the descendant does not have a bank yet.
2200        let GenesisConfigInfo {
2201            genesis_config,
2202            mint_keypair,
2203            ..
2204        } = create_genesis_config(100);
2205        let bank = Bank::new_for_tests(&genesis_config);
2206
2207        let blockhash = bank.last_blockhash();
2208        let bank_forks = BankForks::new_rw_arc(bank);
2209
2210        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2211        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
2212        bank_forks.write().unwrap().insert(bank1);
2213        let bank1 = bank_forks.read().unwrap().get(1).unwrap();
2214
2215        // add account for alice and process the transaction at bank1
2216        let alice = Keypair::new();
2217        let tx = system_transaction::create_account(
2218            &mint_keypair,
2219            &alice,
2220            blockhash,
2221            1,
2222            16,
2223            &stake::program::id(),
2224        );
2225
2226        bank1.process_transaction(&tx).unwrap();
2227
2228        let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
2229        bank_forks.write().unwrap().insert(bank2);
2230
2231        // add account for bob and process the transaction at bank2
2232        let bob = Keypair::new();
2233        let tx = system_transaction::create_account(
2234            &mint_keypair,
2235            &bob,
2236            blockhash,
2237            2,
2238            16,
2239            &stake::program::id(),
2240        );
2241        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2242
2243        bank2.process_transaction(&tx).unwrap();
2244
2245        // now add programSubscribe at the "confirmed" commitment level
2246        let exit = Arc::new(AtomicBool::new(false));
2247        let optimistically_confirmed_bank =
2248            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2249        let mut pending_optimistically_confirmed_banks = HashSet::new();
2250        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2251        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2252            exit,
2253            max_complete_transaction_status_slot,
2254            bank_forks.clone(),
2255            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2256                1, 1,
2257            ))),
2258            optimistically_confirmed_bank.clone(),
2259        ));
2260        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2261        let sub_id = rpc
2262            .program_subscribe(
2263                stake::program::id().to_string(),
2264                Some(RpcProgramAccountsConfig {
2265                    account_config: RpcAccountInfoConfig {
2266                        commitment: Some(CommitmentConfig::confirmed()),
2267                        ..RpcAccountInfoConfig::default()
2268                    },
2269                    ..RpcProgramAccountsConfig::default()
2270                }),
2271            )
2272            .unwrap();
2273
2274        subscriptions
2275            .control
2276            .assert_subscribed(&SubscriptionParams::Program(ProgramSubscriptionParams {
2277                pubkey: stake::program::id(),
2278                filters: Vec::new(),
2279                encoding: UiAccountEncoding::Binary,
2280                data_slice: None,
2281                commitment: CommitmentConfig::confirmed(),
2282                with_context: false,
2283            }));
2284
2285        let mut highest_confirmed_slot: Slot = 0;
2286        let mut highest_root_slot: Slot = 0;
2287        let mut last_notified_confirmed_slot: Slot = 0;
2288        // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we expect
2289        // to see transaction for alice and bob to be notified only when bank3 is added to the fork and
2290        // frozen. The notifications should be in the increasing order of the slot.
2291        OptimisticallyConfirmedBankTracker::process_notification(
2292            (
2293                BankNotification::OptimisticallyConfirmed(3),
2294                None, /* no work sequence */
2295            ),
2296            &bank_forks,
2297            &optimistically_confirmed_bank,
2298            &subscriptions,
2299            &mut pending_optimistically_confirmed_banks,
2300            &mut last_notified_confirmed_slot,
2301            &mut highest_confirmed_slot,
2302            &mut highest_root_slot,
2303            &None,
2304            &PrioritizationFeeCache::default(),
2305            &None, // no dependency tracker
2306        );
2307
2308        // a closure to reduce code duplications in building expected responses:
2309        let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
2310            json!({
2311               "jsonrpc": "2.0",
2312               "method": "programNotification",
2313               "params": {
2314                   "result": {
2315                       "context": { "slot": slot },
2316                       "value": {
2317                           "account": {
2318                              "data": "1111111111111111",
2319                              "executable": false,
2320                              "lamports": lamports,
2321                              "owner": "Stake11111111111111111111111111111111111111",
2322                              "rentEpoch": u64::MAX,
2323                              "space": 16,
2324                           },
2325                           "pubkey": pubkey,
2326                        },
2327                   },
2328                   "subscription": subscription,
2329               }
2330            })
2331        };
2332
2333        let bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
2334        bank_forks.write().unwrap().insert(bank3);
2335
2336        // add account for joe and process the transaction at bank3
2337        let joe = Keypair::new();
2338        let tx = system_transaction::create_account(
2339            &mint_keypair,
2340            &joe,
2341            blockhash,
2342            3,
2343            16,
2344            &stake::program::id(),
2345        );
2346        let bank3 = bank_forks.read().unwrap().get(3).unwrap();
2347
2348        bank3.process_transaction(&tx).unwrap();
2349        bank3.freeze();
2350        OptimisticallyConfirmedBankTracker::process_notification(
2351            (
2352                BankNotification::Frozen(bank3),
2353                None, /* no work sequence */
2354            ),
2355            &bank_forks,
2356            &optimistically_confirmed_bank,
2357            &subscriptions,
2358            &mut pending_optimistically_confirmed_banks,
2359            &mut last_notified_confirmed_slot,
2360            &mut highest_confirmed_slot,
2361            &mut highest_root_slot,
2362            &None,
2363            &PrioritizationFeeCache::default(),
2364            &None, // no dependency tracker
2365        );
2366
2367        let response = receiver.recv();
2368        let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
2369        assert_eq!(
2370            expected,
2371            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2372        );
2373
2374        let response = receiver.recv();
2375        let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
2376        assert_eq!(
2377            expected,
2378            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2379        );
2380
2381        let response = receiver.recv();
2382        let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
2383        assert_eq!(
2384            expected,
2385            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2386        );
2387        rpc.program_unsubscribe(sub_id).unwrap();
2388    }
2389
2390    #[test]
2391    #[serial]
2392    fn test_check_signature_subscribe() {
2393        let GenesisConfigInfo {
2394            genesis_config,
2395            mint_keypair,
2396            ..
2397        } = create_genesis_config(100);
2398        let bank = Bank::new_for_tests(&genesis_config);
2399        let blockhash = bank.last_blockhash();
2400        let bank_forks = BankForks::new_rw_arc(bank);
2401        let alice = Keypair::new();
2402
2403        let past_bank_tx =
2404            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, blockhash);
2405        let unprocessed_tx =
2406            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, blockhash);
2407        let processed_tx =
2408            system_transaction::transfer(&mint_keypair, &alice.pubkey(), 3, blockhash);
2409
2410        bank_forks
2411            .read()
2412            .unwrap()
2413            .get(0)
2414            .unwrap()
2415            .process_transaction(&past_bank_tx)
2416            .unwrap();
2417
2418        let next_bank = Bank::new_from_parent(
2419            bank_forks.read().unwrap().get(0).unwrap(),
2420            &solana_pubkey::new_rand(),
2421            1,
2422        );
2423        bank_forks.write().unwrap().insert(next_bank);
2424
2425        bank_forks
2426            .read()
2427            .unwrap()
2428            .get(1)
2429            .unwrap()
2430            .process_transaction(&processed_tx)
2431            .unwrap();
2432        let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
2433
2434        let mut cache0 = BlockCommitment::default();
2435        cache0.increase_confirmation_stake(1, 10);
2436        let cache1 = BlockCommitment::default();
2437
2438        let mut block_commitment = HashMap::new();
2439        block_commitment.entry(0).or_insert(cache0);
2440        block_commitment.entry(1).or_insert(cache1);
2441        let block_commitment_cache = BlockCommitmentCache::new(
2442            block_commitment,
2443            10,
2444            CommitmentSlots {
2445                slot: bank1.slot(),
2446                ..CommitmentSlots::default()
2447            },
2448        );
2449
2450        let exit = Arc::new(AtomicBool::new(false));
2451        let optimistically_confirmed_bank =
2452            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2453        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2454        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2455            exit,
2456            max_complete_transaction_status_slot,
2457            bank_forks,
2458            Arc::new(RwLock::new(block_commitment_cache)),
2459            optimistically_confirmed_bank,
2460        ));
2461
2462        let (past_bank_rpc1, mut past_bank_receiver1) =
2463            rpc_pubsub_service::test_connection(&subscriptions);
2464        let (past_bank_rpc2, mut past_bank_receiver2) =
2465            rpc_pubsub_service::test_connection(&subscriptions);
2466        let (processed_rpc, mut processed_receiver) =
2467            rpc_pubsub_service::test_connection(&subscriptions);
2468        let (another_rpc, _another_receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2469        let (processed_rpc3, mut processed_receiver3) =
2470            rpc_pubsub_service::test_connection(&subscriptions);
2471
2472        let past_bank_sub_id1 = past_bank_rpc1
2473            .signature_subscribe(
2474                past_bank_tx.signatures[0].to_string(),
2475                Some(RpcSignatureSubscribeConfig {
2476                    commitment: Some(CommitmentConfig::processed()),
2477                    enable_received_notification: Some(false),
2478                }),
2479            )
2480            .unwrap();
2481        let past_bank_sub_id2 = past_bank_rpc2
2482            .signature_subscribe(
2483                past_bank_tx.signatures[0].to_string(),
2484                Some(RpcSignatureSubscribeConfig {
2485                    commitment: Some(CommitmentConfig::finalized()),
2486                    enable_received_notification: Some(false),
2487                }),
2488            )
2489            .unwrap();
2490        let processed_sub_id = processed_rpc
2491            .signature_subscribe(
2492                processed_tx.signatures[0].to_string(),
2493                Some(RpcSignatureSubscribeConfig {
2494                    commitment: Some(CommitmentConfig::processed()),
2495                    enable_received_notification: Some(false),
2496                }),
2497            )
2498            .unwrap();
2499        another_rpc
2500            .signature_subscribe(
2501                unprocessed_tx.signatures[0].to_string(),
2502                Some(RpcSignatureSubscribeConfig {
2503                    commitment: Some(CommitmentConfig::processed()),
2504                    enable_received_notification: Some(false),
2505                }),
2506            )
2507            .unwrap();
2508
2509        // Add a subscription that gets `received` notifications
2510        let processed_sub_id3 = processed_rpc3
2511            .signature_subscribe(
2512                unprocessed_tx.signatures[0].to_string(),
2513                Some(RpcSignatureSubscribeConfig {
2514                    commitment: Some(CommitmentConfig::processed()),
2515                    enable_received_notification: Some(true),
2516                }),
2517            )
2518            .unwrap();
2519
2520        assert!(subscriptions
2521            .control
2522            .signature_subscribed(&unprocessed_tx.signatures[0]));
2523        assert!(subscriptions
2524            .control
2525            .signature_subscribed(&processed_tx.signatures[0]));
2526
2527        let mut commitment_slots = CommitmentSlots::default();
2528        let received_slot = 1;
2529        commitment_slots.slot = received_slot;
2530        subscriptions
2531            .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
2532        subscriptions.notify_subscribers(commitment_slots);
2533        let expected_res =
2534            RpcSignatureResult::ProcessedSignature(ProcessedSignatureResult { err: None });
2535        let received_expected_res =
2536            RpcSignatureResult::ReceivedSignature(ReceivedSignatureResult::ReceivedSignature);
2537        struct Notification {
2538            slot: Slot,
2539            id: u64,
2540        }
2541
2542        let expected_notification =
2543            |exp: Notification, expected_res: &RpcSignatureResult| -> String {
2544                let json = json!({
2545                    "jsonrpc": "2.0",
2546                    "method": "signatureNotification",
2547                    "params": {
2548                        "result": {
2549                            "context": { "slot": exp.slot },
2550                            "value": expected_res,
2551                        },
2552                        "subscription": exp.id,
2553                    }
2554                });
2555                serde_json::to_string(&json).unwrap()
2556            };
2557
2558        // Expect to receive a notification from bank 1 because this subscription is
2559        // looking for 0 confirmations and so checks the current bank
2560        let expected = expected_notification(
2561            Notification {
2562                slot: 1,
2563                id: past_bank_sub_id1.into(),
2564            },
2565            &expected_res,
2566        );
2567        let response = past_bank_receiver1.recv();
2568        assert_eq!(expected, response);
2569
2570        // Expect to receive a notification from bank 0 because this subscription is
2571        // looking for 1 confirmation and so checks the past bank
2572        let expected = expected_notification(
2573            Notification {
2574                slot: 0,
2575                id: past_bank_sub_id2.into(),
2576            },
2577            &expected_res,
2578        );
2579        let response = past_bank_receiver2.recv();
2580        assert_eq!(expected, response);
2581
2582        let expected = expected_notification(
2583            Notification {
2584                slot: 1,
2585                id: processed_sub_id.into(),
2586            },
2587            &expected_res,
2588        );
2589        let response = processed_receiver.recv();
2590        assert_eq!(expected, response);
2591
2592        // Expect a "received" notification
2593        let expected = expected_notification(
2594            Notification {
2595                slot: received_slot,
2596                id: processed_sub_id3.into(),
2597            },
2598            &received_expected_res,
2599        );
2600        let response = processed_receiver3.recv();
2601        assert_eq!(expected, response);
2602
2603        // Subscription should be automatically removed after notification
2604
2605        assert!(!subscriptions
2606            .control
2607            .signature_subscribed(&processed_tx.signatures[0]));
2608        assert!(!subscriptions
2609            .control
2610            .signature_subscribed(&past_bank_tx.signatures[0]));
2611
2612        // Unprocessed signature subscription should not be removed
2613        assert!(subscriptions
2614            .control
2615            .signature_subscribed(&unprocessed_tx.signatures[0]));
2616    }
2617
2618    #[test]
2619    #[serial]
2620    fn test_check_slot_subscribe() {
2621        let exit = Arc::new(AtomicBool::new(false));
2622        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
2623        let bank = Bank::new_for_tests(&genesis_config);
2624        let bank_forks = BankForks::new_rw_arc(bank);
2625        let optimistically_confirmed_bank =
2626            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2627        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2628        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2629            exit,
2630            max_complete_transaction_status_slot,
2631            bank_forks,
2632            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2633            optimistically_confirmed_bank,
2634        ));
2635        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2636        let sub_id = rpc.slot_subscribe().unwrap();
2637
2638        subscriptions
2639            .control
2640            .assert_subscribed(&SubscriptionParams::Slot);
2641
2642        subscriptions.notify_slot(0, 0, 0);
2643        let response = receiver.recv();
2644
2645        let expected_res = SlotInfo {
2646            parent: 0,
2647            slot: 0,
2648            root: 0,
2649        };
2650        let expected_res_str = serde_json::to_string(&expected_res).unwrap();
2651
2652        let expected = format!(
2653            r#"{{"jsonrpc":"2.0","method":"slotNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
2654        );
2655        assert_eq!(expected, response);
2656
2657        rpc.slot_unsubscribe(sub_id).unwrap();
2658        subscriptions
2659            .control
2660            .assert_unsubscribed(&SubscriptionParams::Slot);
2661    }
2662
2663    #[test]
2664    #[serial]
2665    fn test_check_root_subscribe() {
2666        let exit = Arc::new(AtomicBool::new(false));
2667        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
2668        let bank = Bank::new_for_tests(&genesis_config);
2669        let bank_forks = BankForks::new_rw_arc(bank);
2670        let optimistically_confirmed_bank =
2671            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2672        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2673        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2674            exit,
2675            max_complete_transaction_status_slot,
2676            bank_forks,
2677            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2678            optimistically_confirmed_bank,
2679        ));
2680        let (rpc, mut receiver) = rpc_pubsub_service::test_connection(&subscriptions);
2681        let sub_id = rpc.root_subscribe().unwrap();
2682
2683        subscriptions
2684            .control
2685            .assert_subscribed(&SubscriptionParams::Root);
2686
2687        subscriptions.notify_roots(vec![2, 1, 3]);
2688
2689        for expected_root in 1..=3 {
2690            let response = receiver.recv();
2691
2692            let expected_res_str =
2693                serde_json::to_string(&serde_json::to_value(expected_root).unwrap()).unwrap();
2694            let expected = format!(
2695                r#"{{"jsonrpc":"2.0","method":"rootNotification","params":{{"result":{expected_res_str},"subscription":0}}}}"#
2696            );
2697            assert_eq!(expected, response);
2698        }
2699
2700        rpc.root_unsubscribe(sub_id).unwrap();
2701        subscriptions
2702            .control
2703            .assert_unsubscribed(&SubscriptionParams::Root);
2704    }
2705
2706    #[test]
2707    #[serial]
2708    fn test_gossip_separate_account_notifications() {
2709        let GenesisConfigInfo {
2710            genesis_config,
2711            mint_keypair,
2712            ..
2713        } = create_genesis_config(100);
2714        let bank = Bank::new_for_tests(&genesis_config);
2715        let blockhash = bank.last_blockhash();
2716        let bank_forks = BankForks::new_rw_arc(bank);
2717        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
2718        let bank1 = Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 1);
2719        bank_forks.write().unwrap().insert(bank1);
2720        let bank2 = Bank::new_from_parent(bank0, &Pubkey::default(), 2);
2721        bank_forks.write().unwrap().insert(bank2);
2722
2723        let alice = Keypair::from_base58_string("sfLnS4rZ5a8gXke3aGxCgM6usFAVPxLUaBSRdssGY9uS5eoiEWQ41CqDcpXbcekpKsie8Lyy3LNFdhEvjUE1wd9");
2724
2725        let optimistically_confirmed_bank =
2726            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2727        let mut pending_optimistically_confirmed_banks = HashSet::new();
2728
2729        let exit = Arc::new(AtomicBool::new(false));
2730        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2731        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2732            exit,
2733            max_complete_transaction_status_slot,
2734            bank_forks.clone(),
2735            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
2736                1, 1,
2737            ))),
2738            optimistically_confirmed_bank.clone(),
2739        ));
2740        let (rpc0, mut receiver0) = rpc_pubsub_service::test_connection(&subscriptions);
2741        let (rpc1, mut receiver1) = rpc_pubsub_service::test_connection(&subscriptions);
2742        let sub_id0 = rpc0
2743            .account_subscribe(
2744                alice.pubkey().to_string(),
2745                Some(RpcAccountInfoConfig {
2746                    commitment: Some(CommitmentConfig::confirmed()),
2747                    encoding: None,
2748                    data_slice: None,
2749                    min_context_slot: None,
2750                }),
2751            )
2752            .unwrap();
2753
2754        assert!(subscriptions.control.account_subscribed(&alice.pubkey()));
2755        rpc0.block_until_processed(&subscriptions);
2756
2757        let tx = system_transaction::create_account(
2758            &mint_keypair,
2759            &alice,
2760            blockhash,
2761            1,
2762            16,
2763            &stake::program::id(),
2764        );
2765
2766        // Add the transaction to the 1st bank and then freeze the bank
2767        let bank1 = bank_forks.write().unwrap().get(1).unwrap();
2768        bank1.process_transaction(&tx).unwrap();
2769        bank1.freeze();
2770
2771        // Add the same transaction to the unfrozen 2nd bank
2772        bank_forks
2773            .read()
2774            .unwrap()
2775            .get(2)
2776            .unwrap()
2777            .process_transaction(&tx)
2778            .unwrap();
2779
2780        // First, notify the unfrozen bank first to queue pending notification
2781        let mut highest_confirmed_slot: Slot = 0;
2782        let mut highest_root_slot: Slot = 0;
2783        let mut last_notified_confirmed_slot: Slot = 0;
2784        OptimisticallyConfirmedBankTracker::process_notification(
2785            (BankNotification::OptimisticallyConfirmed(2), None),
2786            &bank_forks,
2787            &optimistically_confirmed_bank,
2788            &subscriptions,
2789            &mut pending_optimistically_confirmed_banks,
2790            &mut last_notified_confirmed_slot,
2791            &mut highest_confirmed_slot,
2792            &mut highest_root_slot,
2793            &None,
2794            &PrioritizationFeeCache::default(),
2795            &None, // no dependency tracker
2796        );
2797
2798        // Now, notify the frozen bank and ensure its notifications are processed
2799        highest_confirmed_slot = 0;
2800        OptimisticallyConfirmedBankTracker::process_notification(
2801            (
2802                BankNotification::OptimisticallyConfirmed(1),
2803                None, /* no work sequence */
2804            ),
2805            &bank_forks,
2806            &optimistically_confirmed_bank,
2807            &subscriptions,
2808            &mut pending_optimistically_confirmed_banks,
2809            &mut last_notified_confirmed_slot,
2810            &mut highest_confirmed_slot,
2811            &mut highest_root_slot,
2812            &None,
2813            &PrioritizationFeeCache::default(),
2814            &None, // no dependency tracker
2815        );
2816
2817        let response = receiver0.recv();
2818        let expected = json!({
2819           "jsonrpc": "2.0",
2820           "method": "accountNotification",
2821           "params": {
2822               "result": {
2823                   "context": { "slot": 1 },
2824                   "value": {
2825                       "data": "1111111111111111",
2826                       "executable": false,
2827                       "lamports": 1,
2828                       "owner": "Stake11111111111111111111111111111111111111",
2829                       "rentEpoch": u64::MAX,
2830                       "space": 16,
2831                    },
2832               },
2833               "subscription": 0,
2834           }
2835        });
2836        assert_eq!(
2837            expected,
2838            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2839        );
2840        rpc0.account_unsubscribe(sub_id0).unwrap();
2841        rpc0.block_until_processed(&subscriptions);
2842
2843        let sub_id1 = rpc1
2844            .account_subscribe(
2845                alice.pubkey().to_string(),
2846                Some(RpcAccountInfoConfig {
2847                    commitment: Some(CommitmentConfig::confirmed()),
2848                    encoding: None,
2849                    data_slice: None,
2850                    min_context_slot: None,
2851                }),
2852            )
2853            .unwrap();
2854        rpc1.block_until_processed(&subscriptions);
2855
2856        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
2857        bank2.freeze();
2858        highest_confirmed_slot = 0;
2859        OptimisticallyConfirmedBankTracker::process_notification(
2860            (
2861                BankNotification::Frozen(bank2),
2862                None, /* no work sequence */
2863            ),
2864            &bank_forks,
2865            &optimistically_confirmed_bank,
2866            &subscriptions,
2867            &mut pending_optimistically_confirmed_banks,
2868            &mut last_notified_confirmed_slot,
2869            &mut highest_confirmed_slot,
2870            &mut highest_root_slot,
2871            &None,
2872            &PrioritizationFeeCache::default(),
2873            &None, // no dependency tracker
2874        );
2875        let response = receiver1.recv();
2876        let expected = json!({
2877           "jsonrpc": "2.0",
2878           "method": "accountNotification",
2879           "params": {
2880               "result": {
2881                   "context": { "slot": 2 },
2882                   "value": {
2883                       "data": "1111111111111111",
2884                       "executable": false,
2885                       "lamports": 1,
2886                       "owner": "Stake11111111111111111111111111111111111111",
2887                       "rentEpoch": u64::MAX,
2888                       "space": 16,
2889                    },
2890               },
2891               "subscription": 3,
2892           }
2893        });
2894        assert_eq!(
2895            expected,
2896            serde_json::from_str::<serde_json::Value>(&response).unwrap(),
2897        );
2898        rpc1.account_unsubscribe(sub_id1).unwrap();
2899
2900        assert!(!subscriptions.control.account_subscribed(&alice.pubkey()));
2901    }
2902
2903    fn make_logs_result(signature: &str, subscription_id: u64) -> serde_json::Value {
2904        json!({
2905            "jsonrpc": "2.0",
2906            "method": "logsNotification",
2907            "params": {
2908                "result": {
2909                    "context": {
2910                        "slot": 0
2911                    },
2912                    "value": {
2913                        "signature": signature,
2914                        "err": null,
2915                        "logs": [
2916                            "Program 11111111111111111111111111111111 invoke [1]",
2917                            "Program 11111111111111111111111111111111 success"
2918                        ]
2919                    }
2920                },
2921                "subscription": subscription_id
2922            }
2923        })
2924    }
2925
2926    #[test]
2927    #[serial]
2928    fn test_logs_subscribe() {
2929        let GenesisConfigInfo {
2930            genesis_config,
2931            mint_keypair,
2932            ..
2933        } = create_genesis_config(100);
2934        let bank = Bank::new_for_tests(&genesis_config);
2935        let blockhash = bank.last_blockhash();
2936        let bank_forks = BankForks::new_rw_arc(bank);
2937
2938        let alice = Keypair::new();
2939
2940        let exit = Arc::new(AtomicBool::new(false));
2941        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
2942        let optimistically_confirmed_bank =
2943            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
2944        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
2945            exit,
2946            max_complete_transaction_status_slot,
2947            bank_forks.clone(),
2948            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
2949            optimistically_confirmed_bank,
2950        ));
2951
2952        let sub_config = RpcTransactionLogsConfig {
2953            commitment: Some(CommitmentConfig::processed()),
2954        };
2955
2956        let (rpc_all, mut receiver_all) = rpc_pubsub_service::test_connection(&subscriptions);
2957        let sub_id_for_all = rpc_all
2958            .logs_subscribe(RpcTransactionLogsFilter::All, Some(sub_config.clone()))
2959            .unwrap();
2960        assert!(subscriptions.control.logs_subscribed(None));
2961
2962        let (rpc_alice, mut receiver_alice) = rpc_pubsub_service::test_connection(&subscriptions);
2963        let sub_id_for_alice = rpc_alice
2964            .logs_subscribe(
2965                RpcTransactionLogsFilter::Mentions(vec![alice.pubkey().to_string()]),
2966                Some(sub_config),
2967            )
2968            .unwrap();
2969        assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
2970        rpc_alice.block_until_processed(&subscriptions);
2971
2972        let tx = system_transaction::create_account(
2973            &mint_keypair,
2974            &alice,
2975            blockhash,
2976            1,
2977            0,
2978            &system_program::id(),
2979        );
2980
2981        assert!(bank_forks
2982            .read()
2983            .unwrap()
2984            .get(0)
2985            .unwrap()
2986            .process_transaction_with_metadata(tx.clone())
2987            .is_ok());
2988
2989        subscriptions.notify_subscribers(CommitmentSlots::new_from_slot(0));
2990
2991        let expected_response_all =
2992            make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_all));
2993        let response_all = receiver_all.recv();
2994        assert_eq!(
2995            expected_response_all,
2996            serde_json::from_str::<serde_json::Value>(&response_all).unwrap(),
2997        );
2998        let expected_response_alice =
2999            make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_alice));
3000        let response_alice = receiver_alice.recv();
3001        assert_eq!(
3002            expected_response_alice,
3003            serde_json::from_str::<serde_json::Value>(&response_alice).unwrap(),
3004        );
3005
3006        rpc_all.logs_unsubscribe(sub_id_for_all).unwrap();
3007        assert!(!subscriptions.control.logs_subscribed(None));
3008        rpc_alice.logs_unsubscribe(sub_id_for_alice).unwrap();
3009        assert!(!subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
3010    }
3011
3012    #[test]
3013    fn test_total_subscriptions() {
3014        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
3015        let bank = Bank::new_for_tests(&genesis_config);
3016        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
3017        let bank_forks = BankForks::new_rw_arc(bank);
3018        let subscriptions = Arc::new(RpcSubscriptions::default_with_bank_forks(
3019            max_complete_transaction_status_slot,
3020            bank_forks,
3021        ));
3022
3023        let (rpc1, _receiver1) = rpc_pubsub_service::test_connection(&subscriptions);
3024        let sub_id1 = rpc1
3025            .account_subscribe(Pubkey::default().to_string(), None)
3026            .unwrap();
3027
3028        assert_eq!(subscriptions.total(), 1);
3029
3030        let (rpc2, _receiver2) = rpc_pubsub_service::test_connection(&subscriptions);
3031        let sub_id2 = rpc2
3032            .program_subscribe(Pubkey::default().to_string(), None)
3033            .unwrap();
3034
3035        assert_eq!(subscriptions.total(), 2);
3036
3037        let (rpc3, _receiver3) = rpc_pubsub_service::test_connection(&subscriptions);
3038        let sub_id3 = rpc3
3039            .logs_subscribe(RpcTransactionLogsFilter::All, None)
3040            .unwrap();
3041        assert_eq!(subscriptions.total(), 3);
3042
3043        let (rpc4, _receiver4) = rpc_pubsub_service::test_connection(&subscriptions);
3044        let sub_id4 = rpc4
3045            .signature_subscribe(Signature::default().to_string(), None)
3046            .unwrap();
3047
3048        assert_eq!(subscriptions.total(), 4);
3049
3050        let (rpc5, _receiver5) = rpc_pubsub_service::test_connection(&subscriptions);
3051        let sub_id5 = rpc5.slot_subscribe().unwrap();
3052
3053        assert_eq!(subscriptions.total(), 5);
3054
3055        let (rpc6, _receiver6) = rpc_pubsub_service::test_connection(&subscriptions);
3056        let sub_id6 = rpc6.vote_subscribe().unwrap();
3057
3058        assert_eq!(subscriptions.total(), 6);
3059
3060        let (rpc7, _receiver7) = rpc_pubsub_service::test_connection(&subscriptions);
3061        let sub_id7 = rpc7.root_subscribe().unwrap();
3062
3063        assert_eq!(subscriptions.total(), 7);
3064
3065        // Add duplicate account subscription, but it shouldn't increment the count.
3066        let (rpc8, _receiver8) = rpc_pubsub_service::test_connection(&subscriptions);
3067        let sub_id8 = rpc8
3068            .account_subscribe(Pubkey::default().to_string(), None)
3069            .unwrap();
3070        assert_eq!(subscriptions.total(), 7);
3071
3072        rpc1.account_unsubscribe(sub_id1).unwrap();
3073        assert_eq!(subscriptions.total(), 7);
3074
3075        rpc8.account_unsubscribe(sub_id8).unwrap();
3076        assert_eq!(subscriptions.total(), 6);
3077
3078        rpc2.program_unsubscribe(sub_id2).unwrap();
3079        assert_eq!(subscriptions.total(), 5);
3080
3081        rpc3.logs_unsubscribe(sub_id3).unwrap();
3082        assert_eq!(subscriptions.total(), 4);
3083
3084        rpc4.signature_unsubscribe(sub_id4).unwrap();
3085        assert_eq!(subscriptions.total(), 3);
3086
3087        rpc5.slot_unsubscribe(sub_id5).unwrap();
3088        assert_eq!(subscriptions.total(), 2);
3089
3090        rpc6.vote_unsubscribe(sub_id6).unwrap();
3091        assert_eq!(subscriptions.total(), 1);
3092
3093        rpc7.root_unsubscribe(sub_id7).unwrap();
3094        assert_eq!(subscriptions.total(), 0);
3095    }
3096}