solana_rpc/
rpc_subscription_tracker.rs

1use {
2    crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry},
3    dashmap::{mapref::entry::Entry as DashEntry, DashMap},
4    solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
5    solana_metrics::{CounterToken, TokenCounter},
6    solana_rpc_client_api::filter::RpcFilterType,
7    solana_runtime::{
8        bank::{TransactionLogCollectorConfig, TransactionLogCollectorFilter},
9        bank_forks::BankForks,
10    },
11    solana_sdk::{
12        clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature,
13    },
14    solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
15    std::{
16        collections::hash_map::{Entry, HashMap},
17        fmt,
18        sync::{
19            atomic::{AtomicU64, Ordering},
20            Arc, RwLock, Weak,
21        },
22    },
23    thiserror::Error,
24    tokio::sync::broadcast,
25};
26
/// Numeric identifier of a subscription.
///
/// A newtype over `u64`; it converts to and from `u64` through the `From`
/// implementations below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscriptionId(u64);
29
30impl From<u64> for SubscriptionId {
31    fn from(value: u64) -> Self {
32        SubscriptionId(value)
33    }
34}
35
36impl From<SubscriptionId> for u64 {
37    fn from(value: SubscriptionId) -> Self {
38        value.0
39    }
40}
41
/// Parameters identifying a subscription, one variant per subscription kind.
///
/// The type is `Eq + Hash`, which lets it serve as a map key (it is used that
/// way by `SubscriptionControlInner::subscriptions` and
/// `SubscriptionsTracker::node_progress_watchers`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionParams {
    /// Account subscription; see [`AccountSubscriptionParams`].
    Account(AccountSubscriptionParams),
    /// Block subscription; see [`BlockSubscriptionParams`].
    Block(BlockSubscriptionParams),
    /// Transaction-logs subscription; see [`LogsSubscriptionParams`].
    Logs(LogsSubscriptionParams),
    /// Program subscription; see [`ProgramSubscriptionParams`].
    Program(ProgramSubscriptionParams),
    /// Signature subscription; see [`SignatureSubscriptionParams`].
    Signature(SignatureSubscriptionParams),
    /// Slot subscription (no parameters, no commitment).
    Slot,
    /// Slots-updates subscription (no parameters, no commitment).
    SlotsUpdates,
    /// Root subscription (no parameters, no commitment).
    Root,
    /// Vote subscription (no parameters, no commitment).
    Vote,
}
54
55impl SubscriptionParams {
56    fn method(&self) -> &'static str {
57        match self {
58            SubscriptionParams::Account(_) => "accountNotification",
59            SubscriptionParams::Logs(_) => "logsNotification",
60            SubscriptionParams::Program(_) => "programNotification",
61            SubscriptionParams::Signature(_) => "signatureNotification",
62            SubscriptionParams::Slot => "slotNotification",
63            SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification",
64            SubscriptionParams::Block(_) => "blockNotification",
65            SubscriptionParams::Root => "rootNotification",
66            SubscriptionParams::Vote => "voteNotification",
67        }
68    }
69
70    fn commitment(&self) -> Option<CommitmentConfig> {
71        match self {
72            SubscriptionParams::Account(params) => Some(params.commitment),
73            SubscriptionParams::Logs(params) => Some(params.commitment),
74            SubscriptionParams::Program(params) => Some(params.commitment),
75            SubscriptionParams::Signature(params) => Some(params.commitment),
76            SubscriptionParams::Block(params) => Some(params.commitment),
77            SubscriptionParams::Slot
78            | SubscriptionParams::SlotsUpdates
79            | SubscriptionParams::Root
80            | SubscriptionParams::Vote => None,
81        }
82    }
83
84    fn is_commitment_watcher(&self) -> bool {
85        let commitment = match self {
86            SubscriptionParams::Account(params) => &params.commitment,
87            SubscriptionParams::Block(params) => &params.commitment,
88            SubscriptionParams::Logs(params) => &params.commitment,
89            SubscriptionParams::Program(params) => &params.commitment,
90            SubscriptionParams::Signature(params) => &params.commitment,
91            SubscriptionParams::Root
92            | SubscriptionParams::Slot
93            | SubscriptionParams::SlotsUpdates
94            | SubscriptionParams::Vote => return false,
95        };
96        !commitment.is_confirmed()
97    }
98
99    fn is_gossip_watcher(&self) -> bool {
100        let commitment = match self {
101            SubscriptionParams::Account(params) => &params.commitment,
102            SubscriptionParams::Block(params) => &params.commitment,
103            SubscriptionParams::Logs(params) => &params.commitment,
104            SubscriptionParams::Program(params) => &params.commitment,
105            SubscriptionParams::Signature(params) => &params.commitment,
106            SubscriptionParams::Root
107            | SubscriptionParams::Slot
108            | SubscriptionParams::SlotsUpdates
109            | SubscriptionParams::Vote => return false,
110        };
111        commitment.is_confirmed()
112    }
113
114    fn is_node_progress_watcher(&self) -> bool {
115        matches!(
116            self,
117            SubscriptionParams::Slot
118                | SubscriptionParams::SlotsUpdates
119                | SubscriptionParams::Root
120                | SubscriptionParams::Vote
121        )
122    }
123}
124
/// Parameters of an account subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccountSubscriptionParams {
    /// Address of the subscribed account.
    pub pubkey: Pubkey,
    /// Encoding requested for the account data.
    pub encoding: UiAccountEncoding,
    /// Optional slice configuration for the account data.
    pub data_slice: Option<UiDataSliceConfig>,
    /// Commitment level of the subscription.
    pub commitment: CommitmentConfig,
}
132
/// Parameters of a block subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockSubscriptionParams {
    /// Commitment level of the subscription.
    pub commitment: CommitmentConfig,
    /// Encoding requested for transactions.
    pub encoding: UiTransactionEncoding,
    /// Which blocks the subscription is interested in.
    pub kind: BlockSubscriptionKind,
    /// Requested level of transaction detail.
    pub transaction_details: TransactionDetails,
    /// Presumably whether rewards are included in notifications; confirm against the consumer.
    pub show_rewards: bool,
    /// Optional maximum transaction version, as supplied by the subscriber.
    pub max_supported_transaction_version: Option<u8>,
}
142
/// Selects which blocks a block subscription covers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BlockSubscriptionKind {
    /// No address restriction.
    All,
    /// Restricted to the given account or program address.
    MentionsAccountOrProgram(Pubkey),
}
148
/// Parameters of a transaction-logs subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogsSubscriptionParams {
    /// Which transactions' logs the subscription covers.
    pub kind: LogsSubscriptionKind,
    /// Commitment level of the subscription.
    pub commitment: CommitmentConfig,
}
154
/// Selects which transactions a logs subscription covers.
///
/// `LogsSubscriptionsIndex::update_config` maps these onto
/// `TransactionLogCollectorFilter` values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LogsSubscriptionKind {
    /// Counted in `all_count`; selects the `All` collector filter.
    All,
    /// Counted in `all_with_votes_count`; selects the `AllWithVotes` collector filter.
    AllWithVotes,
    /// A single address, added to the collector's mentioned addresses.
    Single(Pubkey),
}
161
/// Parameters of a program subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProgramSubscriptionParams {
    /// Address of the subscribed program.
    pub pubkey: Pubkey,
    /// Filters supplied with the subscription.
    pub filters: Vec<RpcFilterType>,
    /// Encoding requested for account data.
    pub encoding: UiAccountEncoding,
    /// Optional slice configuration for account data.
    pub data_slice: Option<UiDataSliceConfig>,
    /// Commitment level of the subscription.
    pub commitment: CommitmentConfig,
    /// Presumably whether notifications carry context; confirm against the consumer.
    pub with_context: bool,
}
171
/// Parameters of a signature subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SignatureSubscriptionParams {
    /// Transaction signature being watched.
    pub signature: Signature,
    /// Commitment level of the subscription.
    pub commitment: CommitmentConfig,
    /// Presumably enables an extra "received" notification; confirm against the consumer.
    pub enable_received_notification: bool,
}
178
/// Cheaply clonable handle used to create subscriptions.
///
/// All clones share one `SubscriptionControlInner` through an `Arc`.
#[derive(Clone)]
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
/// Weak reference to a subscription's shared token state, paired with the
/// subscription's id so the id stays available after the token is gone.
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
182
/// State shared by every `SubscriptionControl` clone and every token.
struct SubscriptionControlInner {
    /// One entry per distinct set of parameters, holding a weak reference to
    /// the token state shared by all subscribers with those parameters.
    subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
    /// Source of ids for newly created subscriptions (incremented per new entry).
    next_id: AtomicU64,
    /// Limit on the number of map entries, checked when a new entry would be created.
    max_active_subscriptions: usize,
    /// Channel on which `Subscribed` / `Unsubscribed` entries are sent.
    sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
    /// Broadcast channel from which `broadcast_receiver` creates receivers.
    broadcast_sender: broadcast::Sender<RpcNotification>,
    /// Counter named `rpc_pubsub_total_subscriptions`; yields one `CounterToken`
    /// per `SubscriptionToken` handed out.
    counter: TokenCounter,
}
191
192impl SubscriptionControl {
193    pub fn new(
194        max_active_subscriptions: usize,
195        sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
196        broadcast_sender: broadcast::Sender<RpcNotification>,
197    ) -> Self {
198        Self(Arc::new(SubscriptionControlInner {
199            subscriptions: DashMap::new(),
200            next_id: AtomicU64::new(0),
201            max_active_subscriptions,
202            sender,
203            broadcast_sender,
204            counter: TokenCounter::new("rpc_pubsub_total_subscriptions"),
205        }))
206    }
207
208    pub fn broadcast_receiver(&self) -> broadcast::Receiver<RpcNotification> {
209        self.0.broadcast_sender.subscribe()
210    }
211
212    pub fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionToken, Error> {
213        debug!(
214            "Total existing subscriptions: {}",
215            self.0.subscriptions.len()
216        );
217        let count = self.0.subscriptions.len();
218        let create_token_and_weak_ref = |id, params| {
219            let token = SubscriptionToken(
220                Arc::new(SubscriptionTokenInner {
221                    control: Arc::clone(&self.0),
222                    params,
223                    id,
224                }),
225                self.0.counter.create_token(),
226            );
227            let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
228            (token, weak_ref)
229        };
230
231        match self.0.subscriptions.entry(params) {
232            DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
233                Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
234                // This means the last Arc for this Weak pointer entered the drop just before us,
235                // but could not remove the entry since we are holding the write lock.
236                // See `Drop` implementation for `SubscriptionTokenInner` for further info.
237                None => {
238                    let (token, weak_ref) =
239                        create_token_and_weak_ref(entry.get().1, entry.key().clone());
240                    entry.insert(weak_ref);
241                    Ok(token)
242                }
243            },
244            DashEntry::Vacant(entry) => {
245                if count >= self.0.max_active_subscriptions {
246                    inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
247                    return Err(Error::TooManySubscriptions);
248                }
249                let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
250                let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
251                let _ = self
252                    .0
253                    .sender
254                    .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
255                entry.insert(weak_ref);
256                datapoint_info!(
257                    "rpc-subscription",
258                    ("total", self.0.subscriptions.len(), i64)
259                );
260                Ok(token)
261            }
262        }
263    }
264
265    pub fn total(&self) -> usize {
266        self.0.subscriptions.len()
267    }
268
269    #[cfg(test)]
270    pub fn assert_subscribed(&self, params: &SubscriptionParams) {
271        assert!(self.0.subscriptions.contains_key(params));
272    }
273
274    #[cfg(test)]
275    pub fn assert_unsubscribed(&self, params: &SubscriptionParams) {
276        assert!(!self.0.subscriptions.contains_key(params));
277    }
278
279    #[cfg(test)]
280    pub fn account_subscribed(&self, pubkey: &Pubkey) -> bool {
281        self.0.subscriptions.iter().any(|item| {
282            if let SubscriptionParams::Account(params) = item.key() {
283                &params.pubkey == pubkey
284            } else {
285                false
286            }
287        })
288    }
289
290    #[cfg(test)]
291    pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool {
292        self.0.subscriptions.iter().any(|item| {
293            if let SubscriptionParams::Logs(params) = item.key() {
294                let subscribed_pubkey = match &params.kind {
295                    LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
296                    LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
297                };
298                subscribed_pubkey == pubkey
299            } else {
300                false
301            }
302        })
303    }
304
305    #[cfg(test)]
306    pub fn signature_subscribed(&self, signature: &Signature) -> bool {
307        self.0.subscriptions.iter().any(|item| {
308            if let SubscriptionParams::Signature(params) = item.key() {
309                &params.signature == signature
310            } else {
311                false
312            }
313        })
314    }
315}
316
/// Per-subscription record kept by `SubscriptionsTracker`.
#[derive(Debug)]
pub struct SubscriptionInfo {
    /// Id of the subscription.
    id: SubscriptionId,
    /// Parameters the subscription was created with.
    params: SubscriptionParams,
    /// Notification method name, computed from `params` at creation.
    method: &'static str,
    /// Slot value seeded by the caller of `SubscriptionsTracker::subscribe`;
    /// publicly writable through the lock.
    pub last_notified_slot: RwLock<Slot>,
    /// Commitment computed from `params` at creation (`None` for kinds without one).
    commitment: Option<CommitmentConfig>,
}
325
/// Read-only accessors for the private fields of `SubscriptionInfo`.
impl SubscriptionInfo {
    /// Id of the subscription.
    pub fn id(&self) -> SubscriptionId {
        self.id
    }

    /// Notification method name for this subscription.
    pub fn method(&self) -> &'static str {
        self.method
    }

    /// Parameters the subscription was created with.
    pub fn params(&self) -> &SubscriptionParams {
        &self.params
    }

    /// Commitment of the subscription, if its kind has one.
    pub fn commitment(&self) -> Option<CommitmentConfig> {
        self.commitment
    }
}
343
/// Errors returned by `SubscriptionControl::subscribe`.
#[derive(Debug, Error)]
pub enum Error {
    /// A new subscription was refused because the configured limit was reached.
    #[error("node subscription limit reached")]
    TooManySubscriptions,
}
349
/// Reference counts of active logs subscriptions, used to derive the
/// transaction log collector configuration written to the root bank.
struct LogsSubscriptionsIndex {
    /// Number of active `LogsSubscriptionKind::All` subscriptions.
    all_count: usize,
    /// Number of active `LogsSubscriptionKind::AllWithVotes` subscriptions.
    all_with_votes_count: usize,
    /// Number of active `LogsSubscriptionKind::Single` subscriptions per address.
    single_count: HashMap<Pubkey, usize>,

    /// Bank forks whose root bank receives the updated collector configuration.
    bank_forks: Arc<RwLock<BankForks>>,
}
357
358impl LogsSubscriptionsIndex {
359    fn add(&mut self, params: &LogsSubscriptionParams) {
360        match params.kind {
361            LogsSubscriptionKind::All => self.all_count += 1,
362            LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count += 1,
363            LogsSubscriptionKind::Single(key) => {
364                *self.single_count.entry(key).or_default() += 1;
365            }
366        }
367        self.update_config();
368    }
369
370    fn remove(&mut self, params: &LogsSubscriptionParams) {
371        match params.kind {
372            LogsSubscriptionKind::All => self.all_count -= 1,
373            LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count -= 1,
374            LogsSubscriptionKind::Single(key) => match self.single_count.entry(key) {
375                Entry::Occupied(mut entry) => {
376                    *entry.get_mut() -= 1;
377                    if *entry.get() == 0 {
378                        entry.remove();
379                    }
380                }
381                Entry::Vacant(_) => error!("missing entry in single_count"),
382            },
383        }
384        self.update_config();
385    }
386
387    fn update_config(&self) {
388        let mentioned_addresses = self.single_count.keys().copied().collect();
389        let config = if self.all_with_votes_count > 0 {
390            TransactionLogCollectorConfig {
391                filter: TransactionLogCollectorFilter::AllWithVotes,
392                mentioned_addresses,
393            }
394        } else if self.all_count > 0 {
395            TransactionLogCollectorConfig {
396                filter: TransactionLogCollectorFilter::All,
397                mentioned_addresses,
398            }
399        } else {
400            TransactionLogCollectorConfig {
401                filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
402                mentioned_addresses,
403            }
404        };
405
406        *self
407            .bank_forks
408            .read()
409            .unwrap()
410            .root_bank()
411            .transaction_log_collector_config
412            .write()
413            .unwrap() = config;
414    }
415}
416
/// Indexes of active subscriptions, grouped by how they are notified.
pub struct SubscriptionsTracker {
    /// Reference counts of logs subscriptions; drives the log collector configuration.
    logs_subscriptions_index: LogsSubscriptionsIndex,
    /// Signature subscriptions grouped by signature, then by subscription id.
    by_signature: HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>>,
    // Accounts, blocks, logs, programs, signatures (commitment is not "confirmed")
    commitment_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    // Accounts, blocks, logs, programs, signatures (commitment is "confirmed")
    gossip_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    // Slots, slots updates, roots, votes.
    node_progress_watchers: HashMap<SubscriptionParams, Arc<SubscriptionInfo>>,
}
427
428impl SubscriptionsTracker {
429    pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
430        SubscriptionsTracker {
431            logs_subscriptions_index: LogsSubscriptionsIndex {
432                all_count: 0,
433                all_with_votes_count: 0,
434                single_count: HashMap::new(),
435                bank_forks,
436            },
437            by_signature: HashMap::new(),
438            commitment_watchers: HashMap::new(),
439            gossip_watchers: HashMap::new(),
440            node_progress_watchers: HashMap::new(),
441        }
442    }
443
444    pub fn subscribe(
445        &mut self,
446        params: SubscriptionParams,
447        id: SubscriptionId,
448        last_notified_slot: impl FnOnce() -> Slot,
449    ) {
450        let info = Arc::new(SubscriptionInfo {
451            last_notified_slot: RwLock::new(last_notified_slot()),
452            id,
453            commitment: params.commitment(),
454            method: params.method(),
455            params: params.clone(),
456        });
457        match &params {
458            SubscriptionParams::Logs(params) => {
459                self.logs_subscriptions_index.add(params);
460            }
461            SubscriptionParams::Signature(params) => {
462                self.by_signature
463                    .entry(params.signature)
464                    .or_default()
465                    .insert(id, Arc::clone(&info));
466            }
467            _ => {}
468        }
469        if info.params.is_commitment_watcher() {
470            self.commitment_watchers.insert(id, Arc::clone(&info));
471        }
472        if info.params.is_gossip_watcher() {
473            self.gossip_watchers.insert(id, Arc::clone(&info));
474        }
475        if info.params.is_node_progress_watcher() {
476            self.node_progress_watchers
477                .insert(info.params.clone(), Arc::clone(&info));
478        }
479    }
480
481    #[allow(clippy::collapsible_if)]
482    pub fn unsubscribe(&mut self, params: SubscriptionParams, id: SubscriptionId) {
483        match &params {
484            SubscriptionParams::Logs(params) => {
485                self.logs_subscriptions_index.remove(params);
486            }
487            SubscriptionParams::Signature(params) => {
488                if let Entry::Occupied(mut entry) = self.by_signature.entry(params.signature) {
489                    if entry.get_mut().remove(&id).is_none() {
490                        warn!("Subscriptions inconsistency (missing entry in by_signature)");
491                    }
492                    if entry.get_mut().is_empty() {
493                        entry.remove();
494                    }
495                } else {
496                    warn!("Subscriptions inconsistency (missing entry in by_signature)");
497                }
498            }
499            _ => {}
500        }
501        if params.is_commitment_watcher() {
502            if self.commitment_watchers.remove(&id).is_none() {
503                warn!("Subscriptions inconsistency (missing entry in commitment_watchers)");
504            }
505        }
506        if params.is_gossip_watcher() {
507            if self.gossip_watchers.remove(&id).is_none() {
508                warn!("Subscriptions inconsistency (missing entry in gossip_watchers)");
509            }
510        }
511        if params.is_node_progress_watcher() {
512            if self.node_progress_watchers.remove(&params).is_none() {
513                warn!("Subscriptions inconsistency (missing entry in node_progress_watchers)");
514            }
515        }
516    }
517
518    pub fn by_signature(
519        &self,
520    ) -> &HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>> {
521        &self.by_signature
522    }
523
524    pub fn commitment_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
525        &self.commitment_watchers
526    }
527
528    pub fn gossip_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
529        &self.gossip_watchers
530    }
531
532    pub fn node_progress_watchers(&self) -> &HashMap<SubscriptionParams, Arc<SubscriptionInfo>> {
533        &self.node_progress_watchers
534    }
535}
536
/// State shared by every `SubscriptionToken` of one subscription.
///
/// Its `Drop` implementation removes the subscription from the control's map.
struct SubscriptionTokenInner {
    /// Control that owns the subscription map and the notification channel.
    control: Arc<SubscriptionControlInner>,
    /// Parameters of the subscription; also its key in the control's map.
    params: SubscriptionParams,
    /// Id of the subscription.
    id: SubscriptionId,
}
542
543impl fmt::Debug for SubscriptionTokenInner {
544    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
545        f.debug_struct("SubscriptionTokenInner")
546            .field("id", &self.id)
547            .finish()
548    }
549}
550
/// Unregisters the subscription when the shared token state is dropped.
///
/// If the map entry for these parameters still has no live strong reference,
/// an `Unsubscribed` entry is sent on the control's channel and the map entry
/// is removed. If the entry was re-created by a concurrent `subscribe`, it is
/// left untouched.
impl Drop for SubscriptionTokenInner {
    #[allow(clippy::collapsible_if)]
    fn drop(&mut self) {
        match self.control.subscriptions.entry(self.params.clone()) {
            // Nothing is registered under these parameters: report it, nothing to remove.
            DashEntry::Vacant(_) => {
                warn!("Subscriptions inconsistency (missing entry in by_params)");
            }
            // Check the strong refs count to ensure no other thread recreated this subscription (not token)
            // while we were acquiring the lock.
            DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
                // Best effort: a failed send is deliberately ignored.
                let _ = self
                    .control
                    .sender
                    .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into());
                entry.remove();
                datapoint_info!(
                    "rpc-subscription",
                    ("total", self.control.subscriptions.len(), i64)
                );
            }
            // This branch handles the case in which this entry got recreated
            // while we were waiting for the lock (inside the `DashMap::entry` method).
            DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (),
        }
    }
}
577
/// Handle to an active subscription.
///
/// Clones, and tokens returned by `SubscriptionControl::subscribe` for equal
/// parameters, share one `SubscriptionTokenInner`. Dropping the last of them
/// runs that type's `Drop` implementation, which unregisters the subscription.
/// The second field is a `CounterToken` created from the control's
/// `TokenCounter`; it is never read in this file.
// allowing dead code here to appease clippy, but unsure if/how the CounterToken is actually used.
// further investigation would be necessary before removing
#[allow(dead_code)]
#[derive(Clone)]
pub struct SubscriptionToken(Arc<SubscriptionTokenInner>, CounterToken);
583
/// Read-only accessors for the shared subscription state.
impl SubscriptionToken {
    /// Id of the subscription this token belongs to.
    pub fn id(&self) -> SubscriptionId {
        self.0.id
    }

    /// Parameters of the subscription this token belongs to.
    pub fn params(&self) -> &SubscriptionParams {
        &self.0.params
    }
}
593
/// Unit tests for `SubscriptionControl` notifications and `SubscriptionsTracker` indexes.
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::rpc_pubsub_service::PubSubConfig,
        solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
        solana_runtime::bank::Bank,
    };

    /// A `SubscriptionControl` together with the receiving end of its
    /// notification channel, so tests can observe what the control sends.
    struct ControlWrapper {
        control: SubscriptionControl,
        receiver: crossbeam_channel::Receiver<TimestampedNotificationEntry>,
    }

    impl ControlWrapper {
        /// Builds a control using the default subscription limit and an unbounded channel.
        fn new() -> Self {
            let (sender, receiver) = crossbeam_channel::unbounded();
            let (broadcast_sender, _broadcast_receiver) = broadcast::channel(42);

            let control = SubscriptionControl::new(
                PubSubConfig::default().max_active_subscriptions,
                sender,
                broadcast_sender,
            );
            Self { control, receiver }
        }

        /// Asserts that the next entry is `Subscribed` with the expected
        /// parameters and id, and that nothing else is queued after it.
        fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap().entry {
                assert_eq!(&params, expected_params);
                assert_eq!(id, SubscriptionId::from(expected_id));
            } else {
                panic!("unexpected notification");
            }
            self.assert_silence();
        }

        /// Asserts that the next entry is `Unsubscribed` with the expected
        /// parameters and id, and that nothing else is queued after it.
        fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap().entry
            {
                assert_eq!(&params, expected_params);
                assert_eq!(id, SubscriptionId::from(expected_id));
            } else {
                panic!("unexpected notification");
            }
            self.assert_silence();
        }

        /// Asserts that no entry is currently queued on the channel.
        fn assert_silence(&self) {
            assert!(self.receiver.try_recv().is_err());
        }
    }

    /// One subscribe/drop cycle sends exactly one `Subscribed` and one `Unsubscribed`.
    #[test]
    fn notify_subscribe() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        drop(token1);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    /// Clones and re-subscriptions with equal parameters share one subscription:
    /// `Unsubscribed` is sent only after the last token is dropped.
    #[test]
    fn notify_subscribe_multiple() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        let token2 = token1.clone();
        drop(token1);
        let token3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        drop(token3);
        control.assert_silence();
        drop(token2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    /// Two distinct subscriptions are tracked independently, and a
    /// subscription created after a full unsubscribe gets a fresh id.
    #[test]
    fn notify_subscribe_two_subscriptions() {
        let control = ControlWrapper::new();
        let token_slot1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);

        let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        });
        let token_signature1 = control.control.subscribe(signature_params.clone()).unwrap();
        control.assert_subscribed(&signature_params, 1);

        // Second tokens for existing subscriptions must not send anything.
        let token_slot2 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        let token_signature2 = control.control.subscribe(signature_params.clone()).unwrap();
        drop(token_slot1);
        control.assert_silence();
        drop(token_slot2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
        drop(token_signature2);
        control.assert_silence();
        drop(token_signature1);
        control.assert_unsubscribed(&signature_params, 1);

        // Ids are not reused once a subscription has been fully removed.
        let token_slot3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 2);
        drop(token_slot3);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 2);
    }

    /// `SubscriptionsTracker::subscribe` fills every `SubscriptionInfo` field
    /// from the parameters, the id and the slot callback.
    #[test]
    fn subscription_info() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        let bank_forks = BankForks::new_rw_arc(bank);
        let mut tracker = SubscriptionsTracker::new(bank_forks);

        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        let info = tracker
            .node_progress_watchers
            .get(&SubscriptionParams::Slot)
            .unwrap();
        assert_eq!(info.commitment, None);
        assert_eq!(info.params, SubscriptionParams::Slot);
        assert_eq!(info.method, SubscriptionParams::Slot.method());
        assert_eq!(info.id, SubscriptionId::from(0));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 0);

        let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: solana_inline_spl::token::id(),
            commitment: CommitmentConfig::finalized(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params.clone(), 1.into(), || 42);

        let info = tracker
            .commitment_watchers
            .get(&SubscriptionId::from(1))
            .unwrap();
        assert_eq!(info.commitment, Some(CommitmentConfig::finalized()));
        assert_eq!(info.params, account_params);
        assert_eq!(info.method, account_params.method());
        assert_eq!(info.id, SubscriptionId::from(1));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 42);
    }

    /// Each kind of subscription lands in the expected index and is removed
    /// from it again on unsubscribe.
    #[test]
    fn subscription_indexes() {
        /// Sizes of (by_signature, commitment_watchers, gossip_watchers, node_progress_watchers).
        fn counts(tracker: &SubscriptionsTracker) -> (usize, usize, usize, usize) {
            (
                tracker.by_signature.len(),
                tracker.commitment_watchers.len(),
                tracker.gossip_watchers.len(),
                tracker.node_progress_watchers.len(),
            )
        }

        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        let bank_forks = BankForks::new_rw_arc(bank);
        let mut tracker = SubscriptionsTracker::new(bank_forks);

        // Slot: node progress watcher only.
        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 0, 1));
        tracker.unsubscribe(SubscriptionParams::Slot, 0.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Finalized account: commitment watcher.
        let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: solana_inline_spl::token::id(),
            commitment: CommitmentConfig::finalized(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params.clone(), 1.into(), || 0);
        assert_eq!(counts(&tracker), (0, 1, 0, 0));
        tracker.unsubscribe(account_params, 1.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Confirmed account: gossip watcher.
        let account_params2 = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: solana_inline_spl::token::id(),
            commitment: CommitmentConfig::confirmed(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params2.clone(), 2.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 1, 0));
        tracker.unsubscribe(account_params2, 2.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Processed signature: by_signature plus commitment watcher.
        let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        });
        tracker.subscribe(signature_params.clone(), 3.into(), || 0);
        assert_eq!(counts(&tracker), (1, 1, 0, 0));
        tracker.unsubscribe(signature_params, 3.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));
    }
}