solana_rpc/
rpc_subscription_tracker.rs

1use {
2    crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry},
3    dashmap::{mapref::entry::Entry as DashEntry, DashMap},
4    serde::{Deserialize, Serialize},
5    solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
6    solana_clock::Slot,
7    solana_commitment_config::CommitmentConfig,
8    solana_metrics::{CounterToken, TokenCounter},
9    solana_pubkey::Pubkey,
10    solana_rpc_client_api::filter::RpcFilterType,
11    solana_runtime::{
12        bank::{TransactionLogCollectorConfig, TransactionLogCollectorFilter},
13        bank_forks::BankForks,
14    },
15    solana_signature::Signature,
16    solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
17    std::{
18        collections::hash_map::{Entry, HashMap},
19        fmt,
20        sync::{
21            atomic::{AtomicU64, Ordering},
22            Arc, RwLock, Weak,
23        },
24    },
25    thiserror::Error,
26    tokio::sync::broadcast,
27};
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
30pub struct SubscriptionId(u64);
31
32impl From<u64> for SubscriptionId {
33    fn from(value: u64) -> Self {
34        SubscriptionId(value)
35    }
36}
37
38impl From<SubscriptionId> for u64 {
39    fn from(value: SubscriptionId) -> Self {
40        value.0
41    }
42}
43
44#[derive(Debug, Clone, PartialEq, Eq, Hash)]
45pub enum SubscriptionParams {
46    Account(AccountSubscriptionParams),
47    Block(BlockSubscriptionParams),
48    Logs(LogsSubscriptionParams),
49    Program(ProgramSubscriptionParams),
50    Signature(SignatureSubscriptionParams),
51    Slot,
52    SlotsUpdates,
53    Root,
54    Vote,
55}
56
57impl SubscriptionParams {
58    fn method(&self) -> &'static str {
59        match self {
60            SubscriptionParams::Account(_) => "accountNotification",
61            SubscriptionParams::Logs(_) => "logsNotification",
62            SubscriptionParams::Program(_) => "programNotification",
63            SubscriptionParams::Signature(_) => "signatureNotification",
64            SubscriptionParams::Slot => "slotNotification",
65            SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification",
66            SubscriptionParams::Block(_) => "blockNotification",
67            SubscriptionParams::Root => "rootNotification",
68            SubscriptionParams::Vote => "voteNotification",
69        }
70    }
71
72    fn commitment(&self) -> Option<CommitmentConfig> {
73        match self {
74            SubscriptionParams::Account(params) => Some(params.commitment),
75            SubscriptionParams::Logs(params) => Some(params.commitment),
76            SubscriptionParams::Program(params) => Some(params.commitment),
77            SubscriptionParams::Signature(params) => Some(params.commitment),
78            SubscriptionParams::Block(params) => Some(params.commitment),
79            SubscriptionParams::Slot
80            | SubscriptionParams::SlotsUpdates
81            | SubscriptionParams::Root
82            | SubscriptionParams::Vote => None,
83        }
84    }
85
86    fn is_commitment_watcher(&self) -> bool {
87        let commitment = match self {
88            SubscriptionParams::Account(params) => &params.commitment,
89            SubscriptionParams::Block(params) => &params.commitment,
90            SubscriptionParams::Logs(params) => &params.commitment,
91            SubscriptionParams::Program(params) => &params.commitment,
92            SubscriptionParams::Signature(params) => &params.commitment,
93            SubscriptionParams::Root
94            | SubscriptionParams::Slot
95            | SubscriptionParams::SlotsUpdates
96            | SubscriptionParams::Vote => return false,
97        };
98        !commitment.is_confirmed()
99    }
100
101    fn is_gossip_watcher(&self) -> bool {
102        let commitment = match self {
103            SubscriptionParams::Account(params) => &params.commitment,
104            SubscriptionParams::Block(params) => &params.commitment,
105            SubscriptionParams::Logs(params) => &params.commitment,
106            SubscriptionParams::Program(params) => &params.commitment,
107            SubscriptionParams::Signature(params) => &params.commitment,
108            SubscriptionParams::Root
109            | SubscriptionParams::Slot
110            | SubscriptionParams::SlotsUpdates
111            | SubscriptionParams::Vote => return false,
112        };
113        commitment.is_confirmed()
114    }
115
116    fn is_node_progress_watcher(&self) -> bool {
117        matches!(
118            self,
119            SubscriptionParams::Slot
120                | SubscriptionParams::SlotsUpdates
121                | SubscriptionParams::Root
122                | SubscriptionParams::Vote
123        )
124    }
125}
126
127#[derive(Debug, Clone, PartialEq, Eq, Hash)]
128pub struct AccountSubscriptionParams {
129    pub pubkey: Pubkey,
130    pub encoding: UiAccountEncoding,
131    pub data_slice: Option<UiDataSliceConfig>,
132    pub commitment: CommitmentConfig,
133}
134
135#[derive(Debug, Clone, PartialEq, Eq, Hash)]
136pub struct BlockSubscriptionParams {
137    pub commitment: CommitmentConfig,
138    pub encoding: UiTransactionEncoding,
139    pub kind: BlockSubscriptionKind,
140    pub transaction_details: TransactionDetails,
141    pub show_rewards: bool,
142    pub max_supported_transaction_version: Option<u8>,
143}
144
145#[derive(Debug, Clone, PartialEq, Eq, Hash)]
146pub enum BlockSubscriptionKind {
147    All,
148    MentionsAccountOrProgram(Pubkey),
149}
150
151#[derive(Debug, Clone, PartialEq, Eq, Hash)]
152pub struct LogsSubscriptionParams {
153    pub kind: LogsSubscriptionKind,
154    pub commitment: CommitmentConfig,
155}
156
157#[derive(Debug, Clone, PartialEq, Eq, Hash)]
158pub enum LogsSubscriptionKind {
159    All,
160    AllWithVotes,
161    Single(Pubkey),
162}
163
164#[derive(Debug, Clone, PartialEq, Eq, Hash)]
165pub struct ProgramSubscriptionParams {
166    pub pubkey: Pubkey,
167    pub filters: Vec<RpcFilterType>,
168    pub encoding: UiAccountEncoding,
169    pub data_slice: Option<UiDataSliceConfig>,
170    pub commitment: CommitmentConfig,
171    pub with_context: bool,
172}
173
174#[derive(Debug, Clone, PartialEq, Eq, Hash)]
175pub struct SignatureSubscriptionParams {
176    pub signature: Signature,
177    pub commitment: CommitmentConfig,
178    pub enable_received_notification: bool,
179}
180
181#[derive(Clone)]
182pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
183pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
184
185struct SubscriptionControlInner {
186    subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
187    next_id: AtomicU64,
188    max_active_subscriptions: usize,
189    sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
190    broadcast_sender: broadcast::Sender<RpcNotification>,
191    counter: TokenCounter,
192}
193
194impl SubscriptionControl {
195    pub fn new(
196        max_active_subscriptions: usize,
197        sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
198        broadcast_sender: broadcast::Sender<RpcNotification>,
199    ) -> Self {
200        Self(Arc::new(SubscriptionControlInner {
201            subscriptions: DashMap::new(),
202            next_id: AtomicU64::new(0),
203            max_active_subscriptions,
204            sender,
205            broadcast_sender,
206            counter: TokenCounter::new("rpc_pubsub_total_subscriptions"),
207        }))
208    }
209
210    pub fn broadcast_receiver(&self) -> broadcast::Receiver<RpcNotification> {
211        self.0.broadcast_sender.subscribe()
212    }
213
214    pub fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionToken, Error> {
215        debug!(
216            "Total existing subscriptions: {}",
217            self.0.subscriptions.len()
218        );
219        let count = self.0.subscriptions.len();
220        let create_token_and_weak_ref = |id, params| {
221            let token = SubscriptionToken(
222                Arc::new(SubscriptionTokenInner {
223                    control: Arc::clone(&self.0),
224                    params,
225                    id,
226                }),
227                self.0.counter.create_token(),
228            );
229            let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
230            (token, weak_ref)
231        };
232
233        match self.0.subscriptions.entry(params) {
234            DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
235                Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
236                // This means the last Arc for this Weak pointer entered the drop just before us,
237                // but could not remove the entry since we are holding the write lock.
238                // See `Drop` implementation for `SubscriptionTokenInner` for further info.
239                None => {
240                    let (token, weak_ref) =
241                        create_token_and_weak_ref(entry.get().1, entry.key().clone());
242                    entry.insert(weak_ref);
243                    Ok(token)
244                }
245            },
246            DashEntry::Vacant(entry) => {
247                if count >= self.0.max_active_subscriptions {
248                    inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
249                    return Err(Error::TooManySubscriptions);
250                }
251                let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
252                let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
253                let _ = self
254                    .0
255                    .sender
256                    .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
257                entry.insert(weak_ref);
258                datapoint_info!(
259                    "rpc-subscription",
260                    ("total", self.0.subscriptions.len(), i64)
261                );
262                Ok(token)
263            }
264        }
265    }
266
267    pub fn total(&self) -> usize {
268        self.0.subscriptions.len()
269    }
270
271    #[cfg(test)]
272    pub fn assert_subscribed(&self, params: &SubscriptionParams) {
273        assert!(self.0.subscriptions.contains_key(params));
274    }
275
276    #[cfg(test)]
277    pub fn assert_unsubscribed(&self, params: &SubscriptionParams) {
278        assert!(!self.0.subscriptions.contains_key(params));
279    }
280
281    #[cfg(test)]
282    pub fn account_subscribed(&self, pubkey: &Pubkey) -> bool {
283        self.0.subscriptions.iter().any(|item| {
284            if let SubscriptionParams::Account(params) = item.key() {
285                &params.pubkey == pubkey
286            } else {
287                false
288            }
289        })
290    }
291
292    #[cfg(test)]
293    pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool {
294        self.0.subscriptions.iter().any(|item| {
295            if let SubscriptionParams::Logs(params) = item.key() {
296                let subscribed_pubkey = match &params.kind {
297                    LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
298                    LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
299                };
300                subscribed_pubkey == pubkey
301            } else {
302                false
303            }
304        })
305    }
306
307    #[cfg(test)]
308    pub fn signature_subscribed(&self, signature: &Signature) -> bool {
309        self.0.subscriptions.iter().any(|item| {
310            if let SubscriptionParams::Signature(params) = item.key() {
311                &params.signature == signature
312            } else {
313                false
314            }
315        })
316    }
317}
318
319#[derive(Debug)]
320pub struct SubscriptionInfo {
321    id: SubscriptionId,
322    params: SubscriptionParams,
323    method: &'static str,
324    pub last_notified_slot: RwLock<Slot>,
325    commitment: Option<CommitmentConfig>,
326}
327
328impl SubscriptionInfo {
329    pub fn id(&self) -> SubscriptionId {
330        self.id
331    }
332
333    pub fn method(&self) -> &'static str {
334        self.method
335    }
336
337    pub fn params(&self) -> &SubscriptionParams {
338        &self.params
339    }
340
341    pub fn commitment(&self) -> Option<CommitmentConfig> {
342        self.commitment
343    }
344}
345
346#[derive(Debug, Error)]
347pub enum Error {
348    #[error("node subscription limit reached")]
349    TooManySubscriptions,
350}
351
352struct LogsSubscriptionsIndex {
353    all_count: usize,
354    all_with_votes_count: usize,
355    single_count: HashMap<Pubkey, usize>,
356
357    bank_forks: Arc<RwLock<BankForks>>,
358}
359
360impl LogsSubscriptionsIndex {
361    fn add(&mut self, params: &LogsSubscriptionParams) {
362        match params.kind {
363            LogsSubscriptionKind::All => self.all_count += 1,
364            LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count += 1,
365            LogsSubscriptionKind::Single(key) => {
366                *self.single_count.entry(key).or_default() += 1;
367            }
368        }
369        self.update_config();
370    }
371
372    fn remove(&mut self, params: &LogsSubscriptionParams) {
373        match params.kind {
374            LogsSubscriptionKind::All => self.all_count -= 1,
375            LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count -= 1,
376            LogsSubscriptionKind::Single(key) => match self.single_count.entry(key) {
377                Entry::Occupied(mut entry) => {
378                    *entry.get_mut() -= 1;
379                    if *entry.get() == 0 {
380                        entry.remove();
381                    }
382                }
383                Entry::Vacant(_) => error!("missing entry in single_count"),
384            },
385        }
386        self.update_config();
387    }
388
389    fn update_config(&self) {
390        let mentioned_addresses = self.single_count.keys().copied().collect();
391        let config = if self.all_with_votes_count > 0 {
392            TransactionLogCollectorConfig {
393                filter: TransactionLogCollectorFilter::AllWithVotes,
394                mentioned_addresses,
395            }
396        } else if self.all_count > 0 {
397            TransactionLogCollectorConfig {
398                filter: TransactionLogCollectorFilter::All,
399                mentioned_addresses,
400            }
401        } else {
402            TransactionLogCollectorConfig {
403                filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
404                mentioned_addresses,
405            }
406        };
407
408        *self
409            .bank_forks
410            .read()
411            .unwrap()
412            .root_bank()
413            .transaction_log_collector_config
414            .write()
415            .unwrap() = config;
416    }
417}
418
419pub struct SubscriptionsTracker {
420    logs_subscriptions_index: LogsSubscriptionsIndex,
421    by_signature: HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>>,
422    // Accounts, logs, programs, signatures (not gossip)
423    commitment_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
424    // Accounts, logs, programs, signatures (gossip)
425    gossip_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
426    // Slots, slots updates, roots, votes.
427    node_progress_watchers: HashMap<SubscriptionParams, Arc<SubscriptionInfo>>,
428}
429
430impl SubscriptionsTracker {
431    pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
432        SubscriptionsTracker {
433            logs_subscriptions_index: LogsSubscriptionsIndex {
434                all_count: 0,
435                all_with_votes_count: 0,
436                single_count: HashMap::new(),
437                bank_forks,
438            },
439            by_signature: HashMap::new(),
440            commitment_watchers: HashMap::new(),
441            gossip_watchers: HashMap::new(),
442            node_progress_watchers: HashMap::new(),
443        }
444    }
445
446    pub fn subscribe(
447        &mut self,
448        params: SubscriptionParams,
449        id: SubscriptionId,
450        last_notified_slot: impl FnOnce() -> Slot,
451    ) {
452        let info = Arc::new(SubscriptionInfo {
453            last_notified_slot: RwLock::new(last_notified_slot()),
454            id,
455            commitment: params.commitment(),
456            method: params.method(),
457            params: params.clone(),
458        });
459        match &params {
460            SubscriptionParams::Logs(params) => {
461                self.logs_subscriptions_index.add(params);
462            }
463            SubscriptionParams::Signature(params) => {
464                self.by_signature
465                    .entry(params.signature)
466                    .or_default()
467                    .insert(id, Arc::clone(&info));
468            }
469            _ => {}
470        }
471        if info.params.is_commitment_watcher() {
472            self.commitment_watchers.insert(id, Arc::clone(&info));
473        }
474        if info.params.is_gossip_watcher() {
475            self.gossip_watchers.insert(id, Arc::clone(&info));
476        }
477        if info.params.is_node_progress_watcher() {
478            self.node_progress_watchers
479                .insert(info.params.clone(), Arc::clone(&info));
480        }
481    }
482
483    #[allow(clippy::collapsible_if)]
484    pub fn unsubscribe(&mut self, params: SubscriptionParams, id: SubscriptionId) {
485        match &params {
486            SubscriptionParams::Logs(params) => {
487                self.logs_subscriptions_index.remove(params);
488            }
489            SubscriptionParams::Signature(params) => {
490                if let Entry::Occupied(mut entry) = self.by_signature.entry(params.signature) {
491                    if entry.get_mut().remove(&id).is_none() {
492                        warn!("Subscriptions inconsistency (missing entry in by_signature)");
493                    }
494                    if entry.get_mut().is_empty() {
495                        entry.remove();
496                    }
497                } else {
498                    warn!("Subscriptions inconsistency (missing entry in by_signature)");
499                }
500            }
501            _ => {}
502        }
503        if params.is_commitment_watcher() {
504            if self.commitment_watchers.remove(&id).is_none() {
505                warn!("Subscriptions inconsistency (missing entry in commitment_watchers)");
506            }
507        }
508        if params.is_gossip_watcher() {
509            if self.gossip_watchers.remove(&id).is_none() {
510                warn!("Subscriptions inconsistency (missing entry in gossip_watchers)");
511            }
512        }
513        if params.is_node_progress_watcher() {
514            if self.node_progress_watchers.remove(&params).is_none() {
515                warn!("Subscriptions inconsistency (missing entry in node_progress_watchers)");
516            }
517        }
518    }
519
520    pub fn by_signature(
521        &self,
522    ) -> &HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>> {
523        &self.by_signature
524    }
525
526    pub fn commitment_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
527        &self.commitment_watchers
528    }
529
530    pub fn gossip_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
531        &self.gossip_watchers
532    }
533
534    pub fn node_progress_watchers(&self) -> &HashMap<SubscriptionParams, Arc<SubscriptionInfo>> {
535        &self.node_progress_watchers
536    }
537}
538
539struct SubscriptionTokenInner {
540    control: Arc<SubscriptionControlInner>,
541    params: SubscriptionParams,
542    id: SubscriptionId,
543}
544
545impl fmt::Debug for SubscriptionTokenInner {
546    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
547        f.debug_struct("SubscriptionTokenInner")
548            .field("id", &self.id)
549            .finish()
550    }
551}
552
553impl Drop for SubscriptionTokenInner {
554    #[allow(clippy::collapsible_if)]
555    fn drop(&mut self) {
556        match self.control.subscriptions.entry(self.params.clone()) {
557            DashEntry::Vacant(_) => {
558                warn!("Subscriptions inconsistency (missing entry in by_params)");
559            }
560            // Check the strong refs count to ensure no other thread recreated this subscription (not token)
561            // while we were acquiring the lock.
562            DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
563                let _ = self
564                    .control
565                    .sender
566                    .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into());
567                entry.remove();
568                datapoint_info!(
569                    "rpc-subscription",
570                    ("total", self.control.subscriptions.len(), i64)
571                );
572            }
573            // This branch handles the case in which this entry got recreated
574            // while we were waiting for the lock (inside the `DashMap::entry` method).
575            DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (),
576        }
577    }
578}
579
580// allowing dead code here to appease clippy, but unsure if/how the CounterToken is actually used.
581// further investigation would be necessary before removing
582#[allow(dead_code)]
583#[derive(Clone)]
584pub struct SubscriptionToken(Arc<SubscriptionTokenInner>, CounterToken);
585
586impl SubscriptionToken {
587    pub fn id(&self) -> SubscriptionId {
588        self.0.id
589    }
590
591    pub fn params(&self) -> &SubscriptionParams {
592        &self.0.params
593    }
594}
595
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::rpc_pubsub_service::PubSubConfig,
        solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
        solana_runtime::bank::Bank,
    };

    /// A `SubscriptionControl` together with the receiving end of its notification channel,
    /// so tests can observe the `Subscribed` / `Unsubscribed` entries it sends.
    struct ControlWrapper {
        control: SubscriptionControl,
        receiver: crossbeam_channel::Receiver<TimestampedNotificationEntry>,
    }

    impl ControlWrapper {
        fn new() -> Self {
            let (sender, receiver) = crossbeam_channel::unbounded();
            let (broadcast_sender, _broadcast_receiver) = broadcast::channel(42);
            let max_active_subscriptions = PubSubConfig::default().max_active_subscriptions;

            Self {
                control: SubscriptionControl::new(
                    max_active_subscriptions,
                    sender,
                    broadcast_sender,
                ),
                receiver,
            }
        }

        /// Expects exactly one pending entry: `Subscribed` with the given params and id.
        fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            match self.receiver.recv().unwrap().entry {
                NotificationEntry::Subscribed(params, id) => {
                    assert_eq!(&params, expected_params);
                    assert_eq!(id, SubscriptionId::from(expected_id));
                }
                _ => panic!("unexpected notification"),
            }
            self.assert_silence();
        }

        /// Expects exactly one pending entry: `Unsubscribed` with the given params and id.
        fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            match self.receiver.recv().unwrap().entry {
                NotificationEntry::Unsubscribed(params, id) => {
                    assert_eq!(&params, expected_params);
                    assert_eq!(id, SubscriptionId::from(expected_id));
                }
                _ => panic!("unexpected notification"),
            }
            self.assert_silence();
        }

        /// Expects no pending entry on the channel.
        fn assert_silence(&self) {
            assert!(self.receiver.try_recv().is_err());
        }
    }

    /// Builds a tracker backed by a fresh test bank.
    fn new_tracker() -> SubscriptionsTracker {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        SubscriptionsTracker::new(BankForks::new_rw_arc(bank))
    }

    /// Account subscription params for the token program id with the given commitment.
    fn token_account_params(commitment: CommitmentConfig) -> SubscriptionParams {
        SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: spl_generic_token::token::id(),
            commitment,
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        })
    }

    /// Signature subscription params for the default signature at `processed` commitment.
    fn default_signature_params() -> SubscriptionParams {
        SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        })
    }

    #[test]
    fn notify_subscribe() {
        let harness = ControlWrapper::new();
        let token = harness.control.subscribe(SubscriptionParams::Slot).unwrap();
        harness.assert_subscribed(&SubscriptionParams::Slot, 0);
        drop(token);
        harness.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    #[test]
    fn notify_subscribe_multiple() {
        let harness = ControlWrapper::new();
        let first = harness.control.subscribe(SubscriptionParams::Slot).unwrap();
        harness.assert_subscribed(&SubscriptionParams::Slot, 0);
        let cloned = first.clone();
        drop(first);
        // Re-subscribing while a clone is alive shares the existing subscription.
        let shared = harness.control.subscribe(SubscriptionParams::Slot).unwrap();
        drop(shared);
        harness.assert_silence();
        drop(cloned);
        harness.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    #[test]
    fn notify_subscribe_two_subscriptions() {
        let harness = ControlWrapper::new();
        let slot_first = harness.control.subscribe(SubscriptionParams::Slot).unwrap();
        harness.assert_subscribed(&SubscriptionParams::Slot, 0);

        let signature_params = default_signature_params();
        let signature_first = harness
            .control
            .subscribe(signature_params.clone())
            .unwrap();
        harness.assert_subscribed(&signature_params, 1);

        let slot_second = harness.control.subscribe(SubscriptionParams::Slot).unwrap();
        let signature_second = harness
            .control
            .subscribe(signature_params.clone())
            .unwrap();
        drop(slot_first);
        harness.assert_silence();
        drop(slot_second);
        harness.assert_unsubscribed(&SubscriptionParams::Slot, 0);
        drop(signature_second);
        harness.assert_silence();
        drop(signature_first);
        harness.assert_unsubscribed(&signature_params, 1);

        // A subscription created after a full unsubscribe gets a fresh id.
        let slot_third = harness.control.subscribe(SubscriptionParams::Slot).unwrap();
        harness.assert_subscribed(&SubscriptionParams::Slot, 2);
        drop(slot_third);
        harness.assert_unsubscribed(&SubscriptionParams::Slot, 2);
    }

    #[test]
    fn subscription_info() {
        let mut tracker = new_tracker();

        tracker.subscribe(SubscriptionParams::Slot, SubscriptionId::from(0), || 0);
        let slot_info = tracker
            .node_progress_watchers
            .get(&SubscriptionParams::Slot)
            .unwrap();
        assert_eq!(slot_info.commitment, None);
        assert_eq!(slot_info.params, SubscriptionParams::Slot);
        assert_eq!(slot_info.method, SubscriptionParams::Slot.method());
        assert_eq!(slot_info.id, SubscriptionId::from(0));
        assert_eq!(*slot_info.last_notified_slot.read().unwrap(), 0);

        let account_params = token_account_params(CommitmentConfig::finalized());
        tracker.subscribe(account_params.clone(), SubscriptionId::from(1), || 42);

        let account_info = tracker
            .commitment_watchers
            .get(&SubscriptionId::from(1))
            .unwrap();
        assert_eq!(account_info.commitment, Some(CommitmentConfig::finalized()));
        assert_eq!(account_info.params, account_params);
        assert_eq!(account_info.method, account_params.method());
        assert_eq!(account_info.id, SubscriptionId::from(1));
        assert_eq!(*account_info.last_notified_slot.read().unwrap(), 42);
    }

    #[test]
    fn subscription_indexes() {
        /// Sizes of (by_signature, commitment, gossip, node progress) indexes.
        fn counts(tracker: &SubscriptionsTracker) -> (usize, usize, usize, usize) {
            (
                tracker.by_signature.len(),
                tracker.commitment_watchers.len(),
                tracker.gossip_watchers.len(),
                tracker.node_progress_watchers.len(),
            )
        }

        let mut tracker = new_tracker();

        tracker.subscribe(SubscriptionParams::Slot, SubscriptionId::from(0), || 0);
        assert_eq!(counts(&tracker), (0, 0, 0, 1));
        tracker.unsubscribe(SubscriptionParams::Slot, SubscriptionId::from(0));
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        let finalized_account = token_account_params(CommitmentConfig::finalized());
        tracker.subscribe(finalized_account.clone(), SubscriptionId::from(1), || 0);
        assert_eq!(counts(&tracker), (0, 1, 0, 0));
        tracker.unsubscribe(finalized_account, SubscriptionId::from(1));
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        let confirmed_account = token_account_params(CommitmentConfig::confirmed());
        tracker.subscribe(confirmed_account.clone(), SubscriptionId::from(2), || 0);
        assert_eq!(counts(&tracker), (0, 0, 1, 0));
        tracker.unsubscribe(confirmed_account, SubscriptionId::from(2));
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        let signature_params = default_signature_params();
        tracker.subscribe(signature_params.clone(), SubscriptionId::from(3), || 0);
        assert_eq!(counts(&tracker), (1, 1, 0, 0));
        tracker.unsubscribe(signature_params, SubscriptionId::from(3));
        assert_eq!(counts(&tracker), (0, 0, 0, 0));
    }
}
793}