1use {
2 crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry},
3 dashmap::{mapref::entry::Entry as DashEntry, DashMap},
4 solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
5 solana_metrics::{CounterToken, TokenCounter},
6 solana_rpc_client_api::filter::RpcFilterType,
7 solana_runtime::{
8 bank::{TransactionLogCollectorConfig, TransactionLogCollectorFilter},
9 bank_forks::BankForks,
10 },
11 solana_sdk::{
12 clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature,
13 },
14 solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
15 std::{
16 collections::hash_map::{Entry, HashMap},
17 fmt,
18 sync::{
19 atomic::{AtomicU64, Ordering},
20 Arc, RwLock, Weak,
21 },
22 },
23 thiserror::Error,
24 tokio::sync::broadcast,
25};
26
/// Numeric identifier of a subscription; a newtype over `u64`.
///
/// New ids are taken from the `next_id` counter in
/// `SubscriptionControl::subscribe`. Conversions to and from `u64` are
/// available through the `From` impls in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscriptionId(u64);
29
30impl From<u64> for SubscriptionId {
31 fn from(value: u64) -> Self {
32 SubscriptionId(value)
33 }
34}
35
36impl From<SubscriptionId> for u64 {
37 fn from(value: SubscriptionId) -> Self {
38 value.0
39 }
40}
41
/// The kind of a subscription together with the parameters it was requested
/// with.
///
/// The type is `Eq + Hash` because it is used as a map key: the control
/// structure keys its subscription map by `SubscriptionParams`, so two
/// requests with equal parameters resolve to the same map entry.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionParams {
    /// Account subscription; carries a commitment level.
    Account(AccountSubscriptionParams),
    /// Block subscription; carries a commitment level.
    Block(BlockSubscriptionParams),
    /// Transaction-logs subscription; carries a commitment level.
    Logs(LogsSubscriptionParams),
    /// Program subscription; carries a commitment level.
    Program(ProgramSubscriptionParams),
    /// Signature subscription; carries a commitment level.
    Signature(SignatureSubscriptionParams),
    /// Parameterless subscription without a commitment level.
    Slot,
    /// Parameterless subscription without a commitment level.
    SlotsUpdates,
    /// Parameterless subscription without a commitment level.
    Root,
    /// Parameterless subscription without a commitment level.
    Vote,
}
54
55impl SubscriptionParams {
56 fn method(&self) -> &'static str {
57 match self {
58 SubscriptionParams::Account(_) => "accountNotification",
59 SubscriptionParams::Logs(_) => "logsNotification",
60 SubscriptionParams::Program(_) => "programNotification",
61 SubscriptionParams::Signature(_) => "signatureNotification",
62 SubscriptionParams::Slot => "slotNotification",
63 SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification",
64 SubscriptionParams::Block(_) => "blockNotification",
65 SubscriptionParams::Root => "rootNotification",
66 SubscriptionParams::Vote => "voteNotification",
67 }
68 }
69
70 fn commitment(&self) -> Option<CommitmentConfig> {
71 match self {
72 SubscriptionParams::Account(params) => Some(params.commitment),
73 SubscriptionParams::Logs(params) => Some(params.commitment),
74 SubscriptionParams::Program(params) => Some(params.commitment),
75 SubscriptionParams::Signature(params) => Some(params.commitment),
76 SubscriptionParams::Block(params) => Some(params.commitment),
77 SubscriptionParams::Slot
78 | SubscriptionParams::SlotsUpdates
79 | SubscriptionParams::Root
80 | SubscriptionParams::Vote => None,
81 }
82 }
83
84 fn is_commitment_watcher(&self) -> bool {
85 let commitment = match self {
86 SubscriptionParams::Account(params) => ¶ms.commitment,
87 SubscriptionParams::Block(params) => ¶ms.commitment,
88 SubscriptionParams::Logs(params) => ¶ms.commitment,
89 SubscriptionParams::Program(params) => ¶ms.commitment,
90 SubscriptionParams::Signature(params) => ¶ms.commitment,
91 SubscriptionParams::Root
92 | SubscriptionParams::Slot
93 | SubscriptionParams::SlotsUpdates
94 | SubscriptionParams::Vote => return false,
95 };
96 !commitment.is_confirmed()
97 }
98
99 fn is_gossip_watcher(&self) -> bool {
100 let commitment = match self {
101 SubscriptionParams::Account(params) => ¶ms.commitment,
102 SubscriptionParams::Block(params) => ¶ms.commitment,
103 SubscriptionParams::Logs(params) => ¶ms.commitment,
104 SubscriptionParams::Program(params) => ¶ms.commitment,
105 SubscriptionParams::Signature(params) => ¶ms.commitment,
106 SubscriptionParams::Root
107 | SubscriptionParams::Slot
108 | SubscriptionParams::SlotsUpdates
109 | SubscriptionParams::Vote => return false,
110 };
111 commitment.is_confirmed()
112 }
113
114 fn is_node_progress_watcher(&self) -> bool {
115 matches!(
116 self,
117 SubscriptionParams::Slot
118 | SubscriptionParams::SlotsUpdates
119 | SubscriptionParams::Root
120 | SubscriptionParams::Vote
121 )
122 }
123}
124
/// Parameters of an account subscription (`SubscriptionParams::Account`).
///
/// Within the visible part of this file only `commitment` is interpreted;
/// the remaining fields are carried along as part of the subscription key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccountSubscriptionParams {
    pub pubkey: Pubkey,
    pub encoding: UiAccountEncoding,
    pub data_slice: Option<UiDataSliceConfig>,
    // Decides whether the subscription is tracked as a commitment watcher or
    // as a gossip watcher.
    pub commitment: CommitmentConfig,
}
132
/// Parameters of a block subscription (`SubscriptionParams::Block`).
///
/// Within the visible part of this file only `commitment` is interpreted;
/// the remaining fields are carried along as part of the subscription key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockSubscriptionParams {
    // Decides whether the subscription is tracked as a commitment watcher or
    // as a gossip watcher.
    pub commitment: CommitmentConfig,
    pub encoding: UiTransactionEncoding,
    pub kind: BlockSubscriptionKind,
    pub transaction_details: TransactionDetails,
    pub show_rewards: bool,
    pub max_supported_transaction_version: Option<u8>,
}
142
/// Selector stored in `BlockSubscriptionParams::kind`.
///
/// NOTE(review): the variant names suggest "every block" versus "blocks
/// mentioning the given key"; the filtering itself is not implemented in the
/// visible part of this file — confirm against the consumer.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BlockSubscriptionKind {
    All,
    MentionsAccountOrProgram(Pubkey),
}
148
/// Parameters of a logs subscription (`SubscriptionParams::Logs`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogsSubscriptionParams {
    // Which logs are requested; counted per kind by `LogsSubscriptionsIndex`.
    pub kind: LogsSubscriptionKind,
    // Decides whether the subscription is tracked as a commitment watcher or
    // as a gossip watcher.
    pub commitment: CommitmentConfig,
}
154
/// Which transaction logs a logs subscription asks for.
///
/// `LogsSubscriptionsIndex` keeps a count per kind and derives the log
/// collector filter from those counts.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LogsSubscriptionKind {
    /// Counted in `all_count`; selects the `All` collector filter unless an
    /// `AllWithVotes` subscription is also present.
    All,
    /// Counted in `all_with_votes_count`; selects the `AllWithVotes`
    /// collector filter whenever at least one such subscription exists.
    AllWithVotes,
    /// Counted per key in `single_count`; the key is added to the collector's
    /// `mentioned_addresses`.
    Single(Pubkey),
}
161
/// Parameters of a program subscription (`SubscriptionParams::Program`).
///
/// Within the visible part of this file only `commitment` is interpreted;
/// the remaining fields are carried along as part of the subscription key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProgramSubscriptionParams {
    pub pubkey: Pubkey,
    pub filters: Vec<RpcFilterType>,
    pub encoding: UiAccountEncoding,
    pub data_slice: Option<UiDataSliceConfig>,
    // Decides whether the subscription is tracked as a commitment watcher or
    // as a gossip watcher.
    pub commitment: CommitmentConfig,
    pub with_context: bool,
}
171
/// Parameters of a signature subscription (`SubscriptionParams::Signature`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SignatureSubscriptionParams {
    // Key of the `by_signature` index kept by `SubscriptionsTracker`.
    pub signature: Signature,
    // Decides whether the subscription is tracked as a commitment watcher or
    // as a gossip watcher.
    pub commitment: CommitmentConfig,
    // Not interpreted in the visible part of this file.
    pub enable_received_notification: bool,
}
178
/// Handle to the shared subscription registry.
///
/// The handle is a thin wrapper around an `Arc`, so cloning it yields another
/// handle to the same underlying `SubscriptionControlInner`.
#[derive(Clone)]
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
/// Value type of the subscription map: a weak reference to a token's inner
/// state paired with that subscription's id.
///
/// Being weak, the map entry does not keep the subscription alive. The id is
/// stored next to it so `SubscriptionControl::subscribe` can reuse it when
/// the weak reference can no longer be upgraded.
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
182
/// Shared state behind every `SubscriptionControl` handle and every
/// subscription token.
struct SubscriptionControlInner {
    // One entry per distinct parameter set that currently has a subscription.
    subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
    // Source of fresh subscription ids; incremented on every new entry.
    next_id: AtomicU64,
    // New parameter sets are refused once the map holds this many entries.
    max_active_subscriptions: usize,
    // Receives `Subscribed` / `Unsubscribed` notification entries.
    sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
    // Receivers are handed out through `SubscriptionControl::broadcast_receiver`.
    broadcast_sender: broadcast::Sender<RpcNotification>,
    // Every `SubscriptionToken` holds a `CounterToken` created from this counter.
    counter: TokenCounter,
}
191
192impl SubscriptionControl {
193 pub fn new(
194 max_active_subscriptions: usize,
195 sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
196 broadcast_sender: broadcast::Sender<RpcNotification>,
197 ) -> Self {
198 Self(Arc::new(SubscriptionControlInner {
199 subscriptions: DashMap::new(),
200 next_id: AtomicU64::new(0),
201 max_active_subscriptions,
202 sender,
203 broadcast_sender,
204 counter: TokenCounter::new("rpc_pubsub_total_subscriptions"),
205 }))
206 }
207
208 pub fn broadcast_receiver(&self) -> broadcast::Receiver<RpcNotification> {
209 self.0.broadcast_sender.subscribe()
210 }
211
212 pub fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionToken, Error> {
213 debug!(
214 "Total existing subscriptions: {}",
215 self.0.subscriptions.len()
216 );
217 let count = self.0.subscriptions.len();
218 let create_token_and_weak_ref = |id, params| {
219 let token = SubscriptionToken(
220 Arc::new(SubscriptionTokenInner {
221 control: Arc::clone(&self.0),
222 params,
223 id,
224 }),
225 self.0.counter.create_token(),
226 );
227 let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
228 (token, weak_ref)
229 };
230
231 match self.0.subscriptions.entry(params) {
232 DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
233 Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
234 None => {
238 let (token, weak_ref) =
239 create_token_and_weak_ref(entry.get().1, entry.key().clone());
240 entry.insert(weak_ref);
241 Ok(token)
242 }
243 },
244 DashEntry::Vacant(entry) => {
245 if count >= self.0.max_active_subscriptions {
246 inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
247 return Err(Error::TooManySubscriptions);
248 }
249 let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
250 let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
251 let _ = self
252 .0
253 .sender
254 .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
255 entry.insert(weak_ref);
256 datapoint_info!(
257 "rpc-subscription",
258 ("total", self.0.subscriptions.len(), i64)
259 );
260 Ok(token)
261 }
262 }
263 }
264
265 pub fn total(&self) -> usize {
266 self.0.subscriptions.len()
267 }
268
269 #[cfg(test)]
270 pub fn assert_subscribed(&self, params: &SubscriptionParams) {
271 assert!(self.0.subscriptions.contains_key(params));
272 }
273
274 #[cfg(test)]
275 pub fn assert_unsubscribed(&self, params: &SubscriptionParams) {
276 assert!(!self.0.subscriptions.contains_key(params));
277 }
278
279 #[cfg(test)]
280 pub fn account_subscribed(&self, pubkey: &Pubkey) -> bool {
281 self.0.subscriptions.iter().any(|item| {
282 if let SubscriptionParams::Account(params) = item.key() {
283 ¶ms.pubkey == pubkey
284 } else {
285 false
286 }
287 })
288 }
289
290 #[cfg(test)]
291 pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool {
292 self.0.subscriptions.iter().any(|item| {
293 if let SubscriptionParams::Logs(params) = item.key() {
294 let subscribed_pubkey = match ¶ms.kind {
295 LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
296 LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
297 };
298 subscribed_pubkey == pubkey
299 } else {
300 false
301 }
302 })
303 }
304
305 #[cfg(test)]
306 pub fn signature_subscribed(&self, signature: &Signature) -> bool {
307 self.0.subscriptions.iter().any(|item| {
308 if let SubscriptionParams::Signature(params) = item.key() {
309 ¶ms.signature == signature
310 } else {
311 false
312 }
313 })
314 }
315}
316
/// Per-subscription record stored (behind an `Arc`) in the indexes of
/// `SubscriptionsTracker`.
#[derive(Debug)]
pub struct SubscriptionInfo {
    id: SubscriptionId,
    params: SubscriptionParams,
    // Cached result of `params.method()`, computed at subscribe time.
    method: &'static str,
    // Initialised from the closure passed to `SubscriptionsTracker::subscribe`.
    pub last_notified_slot: RwLock<Slot>,
    // Cached result of `params.commitment()`, computed at subscribe time.
    commitment: Option<CommitmentConfig>,
}
325
/// Read-only accessors for the private fields of `SubscriptionInfo`.
impl SubscriptionInfo {
    /// Id of the subscription this record belongs to.
    pub fn id(&self) -> SubscriptionId {
        self.id
    }

    /// Notification method name stored when the record was created.
    pub fn method(&self) -> &'static str {
        self.method
    }

    /// Parameters the subscription was created with.
    pub fn params(&self) -> &SubscriptionParams {
        &self.params
    }

    /// Commitment level stored when the record was created, if the
    /// subscription kind has one.
    pub fn commitment(&self) -> Option<CommitmentConfig> {
        self.commitment
    }
}
343
/// Errors returned by `SubscriptionControl::subscribe`.
#[derive(Debug, Error)]
pub enum Error {
    /// A new parameter set was refused because the subscription map had
    /// already reached `max_active_subscriptions` entries.
    #[error("node subscription limit reached")]
    TooManySubscriptions,
}
349
/// Reference counts of the active logs subscriptions, grouped by kind.
///
/// The counts are used to derive the transaction log collector configuration
/// that is written to the root bank.
struct LogsSubscriptionsIndex {
    // Number of active `LogsSubscriptionKind::All` subscriptions.
    all_count: usize,
    // Number of active `LogsSubscriptionKind::AllWithVotes` subscriptions.
    all_with_votes_count: usize,
    // Number of active `LogsSubscriptionKind::Single` subscriptions per key;
    // entries are removed when their count drops to zero.
    single_count: HashMap<Pubkey, usize>,

    // Used to reach the root bank whose collector config is updated.
    bank_forks: Arc<RwLock<BankForks>>,
}
357
358impl LogsSubscriptionsIndex {
359 fn add(&mut self, params: &LogsSubscriptionParams) {
360 match params.kind {
361 LogsSubscriptionKind::All => self.all_count += 1,
362 LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count += 1,
363 LogsSubscriptionKind::Single(key) => {
364 *self.single_count.entry(key).or_default() += 1;
365 }
366 }
367 self.update_config();
368 }
369
370 fn remove(&mut self, params: &LogsSubscriptionParams) {
371 match params.kind {
372 LogsSubscriptionKind::All => self.all_count -= 1,
373 LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count -= 1,
374 LogsSubscriptionKind::Single(key) => match self.single_count.entry(key) {
375 Entry::Occupied(mut entry) => {
376 *entry.get_mut() -= 1;
377 if *entry.get() == 0 {
378 entry.remove();
379 }
380 }
381 Entry::Vacant(_) => error!("missing entry in single_count"),
382 },
383 }
384 self.update_config();
385 }
386
387 fn update_config(&self) {
388 let mentioned_addresses = self.single_count.keys().copied().collect();
389 let config = if self.all_with_votes_count > 0 {
390 TransactionLogCollectorConfig {
391 filter: TransactionLogCollectorFilter::AllWithVotes,
392 mentioned_addresses,
393 }
394 } else if self.all_count > 0 {
395 TransactionLogCollectorConfig {
396 filter: TransactionLogCollectorFilter::All,
397 mentioned_addresses,
398 }
399 } else {
400 TransactionLogCollectorConfig {
401 filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
402 mentioned_addresses,
403 }
404 };
405
406 *self
407 .bank_forks
408 .read()
409 .unwrap()
410 .root_bank()
411 .transaction_log_collector_config
412 .write()
413 .unwrap() = config;
414 }
415}
416
/// Indexes over the active subscriptions.
///
/// A subscription is inserted into every index whose predicate matches its
/// parameters (see `subscribe`) and removed from them again by `unsubscribe`.
pub struct SubscriptionsTracker {
    // Per-kind counts of logs subscriptions.
    logs_subscriptions_index: LogsSubscriptionsIndex,
    // Signature subscriptions grouped by the signature they watch.
    by_signature: HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>>,
    // Subscriptions whose parameters satisfy `is_commitment_watcher()`.
    commitment_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    // Subscriptions whose parameters satisfy `is_gossip_watcher()`.
    gossip_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    // Subscriptions whose parameters satisfy `is_node_progress_watcher()`;
    // keyed by parameters rather than by id.
    node_progress_watchers: HashMap<SubscriptionParams, Arc<SubscriptionInfo>>,
}
427
428impl SubscriptionsTracker {
429 pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
430 SubscriptionsTracker {
431 logs_subscriptions_index: LogsSubscriptionsIndex {
432 all_count: 0,
433 all_with_votes_count: 0,
434 single_count: HashMap::new(),
435 bank_forks,
436 },
437 by_signature: HashMap::new(),
438 commitment_watchers: HashMap::new(),
439 gossip_watchers: HashMap::new(),
440 node_progress_watchers: HashMap::new(),
441 }
442 }
443
444 pub fn subscribe(
445 &mut self,
446 params: SubscriptionParams,
447 id: SubscriptionId,
448 last_notified_slot: impl FnOnce() -> Slot,
449 ) {
450 let info = Arc::new(SubscriptionInfo {
451 last_notified_slot: RwLock::new(last_notified_slot()),
452 id,
453 commitment: params.commitment(),
454 method: params.method(),
455 params: params.clone(),
456 });
457 match ¶ms {
458 SubscriptionParams::Logs(params) => {
459 self.logs_subscriptions_index.add(params);
460 }
461 SubscriptionParams::Signature(params) => {
462 self.by_signature
463 .entry(params.signature)
464 .or_default()
465 .insert(id, Arc::clone(&info));
466 }
467 _ => {}
468 }
469 if info.params.is_commitment_watcher() {
470 self.commitment_watchers.insert(id, Arc::clone(&info));
471 }
472 if info.params.is_gossip_watcher() {
473 self.gossip_watchers.insert(id, Arc::clone(&info));
474 }
475 if info.params.is_node_progress_watcher() {
476 self.node_progress_watchers
477 .insert(info.params.clone(), Arc::clone(&info));
478 }
479 }
480
481 #[allow(clippy::collapsible_if)]
482 pub fn unsubscribe(&mut self, params: SubscriptionParams, id: SubscriptionId) {
483 match ¶ms {
484 SubscriptionParams::Logs(params) => {
485 self.logs_subscriptions_index.remove(params);
486 }
487 SubscriptionParams::Signature(params) => {
488 if let Entry::Occupied(mut entry) = self.by_signature.entry(params.signature) {
489 if entry.get_mut().remove(&id).is_none() {
490 warn!("Subscriptions inconsistency (missing entry in by_signature)");
491 }
492 if entry.get_mut().is_empty() {
493 entry.remove();
494 }
495 } else {
496 warn!("Subscriptions inconsistency (missing entry in by_signature)");
497 }
498 }
499 _ => {}
500 }
501 if params.is_commitment_watcher() {
502 if self.commitment_watchers.remove(&id).is_none() {
503 warn!("Subscriptions inconsistency (missing entry in commitment_watchers)");
504 }
505 }
506 if params.is_gossip_watcher() {
507 if self.gossip_watchers.remove(&id).is_none() {
508 warn!("Subscriptions inconsistency (missing entry in gossip_watchers)");
509 }
510 }
511 if params.is_node_progress_watcher() {
512 if self.node_progress_watchers.remove(¶ms).is_none() {
513 warn!("Subscriptions inconsistency (missing entry in node_progress_watchers)");
514 }
515 }
516 }
517
518 pub fn by_signature(
519 &self,
520 ) -> &HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>> {
521 &self.by_signature
522 }
523
524 pub fn commitment_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
525 &self.commitment_watchers
526 }
527
528 pub fn gossip_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
529 &self.gossip_watchers
530 }
531
532 pub fn node_progress_watchers(&self) -> &HashMap<SubscriptionParams, Arc<SubscriptionInfo>> {
533 &self.node_progress_watchers
534 }
535}
536
/// State shared by all `SubscriptionToken`s of one subscription.
///
/// Its `Drop` impl is what retires the subscription once the last token is
/// gone.
struct SubscriptionTokenInner {
    // Registry the subscription lives in; used by `Drop` to remove the entry.
    control: Arc<SubscriptionControlInner>,
    // Key of the subscription's entry in the registry map.
    params: SubscriptionParams,
    // Id reported in the `Subscribed` / `Unsubscribed` entries.
    id: SubscriptionId,
}
542
543impl fmt::Debug for SubscriptionTokenInner {
544 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
545 f.debug_struct("SubscriptionTokenInner")
546 .field("id", &self.id)
547 .finish()
548 }
549}
550
impl Drop for SubscriptionTokenInner {
    /// Retires the subscription when the shared inner state is dropped.
    ///
    /// The map entry for `params` is looked up and removed only if the weak
    /// reference stored there has no strong references left. In that case an
    /// `Unsubscribed` entry is sent first, then the entry is removed and the
    /// new total is reported.
    #[allow(clippy::collapsible_if)]
    fn drop(&mut self) {
        match self.control.subscriptions.entry(self.params.clone()) {
            DashEntry::Vacant(_) => {
                // Every live inner state is expected to have a map entry.
                warn!("Subscriptions inconsistency (missing entry in by_params)");
            }
            DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
                // The stored weak reference is dead, so nobody re-armed the
                // entry in the meantime: announce the unsubscription (a
                // failed send is deliberately ignored) and remove the entry.
                let _ = self
                    .control
                    .sender
                    .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into());
                entry.remove();
                datapoint_info!(
                    "rpc-subscription",
                    ("total", self.control.subscriptions.len(), i64)
                );
            }
            // Non-zero strong count: `SubscriptionControl::subscribe` has
            // already replaced the weak reference with one to a new inner
            // state (reusing the id), so the subscription stays registered.
            DashEntry::Occupied(_entry) => (),
        }
    }
}
577
/// Handle that keeps a subscription alive.
///
/// Clones share the same inner state; the subscription is retired by the
/// `Drop` impl of `SubscriptionTokenInner` once the last clone is dropped.
/// The second field is a `CounterToken` created from the registry's counter.
// NOTE(review): `dead_code` is presumably allowed because the `CounterToken`
// field is never read and is held only for its lifetime — confirm.
#[allow(dead_code)]
#[derive(Clone)]
pub struct SubscriptionToken(Arc<SubscriptionTokenInner>, CounterToken);
583
/// Read-only accessors for the state shared by all clones of a token.
impl SubscriptionToken {
    /// Id of the subscription this token belongs to.
    pub fn id(&self) -> SubscriptionId {
        self.0.id
    }

    /// Parameters the subscription was created with.
    pub fn params(&self) -> &SubscriptionParams {
        &self.0.params
    }
}
593
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::rpc_pubsub_service::PubSubConfig,
        solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
        solana_runtime::bank::Bank,
    };

    /// A `SubscriptionControl` bundled with the receiving end of its
    /// notification channel, so tests can observe what the control sends.
    struct ControlWrapper {
        control: SubscriptionControl,
        receiver: crossbeam_channel::Receiver<TimestampedNotificationEntry>,
    }

    impl ControlWrapper {
        /// Builds a control with the default subscription limit and fresh
        /// channels.
        fn new() -> Self {
            let (sender, receiver) = crossbeam_channel::unbounded();
            let (broadcast_sender, _broadcast_receiver) = broadcast::channel(42);

            let control = SubscriptionControl::new(
                PubSubConfig::default().max_active_subscriptions,
                sender,
                broadcast_sender,
            );
            Self { control, receiver }
        }

        /// Expects exactly one pending `Subscribed` entry with the given
        /// parameters and id, and nothing after it.
        fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap().entry {
                assert_eq!(&params, expected_params);
                assert_eq!(id, SubscriptionId::from(expected_id));
            } else {
                panic!("unexpected notification");
            }
            self.assert_silence();
        }

        /// Expects exactly one pending `Unsubscribed` entry with the given
        /// parameters and id, and nothing after it.
        fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap().entry
            {
                assert_eq!(&params, expected_params);
                assert_eq!(id, SubscriptionId::from(expected_id));
            } else {
                panic!("unexpected notification");
            }
            self.assert_silence();
        }

        /// Expects the notification channel to be empty.
        fn assert_silence(&self) {
            assert!(self.receiver.try_recv().is_err());
        }
    }

    /// One token: subscribing sends `Subscribed`, dropping it sends
    /// `Unsubscribed` with the same id.
    #[test]
    fn notify_subscribe() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        drop(token1);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    /// Clones and repeated subscribes share one subscription: only dropping
    /// the last token sends `Unsubscribed`.
    #[test]
    fn notify_subscribe_multiple() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        let token2 = token1.clone();
        drop(token1);
        let token3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        drop(token3);
        // `token2` is still alive, so nothing may have been sent yet.
        control.assert_silence();
        drop(token2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    /// Two different parameter sets are tracked independently, and ids are
    /// never reused after an unsubscribe.
    #[test]
    fn notify_subscribe_two_subscriptions() {
        let control = ControlWrapper::new();
        let token_slot1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);

        let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        });
        let token_signature1 = control.control.subscribe(signature_params.clone()).unwrap();
        control.assert_subscribed(&signature_params, 1);

        let token_slot2 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        let token_signature2 = control.control.subscribe(signature_params.clone()).unwrap();
        drop(token_slot1);
        control.assert_silence();
        drop(token_slot2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
        drop(token_signature2);
        control.assert_silence();
        drop(token_signature1);
        control.assert_unsubscribed(&signature_params, 1);

        // Ids 0 and 1 were already handed out, so a new Slot subscription
        // gets id 2.
        let token_slot3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 2);
        drop(token_slot3);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 2);
    }

    /// The tracker stores the expected `SubscriptionInfo` contents.
    #[test]
    fn subscription_info() {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        let bank_forks = BankForks::new_rw_arc(bank);
        let mut tracker = SubscriptionsTracker::new(bank_forks);

        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        let info = tracker
            .node_progress_watchers
            .get(&SubscriptionParams::Slot)
            .unwrap();
        assert_eq!(info.commitment, None);
        assert_eq!(info.params, SubscriptionParams::Slot);
        assert_eq!(info.method, SubscriptionParams::Slot.method());
        assert_eq!(info.id, SubscriptionId::from(0));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 0);

        let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: solana_inline_spl::token::id(),
            commitment: CommitmentConfig::finalized(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params.clone(), 1.into(), || 42);

        let info = tracker
            .commitment_watchers
            .get(&SubscriptionId::from(1))
            .unwrap();
        assert_eq!(info.commitment, Some(CommitmentConfig::finalized()));
        assert_eq!(info.params, account_params);
        assert_eq!(info.method, account_params.method());
        assert_eq!(info.id, SubscriptionId::from(1));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 42);
    }

    /// Each subscription kind lands in the expected index and is removed from
    /// it again on unsubscribe.
    #[test]
    fn subscription_indexes() {
        /// Sizes of (by_signature, commitment, gossip, node_progress) indexes.
        fn counts(tracker: &SubscriptionsTracker) -> (usize, usize, usize, usize) {
            (
                tracker.by_signature.len(),
                tracker.commitment_watchers.len(),
                tracker.gossip_watchers.len(),
                tracker.node_progress_watchers.len(),
            )
        }

        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        let bank_forks = BankForks::new_rw_arc(bank);
        let mut tracker = SubscriptionsTracker::new(bank_forks);

        // Slot: node progress watcher only.
        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 0, 1));
        tracker.unsubscribe(SubscriptionParams::Slot, 0.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Finalized account: commitment watcher.
        let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: solana_inline_spl::token::id(),
            commitment: CommitmentConfig::finalized(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params.clone(), 1.into(), || 0);
        assert_eq!(counts(&tracker), (0, 1, 0, 0));
        tracker.unsubscribe(account_params, 1.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Confirmed account: gossip watcher.
        let account_params2 = SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: solana_inline_spl::token::id(),
            commitment: CommitmentConfig::confirmed(),
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        });
        tracker.subscribe(account_params2.clone(), 2.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 1, 0));
        tracker.unsubscribe(account_params2, 2.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        // Processed signature: signature index plus commitment watcher.
        let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        });
        tracker.subscribe(signature_params.clone(), 3.into(), || 0);
        assert_eq!(counts(&tracker), (1, 1, 0, 0));
        tracker.unsubscribe(signature_params, 3.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));
    }
}