1use {
2 crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry},
3 dashmap::{mapref::entry::Entry as DashEntry, DashMap},
4 serde::{Deserialize, Serialize},
5 solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
6 solana_clock::Slot,
7 solana_commitment_config::CommitmentConfig,
8 solana_metrics::{CounterToken, TokenCounter},
9 solana_pubkey::Pubkey,
10 solana_rpc_client_api::filter::RpcFilterType,
11 solana_runtime::{
12 bank::{TransactionLogCollectorConfig, TransactionLogCollectorFilter},
13 bank_forks::BankForks,
14 },
15 solana_signature::Signature,
16 solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
17 std::{
18 collections::hash_map::{Entry, HashMap},
19 fmt,
20 sync::{
21 atomic::{AtomicU64, Ordering},
22 Arc, RwLock, Weak,
23 },
24 },
25 thiserror::Error,
26 tokio::sync::broadcast,
27};
28
/// Numeric identifier of a subscription.
///
/// A newtype over `u64`. New ids are produced in `SubscriptionControl::subscribe`
/// by incrementing an atomic counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscriptionId(u64);
31
32impl From<u64> for SubscriptionId {
33 fn from(value: u64) -> Self {
34 SubscriptionId(value)
35 }
36}
37
38impl From<SubscriptionId> for u64 {
39 fn from(value: SubscriptionId) -> Self {
40 value.0
41 }
42}
43
/// The kind of a subscription together with the parameters that identify it.
///
/// The type is `Eq + Hash` and is used as the key of the subscription map in
/// `SubscriptionControl`, so two subscriptions with equal parameters share one entry.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionParams {
    /// Account subscription; notified as `accountNotification`.
    Account(AccountSubscriptionParams),
    /// Block subscription; notified as `blockNotification`.
    Block(BlockSubscriptionParams),
    /// Logs subscription; notified as `logsNotification`.
    Logs(LogsSubscriptionParams),
    /// Program subscription; notified as `programNotification`.
    Program(ProgramSubscriptionParams),
    /// Signature subscription; notified as `signatureNotification`.
    Signature(SignatureSubscriptionParams),
    /// Slot subscription (no parameters); notified as `slotNotification`.
    Slot,
    /// Slots-updates subscription (no parameters); notified as `slotsUpdatesNotification`.
    SlotsUpdates,
    /// Root subscription (no parameters); notified as `rootNotification`.
    Root,
    /// Vote subscription (no parameters); notified as `voteNotification`.
    Vote,
}
56
57impl SubscriptionParams {
58 fn method(&self) -> &'static str {
59 match self {
60 SubscriptionParams::Account(_) => "accountNotification",
61 SubscriptionParams::Logs(_) => "logsNotification",
62 SubscriptionParams::Program(_) => "programNotification",
63 SubscriptionParams::Signature(_) => "signatureNotification",
64 SubscriptionParams::Slot => "slotNotification",
65 SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification",
66 SubscriptionParams::Block(_) => "blockNotification",
67 SubscriptionParams::Root => "rootNotification",
68 SubscriptionParams::Vote => "voteNotification",
69 }
70 }
71
72 fn commitment(&self) -> Option<CommitmentConfig> {
73 match self {
74 SubscriptionParams::Account(params) => Some(params.commitment),
75 SubscriptionParams::Logs(params) => Some(params.commitment),
76 SubscriptionParams::Program(params) => Some(params.commitment),
77 SubscriptionParams::Signature(params) => Some(params.commitment),
78 SubscriptionParams::Block(params) => Some(params.commitment),
79 SubscriptionParams::Slot
80 | SubscriptionParams::SlotsUpdates
81 | SubscriptionParams::Root
82 | SubscriptionParams::Vote => None,
83 }
84 }
85
86 fn is_commitment_watcher(&self) -> bool {
87 let commitment = match self {
88 SubscriptionParams::Account(params) => ¶ms.commitment,
89 SubscriptionParams::Block(params) => ¶ms.commitment,
90 SubscriptionParams::Logs(params) => ¶ms.commitment,
91 SubscriptionParams::Program(params) => ¶ms.commitment,
92 SubscriptionParams::Signature(params) => ¶ms.commitment,
93 SubscriptionParams::Root
94 | SubscriptionParams::Slot
95 | SubscriptionParams::SlotsUpdates
96 | SubscriptionParams::Vote => return false,
97 };
98 !commitment.is_confirmed()
99 }
100
101 fn is_gossip_watcher(&self) -> bool {
102 let commitment = match self {
103 SubscriptionParams::Account(params) => ¶ms.commitment,
104 SubscriptionParams::Block(params) => ¶ms.commitment,
105 SubscriptionParams::Logs(params) => ¶ms.commitment,
106 SubscriptionParams::Program(params) => ¶ms.commitment,
107 SubscriptionParams::Signature(params) => ¶ms.commitment,
108 SubscriptionParams::Root
109 | SubscriptionParams::Slot
110 | SubscriptionParams::SlotsUpdates
111 | SubscriptionParams::Vote => return false,
112 };
113 commitment.is_confirmed()
114 }
115
116 fn is_node_progress_watcher(&self) -> bool {
117 matches!(
118 self,
119 SubscriptionParams::Slot
120 | SubscriptionParams::SlotsUpdates
121 | SubscriptionParams::Root
122 | SubscriptionParams::Vote
123 )
124 }
125}
126
/// Parameters identifying an account subscription.
///
/// NOTE(review): the field meanings below are inferred from names and types only;
/// how they are consumed is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccountSubscriptionParams {
    /// Public key of the subscription; presumably the account being watched.
    pub pubkey: Pubkey,
    /// Account encoding selected for the subscription.
    pub encoding: UiAccountEncoding,
    /// Optional data-slice configuration.
    pub data_slice: Option<UiDataSliceConfig>,
    /// Commitment of the subscription; decides whether it is indexed as a
    /// commitment watcher or a gossip watcher.
    pub commitment: CommitmentConfig,
}
134
/// Parameters identifying a block subscription.
///
/// NOTE(review): the field meanings below are inferred from names and types only;
/// how they are consumed is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BlockSubscriptionParams {
    /// Commitment of the subscription; decides whether it is indexed as a
    /// commitment watcher or a gossip watcher.
    pub commitment: CommitmentConfig,
    /// Transaction encoding selected for the subscription.
    pub encoding: UiTransactionEncoding,
    /// Which blocks the subscription applies to.
    pub kind: BlockSubscriptionKind,
    /// Level of transaction detail selected for the subscription.
    pub transaction_details: TransactionDetails,
    /// Presumably whether rewards are included in notifications; confirm with the consumer.
    pub show_rewards: bool,
    /// Optional maximum supported transaction version.
    pub max_supported_transaction_version: Option<u8>,
}
144
/// Selector for a block subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BlockSubscriptionKind {
    /// No key restriction.
    All,
    /// Restricted to the given key; presumably blocks mentioning that account or program.
    MentionsAccountOrProgram(Pubkey),
}
150
/// Parameters identifying a logs subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogsSubscriptionParams {
    /// Which logs the subscription covers; counted per kind by `LogsSubscriptionsIndex`.
    pub kind: LogsSubscriptionKind,
    /// Commitment of the subscription; decides whether it is indexed as a
    /// commitment watcher or a gossip watcher.
    pub commitment: CommitmentConfig,
}
156
/// Selector for a logs subscription.
///
/// `LogsSubscriptionsIndex::update_config` maps the set of active kinds onto the
/// root bank's `TransactionLogCollectorFilter`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LogsSubscriptionKind {
    /// Selects the `All` collector filter (unless an `AllWithVotes` subscription exists).
    All,
    /// Selects the `AllWithVotes` collector filter.
    AllWithVotes,
    /// Adds the given key to the collector's mentioned addresses.
    Single(Pubkey),
}
163
/// Parameters identifying a program subscription.
///
/// NOTE(review): the field meanings below are inferred from names and types only;
/// how they are consumed is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProgramSubscriptionParams {
    /// Public key of the subscription; presumably the program being watched.
    pub pubkey: Pubkey,
    /// RPC filters attached to the subscription.
    pub filters: Vec<RpcFilterType>,
    /// Account encoding selected for the subscription.
    pub encoding: UiAccountEncoding,
    /// Optional data-slice configuration.
    pub data_slice: Option<UiDataSliceConfig>,
    /// Commitment of the subscription; decides whether it is indexed as a
    /// commitment watcher or a gossip watcher.
    pub commitment: CommitmentConfig,
    /// Presumably whether notifications include response context; confirm with the consumer.
    pub with_context: bool,
}
173
/// Parameters identifying a signature subscription.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SignatureSubscriptionParams {
    /// Signature of the subscription; also the key of `SubscriptionsTracker::by_signature`.
    pub signature: Signature,
    /// Commitment of the subscription; decides whether it is indexed as a
    /// commitment watcher or a gossip watcher.
    pub commitment: CommitmentConfig,
    /// Presumably enables a notification when the signature is first received;
    /// confirm with the consumer.
    pub enable_received_notification: bool,
}
180
/// Handle used to create and deduplicate subscriptions.
///
/// Cloning is cheap: the handle is an `Arc` around the shared state, so all clones
/// operate on the same subscription map.
#[derive(Clone)]
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
/// Weak reference to a subscription's shared token state, paired with the subscription id.
///
/// This is the value stored in the `SubscriptionControl` map. Keeping the id next to the
/// weak pointer lets `subscribe` reuse it when the pointer can no longer be upgraded.
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
184
/// State shared by all clones of a `SubscriptionControl` and by every live token.
struct SubscriptionControlInner {
    /// Active subscriptions keyed by their parameters.
    subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
    /// Source of new subscription ids; incremented once per newly created subscription.
    next_id: AtomicU64,
    /// Limit on the number of map entries; checked only when a new entry would be created.
    max_active_subscriptions: usize,
    /// Channel on which `Subscribed` / `Unsubscribed` entries are sent.
    sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
    /// Broadcast channel for notifications; receivers are created by `broadcast_receiver`.
    broadcast_sender: broadcast::Sender<RpcNotification>,
    /// Token counter named `rpc_pubsub_total_subscriptions`; one counter token is created
    /// for every `SubscriptionToken` handed out by `subscribe`.
    counter: TokenCounter,
}
193
194impl SubscriptionControl {
195 pub fn new(
196 max_active_subscriptions: usize,
197 sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
198 broadcast_sender: broadcast::Sender<RpcNotification>,
199 ) -> Self {
200 Self(Arc::new(SubscriptionControlInner {
201 subscriptions: DashMap::new(),
202 next_id: AtomicU64::new(0),
203 max_active_subscriptions,
204 sender,
205 broadcast_sender,
206 counter: TokenCounter::new("rpc_pubsub_total_subscriptions"),
207 }))
208 }
209
210 pub fn broadcast_receiver(&self) -> broadcast::Receiver<RpcNotification> {
211 self.0.broadcast_sender.subscribe()
212 }
213
214 pub fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionToken, Error> {
215 debug!(
216 "Total existing subscriptions: {}",
217 self.0.subscriptions.len()
218 );
219 let count = self.0.subscriptions.len();
220 let create_token_and_weak_ref = |id, params| {
221 let token = SubscriptionToken(
222 Arc::new(SubscriptionTokenInner {
223 control: Arc::clone(&self.0),
224 params,
225 id,
226 }),
227 self.0.counter.create_token(),
228 );
229 let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
230 (token, weak_ref)
231 };
232
233 match self.0.subscriptions.entry(params) {
234 DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
235 Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
236 None => {
240 let (token, weak_ref) =
241 create_token_and_weak_ref(entry.get().1, entry.key().clone());
242 entry.insert(weak_ref);
243 Ok(token)
244 }
245 },
246 DashEntry::Vacant(entry) => {
247 if count >= self.0.max_active_subscriptions {
248 inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
249 return Err(Error::TooManySubscriptions);
250 }
251 let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
252 let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
253 let _ = self
254 .0
255 .sender
256 .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
257 entry.insert(weak_ref);
258 datapoint_info!(
259 "rpc-subscription",
260 ("total", self.0.subscriptions.len(), i64)
261 );
262 Ok(token)
263 }
264 }
265 }
266
267 pub fn total(&self) -> usize {
268 self.0.subscriptions.len()
269 }
270
271 #[cfg(test)]
272 pub fn assert_subscribed(&self, params: &SubscriptionParams) {
273 assert!(self.0.subscriptions.contains_key(params));
274 }
275
276 #[cfg(test)]
277 pub fn assert_unsubscribed(&self, params: &SubscriptionParams) {
278 assert!(!self.0.subscriptions.contains_key(params));
279 }
280
281 #[cfg(test)]
282 pub fn account_subscribed(&self, pubkey: &Pubkey) -> bool {
283 self.0.subscriptions.iter().any(|item| {
284 if let SubscriptionParams::Account(params) = item.key() {
285 ¶ms.pubkey == pubkey
286 } else {
287 false
288 }
289 })
290 }
291
292 #[cfg(test)]
293 pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool {
294 self.0.subscriptions.iter().any(|item| {
295 if let SubscriptionParams::Logs(params) = item.key() {
296 let subscribed_pubkey = match ¶ms.kind {
297 LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
298 LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
299 };
300 subscribed_pubkey == pubkey
301 } else {
302 false
303 }
304 })
305 }
306
307 #[cfg(test)]
308 pub fn signature_subscribed(&self, signature: &Signature) -> bool {
309 self.0.subscriptions.iter().any(|item| {
310 if let SubscriptionParams::Signature(params) = item.key() {
311 ¶ms.signature == signature
312 } else {
313 false
314 }
315 })
316 }
317}
318
/// Per-subscription record kept by `SubscriptionsTracker`.
///
/// `method` and `commitment` are derived from `params` once, when the record is created
/// in `SubscriptionsTracker::subscribe`.
#[derive(Debug)]
pub struct SubscriptionInfo {
    /// Id of the subscription.
    id: SubscriptionId,
    /// Parameters of the subscription.
    params: SubscriptionParams,
    /// Notification method name for `params`.
    method: &'static str,
    /// Slot value behind a lock; initialised from the closure passed to
    /// `SubscriptionsTracker::subscribe`. Presumably the slot of the last notification
    /// sent for this subscription; it is updated outside this file.
    pub last_notified_slot: RwLock<Slot>,
    /// Commitment of `params`, or `None` for kinds without one.
    commitment: Option<CommitmentConfig>,
}
327
impl SubscriptionInfo {
    /// Returns the subscription id.
    pub fn id(&self) -> SubscriptionId {
        self.id
    }

    /// Returns the notification method name of the subscription.
    pub fn method(&self) -> &'static str {
        self.method
    }

    /// Returns the subscription parameters.
    pub fn params(&self) -> &SubscriptionParams {
        &self.params
    }

    /// Returns the subscription's commitment, or `None` for kinds without one.
    pub fn commitment(&self) -> Option<CommitmentConfig> {
        self.commitment
    }
}
345
/// Errors returned by `SubscriptionControl::subscribe`.
#[derive(Debug, Error)]
pub enum Error {
    /// A new subscription was refused because the number of active subscriptions
    /// has reached the configured maximum.
    #[error("node subscription limit reached")]
    TooManySubscriptions,
}
351
/// Reference counts of active logs subscriptions, grouped by kind.
///
/// Every change is pushed to the root bank's transaction log collector config.
struct LogsSubscriptionsIndex {
    /// Number of active `LogsSubscriptionKind::All` subscriptions.
    all_count: usize,
    /// Number of active `LogsSubscriptionKind::AllWithVotes` subscriptions.
    all_with_votes_count: usize,
    /// Number of active `LogsSubscriptionKind::Single` subscriptions per key;
    /// keys whose count drops to zero are removed.
    single_count: HashMap<Pubkey, usize>,

    /// Bank forks whose root bank receives the collector config.
    bank_forks: Arc<RwLock<BankForks>>,
}
359
360impl LogsSubscriptionsIndex {
361 fn add(&mut self, params: &LogsSubscriptionParams) {
362 match params.kind {
363 LogsSubscriptionKind::All => self.all_count += 1,
364 LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count += 1,
365 LogsSubscriptionKind::Single(key) => {
366 *self.single_count.entry(key).or_default() += 1;
367 }
368 }
369 self.update_config();
370 }
371
372 fn remove(&mut self, params: &LogsSubscriptionParams) {
373 match params.kind {
374 LogsSubscriptionKind::All => self.all_count -= 1,
375 LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count -= 1,
376 LogsSubscriptionKind::Single(key) => match self.single_count.entry(key) {
377 Entry::Occupied(mut entry) => {
378 *entry.get_mut() -= 1;
379 if *entry.get() == 0 {
380 entry.remove();
381 }
382 }
383 Entry::Vacant(_) => error!("missing entry in single_count"),
384 },
385 }
386 self.update_config();
387 }
388
389 fn update_config(&self) {
390 let mentioned_addresses = self.single_count.keys().copied().collect();
391 let config = if self.all_with_votes_count > 0 {
392 TransactionLogCollectorConfig {
393 filter: TransactionLogCollectorFilter::AllWithVotes,
394 mentioned_addresses,
395 }
396 } else if self.all_count > 0 {
397 TransactionLogCollectorConfig {
398 filter: TransactionLogCollectorFilter::All,
399 mentioned_addresses,
400 }
401 } else {
402 TransactionLogCollectorConfig {
403 filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
404 mentioned_addresses,
405 }
406 };
407
408 *self
409 .bank_forks
410 .read()
411 .unwrap()
412 .root_bank()
413 .transaction_log_collector_config
414 .write()
415 .unwrap() = config;
416 }
417}
418
/// Indexes of active subscriptions, maintained by `subscribe` / `unsubscribe`.
pub struct SubscriptionsTracker {
    /// Counts of logs subscriptions by kind.
    logs_subscriptions_index: LogsSubscriptionsIndex,
    /// Signature subscriptions grouped by signature, then by subscription id.
    by_signature: HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>>,
    /// Subscriptions whose commitment is not "confirmed", keyed by id.
    commitment_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    /// Subscriptions whose commitment is "confirmed", keyed by id.
    gossip_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
    /// Subscriptions without a commitment (`Slot`, `SlotsUpdates`, `Root`, `Vote`),
    /// keyed by their parameters.
    node_progress_watchers: HashMap<SubscriptionParams, Arc<SubscriptionInfo>>,
}
429
430impl SubscriptionsTracker {
431 pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
432 SubscriptionsTracker {
433 logs_subscriptions_index: LogsSubscriptionsIndex {
434 all_count: 0,
435 all_with_votes_count: 0,
436 single_count: HashMap::new(),
437 bank_forks,
438 },
439 by_signature: HashMap::new(),
440 commitment_watchers: HashMap::new(),
441 gossip_watchers: HashMap::new(),
442 node_progress_watchers: HashMap::new(),
443 }
444 }
445
446 pub fn subscribe(
447 &mut self,
448 params: SubscriptionParams,
449 id: SubscriptionId,
450 last_notified_slot: impl FnOnce() -> Slot,
451 ) {
452 let info = Arc::new(SubscriptionInfo {
453 last_notified_slot: RwLock::new(last_notified_slot()),
454 id,
455 commitment: params.commitment(),
456 method: params.method(),
457 params: params.clone(),
458 });
459 match ¶ms {
460 SubscriptionParams::Logs(params) => {
461 self.logs_subscriptions_index.add(params);
462 }
463 SubscriptionParams::Signature(params) => {
464 self.by_signature
465 .entry(params.signature)
466 .or_default()
467 .insert(id, Arc::clone(&info));
468 }
469 _ => {}
470 }
471 if info.params.is_commitment_watcher() {
472 self.commitment_watchers.insert(id, Arc::clone(&info));
473 }
474 if info.params.is_gossip_watcher() {
475 self.gossip_watchers.insert(id, Arc::clone(&info));
476 }
477 if info.params.is_node_progress_watcher() {
478 self.node_progress_watchers
479 .insert(info.params.clone(), Arc::clone(&info));
480 }
481 }
482
483 #[allow(clippy::collapsible_if)]
484 pub fn unsubscribe(&mut self, params: SubscriptionParams, id: SubscriptionId) {
485 match ¶ms {
486 SubscriptionParams::Logs(params) => {
487 self.logs_subscriptions_index.remove(params);
488 }
489 SubscriptionParams::Signature(params) => {
490 if let Entry::Occupied(mut entry) = self.by_signature.entry(params.signature) {
491 if entry.get_mut().remove(&id).is_none() {
492 warn!("Subscriptions inconsistency (missing entry in by_signature)");
493 }
494 if entry.get_mut().is_empty() {
495 entry.remove();
496 }
497 } else {
498 warn!("Subscriptions inconsistency (missing entry in by_signature)");
499 }
500 }
501 _ => {}
502 }
503 if params.is_commitment_watcher() {
504 if self.commitment_watchers.remove(&id).is_none() {
505 warn!("Subscriptions inconsistency (missing entry in commitment_watchers)");
506 }
507 }
508 if params.is_gossip_watcher() {
509 if self.gossip_watchers.remove(&id).is_none() {
510 warn!("Subscriptions inconsistency (missing entry in gossip_watchers)");
511 }
512 }
513 if params.is_node_progress_watcher() {
514 if self.node_progress_watchers.remove(¶ms).is_none() {
515 warn!("Subscriptions inconsistency (missing entry in node_progress_watchers)");
516 }
517 }
518 }
519
520 pub fn by_signature(
521 &self,
522 ) -> &HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>> {
523 &self.by_signature
524 }
525
526 pub fn commitment_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
527 &self.commitment_watchers
528 }
529
530 pub fn gossip_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
531 &self.gossip_watchers
532 }
533
534 pub fn node_progress_watchers(&self) -> &HashMap<SubscriptionParams, Arc<SubscriptionInfo>> {
535 &self.node_progress_watchers
536 }
537}
538
/// State shared by all tokens of one subscription.
///
/// Instances are created inside an `Arc` by `SubscriptionControl::subscribe`. The `Drop`
/// implementation unregisters the subscription from `control`.
struct SubscriptionTokenInner {
    /// Shared control state holding the subscription map and the notification channel.
    control: Arc<SubscriptionControlInner>,
    /// Parameters of the subscription; also its key in the control map.
    params: SubscriptionParams,
    /// Id of the subscription.
    id: SubscriptionId,
}
544
545impl fmt::Debug for SubscriptionTokenInner {
546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
547 f.debug_struct("SubscriptionTokenInner")
548 .field("id", &self.id)
549 .finish()
550 }
551}
552
impl Drop for SubscriptionTokenInner {
    /// Unregisters the subscription from the control map.
    ///
    /// The value is only created inside an `Arc` (see `SubscriptionControl::subscribe`),
    /// so this runs once the last strong reference to it has been released.
    #[allow(clippy::collapsible_if)]
    fn drop(&mut self) {
        match self.control.subscriptions.entry(self.params.clone()) {
            DashEntry::Vacant(_) => {
                warn!("Subscriptions inconsistency (missing entry in by_params)");
            }
            // The stored weak ref has no live strong references, so nobody re-created the
            // subscription in the meantime: announce the unsubscription and drop the entry.
            DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
                // A send failure is deliberately ignored.
                let _ = self
                    .control
                    .sender
                    .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into());
                entry.remove();
                datapoint_info!(
                    "rpc-subscription",
                    ("total", self.control.subscriptions.len(), i64)
                );
            }
            // The stored weak ref points at a live token: `subscribe` replaced it with a new
            // token (reusing this id) before this entry was obtained, so the subscription is
            // still active and must be left in place.
            DashEntry::Occupied(_entry) => (),
        }
    }
}
579
/// Handle to an active subscription, returned by `SubscriptionControl::subscribe`.
///
/// Clones share the same underlying subscription state through the `Arc`.
// NOTE(review): the `CounterToken` field is never read in this file, which is presumably
// why `dead_code` is allowed; it looks like it is held only for its lifetime. Confirm
// against `solana_metrics::CounterToken`.
#[allow(dead_code)]
#[derive(Clone)]
pub struct SubscriptionToken(Arc<SubscriptionTokenInner>, CounterToken);
585
impl SubscriptionToken {
    /// Returns the id of the subscription this token belongs to.
    pub fn id(&self) -> SubscriptionId {
        self.0.id
    }

    /// Returns the parameters of the subscription this token belongs to.
    pub fn params(&self) -> &SubscriptionParams {
        &self.0.params
    }
}
595
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::rpc_pubsub_service::PubSubConfig,
        solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
        solana_runtime::bank::Bank,
    };

    /// A `SubscriptionControl` bundled with the receiving end of its notification channel.
    struct ControlWrapper {
        control: SubscriptionControl,
        receiver: crossbeam_channel::Receiver<TimestampedNotificationEntry>,
    }

    impl ControlWrapper {
        /// Builds a control with the default subscription limit and fresh channels.
        fn new() -> Self {
            let (sender, receiver) = crossbeam_channel::unbounded();
            let (broadcast_sender, _broadcast_receiver) = broadcast::channel(42);

            let control = SubscriptionControl::new(
                PubSubConfig::default().max_active_subscriptions,
                sender,
                broadcast_sender,
            );
            Self { control, receiver }
        }

        /// Asserts that the next entry is `Subscribed(expected_params, expected_id)` and
        /// that nothing else is queued.
        fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            match self.receiver.recv().unwrap().entry {
                NotificationEntry::Subscribed(params, id) => {
                    assert_eq!(&params, expected_params);
                    assert_eq!(id, SubscriptionId::from(expected_id));
                }
                _ => panic!("unexpected notification"),
            }
            self.assert_silence();
        }

        /// Asserts that the next entry is `Unsubscribed(expected_params, expected_id)` and
        /// that nothing else is queued.
        fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
            match self.receiver.recv().unwrap().entry {
                NotificationEntry::Unsubscribed(params, id) => {
                    assert_eq!(&params, expected_params);
                    assert_eq!(id, SubscriptionId::from(expected_id));
                }
                _ => panic!("unexpected notification"),
            }
            self.assert_silence();
        }

        /// Asserts that no entry is currently queued.
        fn assert_silence(&self) {
            assert!(self.receiver.try_recv().is_err());
        }
    }

    /// Builds a tracker backed by a fresh test bank.
    fn new_tracker() -> SubscriptionsTracker {
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new_for_tests(&genesis_config);
        let bank_forks = BankForks::new_rw_arc(bank);
        SubscriptionsTracker::new(bank_forks)
    }

    /// Account subscription on the token program id with the given commitment.
    fn token_account_params(commitment: CommitmentConfig) -> SubscriptionParams {
        SubscriptionParams::Account(AccountSubscriptionParams {
            pubkey: spl_generic_token::token::id(),
            commitment,
            encoding: UiAccountEncoding::Base64Zstd,
            data_slice: None,
        })
    }

    /// Signature subscription on the default signature at `processed` commitment.
    fn default_signature_params() -> SubscriptionParams {
        SubscriptionParams::Signature(SignatureSubscriptionParams {
            signature: Signature::default(),
            commitment: CommitmentConfig::processed(),
            enable_received_notification: false,
        })
    }

    #[test]
    fn notify_subscribe() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        drop(token1);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    #[test]
    fn notify_subscribe_multiple() {
        let control = ControlWrapper::new();
        let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);
        let token2 = token1.clone();
        drop(token1);
        let token3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        drop(token3);
        // Still one live token, so nothing may have been announced.
        control.assert_silence();
        drop(token2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
    }

    #[test]
    fn notify_subscribe_two_subscriptions() {
        let control = ControlWrapper::new();
        let token_slot1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 0);

        let signature_params = default_signature_params();
        let token_signature1 = control.control.subscribe(signature_params.clone()).unwrap();
        control.assert_subscribed(&signature_params, 1);

        let token_slot2 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        let token_signature2 = control.control.subscribe(signature_params.clone()).unwrap();
        drop(token_slot1);
        control.assert_silence();
        drop(token_slot2);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
        drop(token_signature2);
        control.assert_silence();
        drop(token_signature1);
        control.assert_unsubscribed(&signature_params, 1);

        // A subscription created after a full unsubscribe gets a fresh id.
        let token_slot3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
        control.assert_subscribed(&SubscriptionParams::Slot, 2);
        drop(token_slot3);
        control.assert_unsubscribed(&SubscriptionParams::Slot, 2);
    }

    #[test]
    fn subscription_info() {
        let mut tracker = new_tracker();

        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        let info = tracker
            .node_progress_watchers
            .get(&SubscriptionParams::Slot)
            .unwrap();
        assert_eq!(info.commitment, None);
        assert_eq!(info.params, SubscriptionParams::Slot);
        assert_eq!(info.method, SubscriptionParams::Slot.method());
        assert_eq!(info.id, SubscriptionId::from(0));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 0);

        let account_params = token_account_params(CommitmentConfig::finalized());
        tracker.subscribe(account_params.clone(), 1.into(), || 42);

        let info = tracker
            .commitment_watchers
            .get(&SubscriptionId::from(1))
            .unwrap();
        assert_eq!(info.commitment, Some(CommitmentConfig::finalized()));
        assert_eq!(info.params, account_params);
        assert_eq!(info.method, account_params.method());
        assert_eq!(info.id, SubscriptionId::from(1));
        assert_eq!(*info.last_notified_slot.read().unwrap(), 42);
    }

    #[test]
    fn subscription_indexes() {
        /// Sizes of (by_signature, commitment, gossip, node_progress) indexes.
        fn counts(tracker: &SubscriptionsTracker) -> (usize, usize, usize, usize) {
            (
                tracker.by_signature.len(),
                tracker.commitment_watchers.len(),
                tracker.gossip_watchers.len(),
                tracker.node_progress_watchers.len(),
            )
        }

        let mut tracker = new_tracker();

        tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 0, 1));
        tracker.unsubscribe(SubscriptionParams::Slot, 0.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        let account_params = token_account_params(CommitmentConfig::finalized());
        tracker.subscribe(account_params.clone(), 1.into(), || 0);
        assert_eq!(counts(&tracker), (0, 1, 0, 0));
        tracker.unsubscribe(account_params, 1.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        let account_params2 = token_account_params(CommitmentConfig::confirmed());
        tracker.subscribe(account_params2.clone(), 2.into(), || 0);
        assert_eq!(counts(&tracker), (0, 0, 1, 0));
        tracker.unsubscribe(account_params2, 2.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));

        let signature_params = default_signature_params();
        tracker.subscribe(signature_params.clone(), 3.into(), || 0);
        assert_eq!(counts(&tracker), (1, 1, 0, 0));
        tracker.unsubscribe(signature_params, 3.into());
        assert_eq!(counts(&tracker), (0, 0, 0, 0));
    }
}