solana_rpc/
rpc_subscription_tracker.rs

1use {
2    crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry},
3    dashmap::{mapref::entry::Entry as DashEntry, DashMap},
4    solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
5    solana_clock::Slot,
6    solana_commitment_config::CommitmentConfig,
7    solana_metrics::{CounterToken, TokenCounter},
8    solana_pubkey::Pubkey,
9    solana_rpc_client_api::filter::RpcFilterType,
10    solana_runtime::{
11        bank::{TransactionLogCollectorConfig, TransactionLogCollectorFilter},
12        bank_forks::BankForks,
13    },
14    solana_signature::Signature,
15    solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
16    std::{
17        collections::hash_map::{Entry, HashMap},
18        fmt,
19        sync::{
20            atomic::{AtomicU64, Ordering},
21            Arc, RwLock, Weak,
22        },
23    },
24    thiserror::Error,
25    tokio::sync::broadcast,
26};
27
/// Identifier of a subscription: a thin newtype over a `u64`.
///
/// Values are converted to and from `u64` through the `From` impls in this
/// file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscriptionId(u64);
30
31impl From<u64> for SubscriptionId {
32    fn from(value: u64) -> Self {
33        SubscriptionId(value)
34    }
35}
36
37impl From<SubscriptionId> for u64 {
38    fn from(value: SubscriptionId) -> Self {
39        value.0
40    }
41}
42
/// Parameters that identify a subscription.
///
/// The type is hashable and comparable, and is used as the key of the
/// subscription map in `SubscriptionControlInner`. Variants with a payload
/// carry their own options (including a commitment); the unit variants carry
/// none.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionParams {
    Account(AccountSubscriptionParams),
    Block(BlockSubscriptionParams),
    Logs(LogsSubscriptionParams),
    Program(ProgramSubscriptionParams),
    Signature(SignatureSubscriptionParams),
    Slot,
    SlotsUpdates,
    Root,
    Vote,
}
55
56impl SubscriptionParams {
57    fn method(&self) -> &'static str {
58        match self {
59            SubscriptionParams::Account(_) => "accountNotification",
60            SubscriptionParams::Logs(_) => "logsNotification",
61            SubscriptionParams::Program(_) => "programNotification",
62            SubscriptionParams::Signature(_) => "signatureNotification",
63            SubscriptionParams::Slot => "slotNotification",
64            SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification",
65            SubscriptionParams::Block(_) => "blockNotification",
66            SubscriptionParams::Root => "rootNotification",
67            SubscriptionParams::Vote => "voteNotification",
68        }
69    }
70
71    fn commitment(&self) -> Option<CommitmentConfig> {
72        match self {
73            SubscriptionParams::Account(params) => Some(params.commitment),
74            SubscriptionParams::Logs(params) => Some(params.commitment),
75            SubscriptionParams::Program(params) => Some(params.commitment),
76            SubscriptionParams::Signature(params) => Some(params.commitment),
77            SubscriptionParams::Block(params) => Some(params.commitment),
78            SubscriptionParams::Slot
79            | SubscriptionParams::SlotsUpdates
80            | SubscriptionParams::Root
81            | SubscriptionParams::Vote => None,
82        }
83    }
84
85    fn is_commitment_watcher(&self) -> bool {
86        let commitment = match self {
87            SubscriptionParams::Account(params) => &params.commitment,
88            SubscriptionParams::Block(params) => &params.commitment,
89            SubscriptionParams::Logs(params) => &params.commitment,
90            SubscriptionParams::Program(params) => &params.commitment,
91            SubscriptionParams::Signature(params) => &params.commitment,
92            SubscriptionParams::Root
93            | SubscriptionParams::Slot
94            | SubscriptionParams::SlotsUpdates
95            | SubscriptionParams::Vote => return false,
96        };
97        !commitment.is_confirmed()
98    }
99
100    fn is_gossip_watcher(&self) -> bool {
101        let commitment = match self {
102            SubscriptionParams::Account(params) => &params.commitment,
103            SubscriptionParams::Block(params) => &params.commitment,
104            SubscriptionParams::Logs(params) => &params.commitment,
105            SubscriptionParams::Program(params) => &params.commitment,
106            SubscriptionParams::Signature(params) => &params.commitment,
107            SubscriptionParams::Root
108            | SubscriptionParams::Slot
109            | SubscriptionParams::SlotsUpdates
110            | SubscriptionParams::Vote => return false,
111        };
112        commitment.is_confirmed()
113    }
114
115    fn is_node_progress_watcher(&self) -> bool {
116        matches!(
117            self,
118            SubscriptionParams::Slot
119                | SubscriptionParams::SlotsUpdates
120                | SubscriptionParams::Root
121                | SubscriptionParams::Vote
122        )
123    }
124}
125
/// Options of an account subscription (`SubscriptionParams::Account`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccountSubscriptionParams {
    /// Key of the subscribed account.
    pub pubkey: Pubkey,
    /// Requested account encoding.
    pub encoding: UiAccountEncoding,
    /// Optional data slice configuration.
    pub data_slice: Option<UiDataSliceConfig>,
    /// Commitment used to classify the subscription as a commitment or
    /// gossip watcher.
    pub commitment: CommitmentConfig,
}
133
/// Options of a block subscription (`SubscriptionParams::Block`).
///
/// Only `commitment` is read in this file; the remaining fields are carried
/// along for consumers elsewhere.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockSubscriptionParams {
    /// Commitment used to classify the subscription as a commitment or
    /// gossip watcher.
    pub commitment: CommitmentConfig,
    /// Requested transaction encoding.
    pub encoding: UiTransactionEncoding,
    /// Which blocks the subscription applies to.
    pub kind: BlockSubscriptionKind,
    /// Requested level of transaction detail.
    pub transaction_details: TransactionDetails,
    /// Rewards flag; interpreted outside this file.
    pub show_rewards: bool,
    /// Optional maximum transaction version; interpreted outside this file.
    pub max_supported_transaction_version: Option<u8>,
}
143
/// Selector carried by `BlockSubscriptionParams::kind`.
///
/// NOTE(review): the filtering itself is not implemented in this file; the
/// variant names suggest "every block" versus "blocks mentioning the given
/// key" — confirm against the consumer.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BlockSubscriptionKind {
    All,
    MentionsAccountOrProgram(Pubkey),
}
149
/// Options of a logs subscription (`SubscriptionParams::Logs`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogsSubscriptionParams {
    /// Which logs the subscription covers; counted by
    /// `LogsSubscriptionsIndex`.
    pub kind: LogsSubscriptionKind,
    /// Commitment used to classify the subscription as a commitment or
    /// gossip watcher.
    pub commitment: CommitmentConfig,
}
155
/// Kind of a logs subscription.
///
/// `LogsSubscriptionsIndex` counts subscriptions per kind: `All` and
/// `AllWithVotes` select the collector filter of the same name, while
/// `Single` contributes its key to the set of mentioned addresses.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LogsSubscriptionKind {
    All,
    AllWithVotes,
    Single(Pubkey),
}
162
/// Options of a program subscription (`SubscriptionParams::Program`).
///
/// Only `commitment` is read in this file; the remaining fields are carried
/// along for consumers elsewhere.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProgramSubscriptionParams {
    /// Key of the subscribed program.
    pub pubkey: Pubkey,
    /// Filters attached to the subscription.
    pub filters: Vec<RpcFilterType>,
    /// Requested account encoding.
    pub encoding: UiAccountEncoding,
    /// Optional data slice configuration.
    pub data_slice: Option<UiDataSliceConfig>,
    /// Commitment used to classify the subscription as a commitment or
    /// gossip watcher.
    pub commitment: CommitmentConfig,
    /// Context flag; interpreted outside this file.
    pub with_context: bool,
}
172
/// Options of a signature subscription (`SubscriptionParams::Signature`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SignatureSubscriptionParams {
    /// Subscribed signature; also the key of `SubscriptionsTracker::by_signature`.
    pub signature: Signature,
    /// Commitment used to classify the subscription as a commitment or
    /// gossip watcher.
    pub commitment: CommitmentConfig,
    /// "Received" notification flag; interpreted outside this file.
    pub enable_received_notification: bool,
}
179
/// Cloneable handle to the shared subscription registry; every clone points
/// at the same `SubscriptionControlInner`.
#[derive(Clone)]
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
/// Map value kept per subscription: a weak reference to the token state plus
/// the subscription id, so the id stays available even when the weak
/// reference can no longer be upgraded.
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
183
/// State shared by all `SubscriptionControl` clones and by every token.
struct SubscriptionControlInner {
    /// Registered subscriptions, keyed by their parameters.
    subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
    /// Source of fresh subscription ids; starts at 0 and is incremented per
    /// newly created subscription.
    next_id: AtomicU64,
    /// Upper bound on the number of map entries; checked only when a new
    /// entry would be created.
    max_active_subscriptions: usize,
    /// Channel on which `Subscribed` / `Unsubscribed` entries are sent.
    sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
    /// Broadcast side from which `broadcast_receiver` hands out receivers.
    broadcast_sender: broadcast::Sender<RpcNotification>,
    /// Produces the `CounterToken` stored in every `SubscriptionToken`.
    counter: TokenCounter,
}
192
193impl SubscriptionControl {
194    pub fn new(
195        max_active_subscriptions: usize,
196        sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
197        broadcast_sender: broadcast::Sender<RpcNotification>,
198    ) -> Self {
199        Self(Arc::new(SubscriptionControlInner {
200            subscriptions: DashMap::new(),
201            next_id: AtomicU64::new(0),
202            max_active_subscriptions,
203            sender,
204            broadcast_sender,
205            counter: TokenCounter::new("rpc_pubsub_total_subscriptions"),
206        }))
207    }
208
209    pub fn broadcast_receiver(&self) -> broadcast::Receiver<RpcNotification> {
210        self.0.broadcast_sender.subscribe()
211    }
212
213    pub fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionToken, Error> {
214        debug!(
215            "Total existing subscriptions: {}",
216            self.0.subscriptions.len()
217        );
218        let count = self.0.subscriptions.len();
219        let create_token_and_weak_ref = |id, params| {
220            let token = SubscriptionToken(
221                Arc::new(SubscriptionTokenInner {
222                    control: Arc::clone(&self.0),
223                    params,
224                    id,
225                }),
226                self.0.counter.create_token(),
227            );
228            let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
229            (token, weak_ref)
230        };
231
232        match self.0.subscriptions.entry(params) {
233            DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
234                Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
235                // This means the last Arc for this Weak pointer entered the drop just before us,
236                // but could not remove the entry since we are holding the write lock.
237                // See `Drop` implementation for `SubscriptionTokenInner` for further info.
238                None => {
239                    let (token, weak_ref) =
240                        create_token_and_weak_ref(entry.get().1, entry.key().clone());
241                    entry.insert(weak_ref);
242                    Ok(token)
243                }
244            },
245            DashEntry::Vacant(entry) => {
246                if count >= self.0.max_active_subscriptions {
247                    inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
248                    return Err(Error::TooManySubscriptions);
249                }
250                let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
251                let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
252                let _ = self
253                    .0
254                    .sender
255                    .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
256                entry.insert(weak_ref);
257                datapoint_info!(
258                    "rpc-subscription",
259                    ("total", self.0.subscriptions.len(), i64)
260                );
261                Ok(token)
262            }
263        }
264    }
265
266    pub fn total(&self) -> usize {
267        self.0.subscriptions.len()
268    }
269
270    #[cfg(test)]
271    pub fn assert_subscribed(&self, params: &SubscriptionParams) {
272        assert!(self.0.subscriptions.contains_key(params));
273    }
274
275    #[cfg(test)]
276    pub fn assert_unsubscribed(&self, params: &SubscriptionParams) {
277        assert!(!self.0.subscriptions.contains_key(params));
278    }
279
280    #[cfg(test)]
281    pub fn account_subscribed(&self, pubkey: &Pubkey) -> bool {
282        self.0.subscriptions.iter().any(|item| {
283            if let SubscriptionParams::Account(params) = item.key() {
284                &params.pubkey == pubkey
285            } else {
286                false
287            }
288        })
289    }
290
291    #[cfg(test)]
292    pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool {
293        self.0.subscriptions.iter().any(|item| {
294            if let SubscriptionParams::Logs(params) = item.key() {
295                let subscribed_pubkey = match &params.kind {
296                    LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
297                    LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
298                };
299                subscribed_pubkey == pubkey
300            } else {
301                false
302            }
303        })
304    }
305
306    #[cfg(test)]
307    pub fn signature_subscribed(&self, signature: &Signature) -> bool {
308        self.0.subscriptions.iter().any(|item| {
309            if let SubscriptionParams::Signature(params) = item.key() {
310                &params.signature == signature
311            } else {
312                false
313            }
314        })
315    }
316}
317
/// Per-subscription record stored in the `SubscriptionsTracker` indexes.
#[derive(Debug)]
pub struct SubscriptionInfo {
    /// Id of the subscription.
    id: SubscriptionId,
    /// Parameters the subscription was created with.
    params: SubscriptionParams,
    /// Notification method name, derived from `params` at creation.
    method: &'static str,
    /// Slot stored at creation from the caller-supplied closure; publicly
    /// writable behind the lock.
    pub last_notified_slot: RwLock<Slot>,
    /// Commitment derived from `params` at creation, if the variant has one.
    commitment: Option<CommitmentConfig>,
}
326
/// Read-only accessors for the private fields of `SubscriptionInfo`.
impl SubscriptionInfo {
    /// Id of the subscription.
    pub fn id(&self) -> SubscriptionId {
        self.id
    }

    /// Notification method name recorded for the subscription.
    pub fn method(&self) -> &'static str {
        self.method
    }

    /// Parameters the subscription was created with.
    pub fn params(&self) -> &SubscriptionParams {
        &self.params
    }

    /// Commitment recorded for the subscription, or `None` if it has none.
    pub fn commitment(&self) -> Option<CommitmentConfig> {
        self.commitment
    }
}
344
/// Errors returned by `SubscriptionControl::subscribe`.
#[derive(Debug, Error)]
pub enum Error {
    /// A new subscription was requested while the subscription map was
    /// already at `max_active_subscriptions`.
    #[error("node subscription limit reached")]
    TooManySubscriptions,
}
350
/// Reference counts of the active logs subscriptions, per kind.
///
/// The counts are turned into a `TransactionLogCollectorConfig` that is
/// written to the root bank whenever they change.
struct LogsSubscriptionsIndex {
    /// Number of `LogsSubscriptionKind::All` subscriptions.
    all_count: usize,
    /// Number of `LogsSubscriptionKind::AllWithVotes` subscriptions.
    all_with_votes_count: usize,
    /// Number of `LogsSubscriptionKind::Single` subscriptions per key; keys
    /// are removed when their count reaches zero.
    single_count: HashMap<Pubkey, usize>,

    /// Used to reach the root bank whose collector config is updated.
    bank_forks: Arc<RwLock<BankForks>>,
}
358
359impl LogsSubscriptionsIndex {
360    fn add(&mut self, params: &LogsSubscriptionParams) {
361        match params.kind {
362            LogsSubscriptionKind::All => self.all_count += 1,
363            LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count += 1,
364            LogsSubscriptionKind::Single(key) => {
365                *self.single_count.entry(key).or_default() += 1;
366            }
367        }
368        self.update_config();
369    }
370
371    fn remove(&mut self, params: &LogsSubscriptionParams) {
372        match params.kind {
373            LogsSubscriptionKind::All => self.all_count -= 1,
374            LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count -= 1,
375            LogsSubscriptionKind::Single(key) => match self.single_count.entry(key) {
376                Entry::Occupied(mut entry) => {
377                    *entry.get_mut() -= 1;
378                    if *entry.get() == 0 {
379                        entry.remove();
380                    }
381                }
382                Entry::Vacant(_) => error!("missing entry in single_count"),
383            },
384        }
385        self.update_config();
386    }
387
388    fn update_config(&self) {
389        let mentioned_addresses = self.single_count.keys().copied().collect();
390        let config = if self.all_with_votes_count > 0 {
391            TransactionLogCollectorConfig {
392                filter: TransactionLogCollectorFilter::AllWithVotes,
393                mentioned_addresses,
394            }
395        } else if self.all_count > 0 {
396            TransactionLogCollectorConfig {
397                filter: TransactionLogCollectorFilter::All,
398                mentioned_addresses,
399            }
400        } else {
401            TransactionLogCollectorConfig {
402                filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
403                mentioned_addresses,
404            }
405        };
406
407        *self
408            .bank_forks
409            .read()
410            .unwrap()
411            .root_bank()
412            .transaction_log_collector_config
413            .write()
414            .unwrap() = config;
415    }
416}
417
/// Indexes of the active subscriptions, grouped by how they are watched.
pub struct SubscriptionsTracker {
    /// Per-kind counts of logs subscriptions; keeps the root bank's log
    /// collector config in sync.
    logs_subscriptions_index: LogsSubscriptionsIndex,
    /// Signature subscriptions, grouped by signature and then by id.
    by_signature: HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>>,
    // Accounts, blocks, logs, programs, signatures whose commitment does not
    // report `is_confirmed()` (not gossip)
    commitment_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    // Accounts, blocks, logs, programs, signatures whose commitment reports
    // `is_confirmed()` (gossip)
    gossip_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    // Slots, slots updates, roots, votes.
    node_progress_watchers: HashMap<SubscriptionParams, Arc<SubscriptionInfo>>,
}
428
429impl SubscriptionsTracker {
430    pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
431        SubscriptionsTracker {
432            logs_subscriptions_index: LogsSubscriptionsIndex {
433                all_count: 0,
434                all_with_votes_count: 0,
435                single_count: HashMap::new(),
436                bank_forks,
437            },
438            by_signature: HashMap::new(),
439            commitment_watchers: HashMap::new(),
440            gossip_watchers: HashMap::new(),
441            node_progress_watchers: HashMap::new(),
442        }
443    }
444
445    pub fn subscribe(
446        &mut self,
447        params: SubscriptionParams,
448        id: SubscriptionId,
449        last_notified_slot: impl FnOnce() -> Slot,
450    ) {
451        let info = Arc::new(SubscriptionInfo {
452            last_notified_slot: RwLock::new(last_notified_slot()),
453            id,
454            commitment: params.commitment(),
455            method: params.method(),
456            params: params.clone(),
457        });
458        match &params {
459            SubscriptionParams::Logs(params) => {
460                self.logs_subscriptions_index.add(params);
461            }
462            SubscriptionParams::Signature(params) => {
463                self.by_signature
464                    .entry(params.signature)
465                    .or_default()
466                    .insert(id, Arc::clone(&info));
467            }
468            _ => {}
469        }
470        if info.params.is_commitment_watcher() {
471            self.commitment_watchers.insert(id, Arc::clone(&info));
472        }
473        if info.params.is_gossip_watcher() {
474            self.gossip_watchers.insert(id, Arc::clone(&info));
475        }
476        if info.params.is_node_progress_watcher() {
477            self.node_progress_watchers
478                .insert(info.params.clone(), Arc::clone(&info));
479        }
480    }
481
482    #[allow(clippy::collapsible_if)]
483    pub fn unsubscribe(&mut self, params: SubscriptionParams, id: SubscriptionId) {
484        match &params {
485            SubscriptionParams::Logs(params) => {
486                self.logs_subscriptions_index.remove(params);
487            }
488            SubscriptionParams::Signature(params) => {
489                if let Entry::Occupied(mut entry) = self.by_signature.entry(params.signature) {
490                    if entry.get_mut().remove(&id).is_none() {
491                        warn!("Subscriptions inconsistency (missing entry in by_signature)");
492                    }
493                    if entry.get_mut().is_empty() {
494                        entry.remove();
495                    }
496                } else {
497                    warn!("Subscriptions inconsistency (missing entry in by_signature)");
498                }
499            }
500            _ => {}
501        }
502        if params.is_commitment_watcher() {
503            if self.commitment_watchers.remove(&id).is_none() {
504                warn!("Subscriptions inconsistency (missing entry in commitment_watchers)");
505            }
506        }
507        if params.is_gossip_watcher() {
508            if self.gossip_watchers.remove(&id).is_none() {
509                warn!("Subscriptions inconsistency (missing entry in gossip_watchers)");
510            }
511        }
512        if params.is_node_progress_watcher() {
513            if self.node_progress_watchers.remove(&params).is_none() {
514                warn!("Subscriptions inconsistency (missing entry in node_progress_watchers)");
515            }
516        }
517    }
518
519    pub fn by_signature(
520        &self,
521    ) -> &HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>> {
522        &self.by_signature
523    }
524
525    pub fn commitment_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
526        &self.commitment_watchers
527    }
528
529    pub fn gossip_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
530        &self.gossip_watchers
531    }
532
533    pub fn node_progress_watchers(&self) -> &HashMap<SubscriptionParams, Arc<SubscriptionInfo>> {
534        &self.node_progress_watchers
535    }
536}
537
/// State shared by all tokens of one subscription.
///
/// Its `Drop` implementation removes the subscription from the control's map
/// and sends the `Unsubscribed` entry.
struct SubscriptionTokenInner {
    /// Registry the subscription is stored in.
    control: Arc<SubscriptionControlInner>,
    /// Parameters of the subscription; also its key in the registry map.
    params: SubscriptionParams,
    /// Id of the subscription.
    id: SubscriptionId,
}
543
544impl fmt::Debug for SubscriptionTokenInner {
545    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546        f.debug_struct("SubscriptionTokenInner")
547            .field("id", &self.id)
548            .finish()
549    }
550}
551
/// Unregisters the subscription when the shared token state is dropped.
///
/// In this file the value is always held in an `Arc`, so this runs once the
/// last strong reference goes away. The map entry is removed (and an
/// `Unsubscribed` entry sent) only if no live token state is registered
/// under the same parameters.
impl Drop for SubscriptionTokenInner {
    #[allow(clippy::collapsible_if)]
    fn drop(&mut self) {
        // `entry()` locks the map slot for these parameters, so the checks
        // below cannot interleave with `SubscriptionControl::subscribe`.
        match self.control.subscriptions.entry(self.params.clone()) {
            DashEntry::Vacant(_) => {
                warn!("Subscriptions inconsistency (missing entry in by_params)");
            }
            // Check the strong refs count to ensure no other thread recreated this subscription (not token)
            // while we were acquiring the lock.
            DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
                // A failed send is deliberately ignored.
                let _ = self
                    .control
                    .sender
                    .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into());
                // `remove` consumes the entry, so `len()` below is called
                // after the entry is gone.
                entry.remove();
                datapoint_info!(
                    "rpc-subscription",
                    ("total", self.control.subscriptions.len(), i64)
                );
            }
            // This branch handles the case in which this entry got recreated
            // while we were waiting for the lock (inside the `DashMap::entry` method).
            DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (),
        }
    }
}
578
/// Handle to a subscription, returned by `SubscriptionControl::subscribe`.
///
/// Clones share the same `SubscriptionTokenInner`; the subscription is
/// unregistered when that shared state is dropped. Each token also carries a
/// `CounterToken` created from the control's `TokenCounter`.
// allowing dead code here to appease clippy, but unsure if/how the CounterToken is actually used.
// further investigation would be necessary before removing
#[allow(dead_code)]
#[derive(Clone)]
pub struct SubscriptionToken(Arc<SubscriptionTokenInner>, CounterToken);
584
/// Read-only accessors for the shared token state.
impl SubscriptionToken {
    /// Id of the subscription this token belongs to.
    pub fn id(&self) -> SubscriptionId {
        self.0.id
    }

    /// Parameters of the subscription this token belongs to.
    pub fn params(&self) -> &SubscriptionParams {
        &self.0.params
    }
}
594
/// Unit tests for the subscribe/unsubscribe notifications of
/// `SubscriptionControl` and for the indexes of `SubscriptionsTracker`.
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::rpc_pubsub_service::PubSubConfig,
        solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
        solana_runtime::bank::Bank,
    };

    /// A `SubscriptionControl` paired with the receiving end of its
    /// notification channel, so tests can observe what gets sent.
    struct ControlWrapper {
        control: SubscriptionControl,
        receiver: crossbeam_channel::Receiver<TimestampedNotificationEntry>,
    }

    impl ControlWrapper {
        /// Builds a control with the default subscription limit and an
        /// unbounded notification channel.
        fn new() -> Self {
            let (sender, receiver) = crossbeam_channel::unbounded();
            let (broadcast_sender, _broadcast_receiver) = broadcast::channel(42);

            let control = SubscriptionControl::new(
                PubSubConfig::default().max_active_subscriptions,
                sender,
                broadcast_sender,
            );
            Self { control, receiver }
        }

        /// Asserts that the next entry is `Subscribed` with the given
        /// parameters and id, and that nothing else is queued after it.
        fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap().entry {
                assert_eq!(&params, expected_params);
                assert_eq!(id, SubscriptionId::from(expected_id));
            } else {
                panic!("unexpected notification");
            }
            self.assert_silence();
        }

        /// Asserts that the next entry is `Unsubscribed` with the given
        /// parameters and id, and that nothing else is queued after it.
        fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap().entry
            {
                assert_eq!(&params, expected_params);
                assert_eq!(id, SubscriptionId::from(expected_id));
            } else {
                panic!("unexpected notification");
            }
            self.assert_silence();
        }

        /// Asserts that no entry is currently queued on the channel.
        fn assert_silence(&self) {
            assert!(self.receiver.try_recv().is_err());
        }
    }

    /// A single token produces one `Subscribed` and, when dropped, one
    /// `Unsubscribed` entry.
    #[test]
    fn notify_subscribe() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        drop(token1);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    /// Cloned and re-requested tokens share one subscription: nothing is
    /// sent until the last of them is dropped.
    #[test]
    fn notify_subscribe_multiple() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        let token2 = token1.clone();
        drop(token1);
        let token3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        drop(token3);
        control.assert_silence();
        drop(token2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    /// Two distinct subscriptions get distinct ids and are released
    /// independently; re-subscribing afterwards allocates a fresh id.
    #[test]
    fn notify_subscribe_two_subscriptions() {
        let control = ControlWrapper::new();
        let token_slot1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);

        let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        });
        let token_signature1 = control.control.subscribe(signature_params.clone()).unwrap();
        control.assert_subscribed(&signature_params, 1);

        let token_slot2 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        let token_signature2 = control.control.subscribe(signature_params.clone()).unwrap();
        drop(token_slot1);
        control.assert_silence();
        drop(token_slot2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
        drop(token_signature2);
        control.assert_silence();
        drop(token_signature1);
        control.assert_unsubscribed(&signature_params, 1);

        // The slot subscription was fully released above, so this one is new.
        let token_slot3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 2);
        drop(token_slot3);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 2);
    }

    /// `SubscriptionsTracker::subscribe` fills every `SubscriptionInfo`
    /// field from the parameters, id and slot closure.
    #[test]
    fn subscription_info() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        let bank_forks = BankForks::new_rw_arc(bank);
        let mut tracker = SubscriptionsTracker::new(bank_forks);

        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        let info = tracker
            .node_progress_watchers
            .get(&SubscriptionParams::Slot)
            .unwrap();
        assert_eq!(info.commitment, None);
        assert_eq!(info.params, SubscriptionParams::Slot);
        assert_eq!(info.method, SubscriptionParams::Slot.method());
        assert_eq!(info.id, SubscriptionId::from(0));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 0);

        let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: spl_generic_token::token::id(),
            commitment: CommitmentConfig::finalized(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params.clone(), 1.into(), || 42);

        let info = tracker
            .commitment_watchers
            .get(&SubscriptionId::from(1))
            .unwrap();
        assert_eq!(info.commitment, Some(CommitmentConfig::finalized()));
        assert_eq!(info.params, account_params);
        assert_eq!(info.method, account_params.method());
        assert_eq!(info.id, SubscriptionId::from(1));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 42);
    }

    /// Each kind of subscription lands in the expected index and is removed
    /// from it again on unsubscribe.
    #[test]
    fn subscription_indexes() {
        // Sizes of (by_signature, commitment, gossip, node_progress).
        fn counts(tracker: &SubscriptionsTracker) -> (usize, usize, usize, usize) {
            (
                tracker.by_signature.len(),
                tracker.commitment_watchers.len(),
                tracker.gossip_watchers.len(),
                tracker.node_progress_watchers.len(),
            )
        }

        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        let bank_forks = BankForks::new_rw_arc(bank);
        let mut tracker = SubscriptionsTracker::new(bank_forks);

        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 0, 1));
        tracker.unsubscribe(SubscriptionParams::Slot, 0.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Finalized commitment: expected in the commitment watchers.
        let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: spl_generic_token::token::id(),
            commitment: CommitmentConfig::finalized(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params.clone(), 1.into(), || 0);
        assert_eq!(counts(&tracker), (0, 1, 0, 0));
        tracker.unsubscribe(account_params, 1.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Confirmed commitment: expected in the gossip watchers.
        let account_params2 = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: spl_generic_token::token::id(),
            commitment: CommitmentConfig::confirmed(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params2.clone(), 2.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 1, 0));
        tracker.unsubscribe(account_params2, 2.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Signature subscriptions are indexed by signature and by commitment.
        let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        });
        tracker.subscribe(signature_params.clone(), 3.into(), || 0);
        assert_eq!(counts(&tracker), (1, 1, 0, 0));
        tracker.unsubscribe(signature_params, 3.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));
    }
}