1use {
2 crate::rpc_subscriptions::{NotificationEntry, RpcNotification, TimestampedNotificationEntry},
3 dashmap::{mapref::entry::Entry as DashEntry, DashMap},
4 solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig},
5 solana_clock::Slot,
6 solana_commitment_config::CommitmentConfig,
7 solana_metrics::{CounterToken, TokenCounter},
8 solana_pubkey::Pubkey,
9 solana_rpc_client_api::filter::RpcFilterType,
10 solana_runtime::{
11 bank::{TransactionLogCollectorConfig, TransactionLogCollectorFilter},
12 bank_forks::BankForks,
13 },
14 solana_signature::Signature,
15 solana_transaction_status::{TransactionDetails, UiTransactionEncoding},
16 std::{
17 collections::hash_map::{Entry, HashMap},
18 fmt,
19 sync::{
20 atomic::{AtomicU64, Ordering},
21 Arc, RwLock, Weak,
22 },
23 },
24 thiserror::Error,
25 tokio::sync::broadcast,
26};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
29pub struct SubscriptionId(u64);
30
31impl From<u64> for SubscriptionId {
32 fn from(value: u64) -> Self {
33 SubscriptionId(value)
34 }
35}
36
37impl From<SubscriptionId> for u64 {
38 fn from(value: SubscriptionId) -> Self {
39 value.0
40 }
41}
42
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub enum SubscriptionParams {
45 Account(AccountSubscriptionParams),
46 Block(BlockSubscriptionParams),
47 Logs(LogsSubscriptionParams),
48 Program(ProgramSubscriptionParams),
49 Signature(SignatureSubscriptionParams),
50 Slot,
51 SlotsUpdates,
52 Root,
53 Vote,
54}
55
56impl SubscriptionParams {
57 fn method(&self) -> &'static str {
58 match self {
59 SubscriptionParams::Account(_) => "accountNotification",
60 SubscriptionParams::Logs(_) => "logsNotification",
61 SubscriptionParams::Program(_) => "programNotification",
62 SubscriptionParams::Signature(_) => "signatureNotification",
63 SubscriptionParams::Slot => "slotNotification",
64 SubscriptionParams::SlotsUpdates => "slotsUpdatesNotification",
65 SubscriptionParams::Block(_) => "blockNotification",
66 SubscriptionParams::Root => "rootNotification",
67 SubscriptionParams::Vote => "voteNotification",
68 }
69 }
70
71 fn commitment(&self) -> Option<CommitmentConfig> {
72 match self {
73 SubscriptionParams::Account(params) => Some(params.commitment),
74 SubscriptionParams::Logs(params) => Some(params.commitment),
75 SubscriptionParams::Program(params) => Some(params.commitment),
76 SubscriptionParams::Signature(params) => Some(params.commitment),
77 SubscriptionParams::Block(params) => Some(params.commitment),
78 SubscriptionParams::Slot
79 | SubscriptionParams::SlotsUpdates
80 | SubscriptionParams::Root
81 | SubscriptionParams::Vote => None,
82 }
83 }
84
85 fn is_commitment_watcher(&self) -> bool {
86 let commitment = match self {
87 SubscriptionParams::Account(params) => ¶ms.commitment,
88 SubscriptionParams::Block(params) => ¶ms.commitment,
89 SubscriptionParams::Logs(params) => ¶ms.commitment,
90 SubscriptionParams::Program(params) => ¶ms.commitment,
91 SubscriptionParams::Signature(params) => ¶ms.commitment,
92 SubscriptionParams::Root
93 | SubscriptionParams::Slot
94 | SubscriptionParams::SlotsUpdates
95 | SubscriptionParams::Vote => return false,
96 };
97 !commitment.is_confirmed()
98 }
99
100 fn is_gossip_watcher(&self) -> bool {
101 let commitment = match self {
102 SubscriptionParams::Account(params) => ¶ms.commitment,
103 SubscriptionParams::Block(params) => ¶ms.commitment,
104 SubscriptionParams::Logs(params) => ¶ms.commitment,
105 SubscriptionParams::Program(params) => ¶ms.commitment,
106 SubscriptionParams::Signature(params) => ¶ms.commitment,
107 SubscriptionParams::Root
108 | SubscriptionParams::Slot
109 | SubscriptionParams::SlotsUpdates
110 | SubscriptionParams::Vote => return false,
111 };
112 commitment.is_confirmed()
113 }
114
115 fn is_node_progress_watcher(&self) -> bool {
116 matches!(
117 self,
118 SubscriptionParams::Slot
119 | SubscriptionParams::SlotsUpdates
120 | SubscriptionParams::Root
121 | SubscriptionParams::Vote
122 )
123 }
124}
125
126#[derive(Debug, Clone, PartialEq, Eq, Hash)]
127pub struct AccountSubscriptionParams {
128 pub pubkey: Pubkey,
129 pub encoding: UiAccountEncoding,
130 pub data_slice: Option<UiDataSliceConfig>,
131 pub commitment: CommitmentConfig,
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, Hash)]
135pub struct BlockSubscriptionParams {
136 pub commitment: CommitmentConfig,
137 pub encoding: UiTransactionEncoding,
138 pub kind: BlockSubscriptionKind,
139 pub transaction_details: TransactionDetails,
140 pub show_rewards: bool,
141 pub max_supported_transaction_version: Option<u8>,
142}
143
144#[derive(Debug, Clone, PartialEq, Eq, Hash)]
145pub enum BlockSubscriptionKind {
146 All,
147 MentionsAccountOrProgram(Pubkey),
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, Hash)]
151pub struct LogsSubscriptionParams {
152 pub kind: LogsSubscriptionKind,
153 pub commitment: CommitmentConfig,
154}
155
156#[derive(Debug, Clone, PartialEq, Eq, Hash)]
157pub enum LogsSubscriptionKind {
158 All,
159 AllWithVotes,
160 Single(Pubkey),
161}
162
163#[derive(Debug, Clone, PartialEq, Eq, Hash)]
164pub struct ProgramSubscriptionParams {
165 pub pubkey: Pubkey,
166 pub filters: Vec<RpcFilterType>,
167 pub encoding: UiAccountEncoding,
168 pub data_slice: Option<UiDataSliceConfig>,
169 pub commitment: CommitmentConfig,
170 pub with_context: bool,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Hash)]
174pub struct SignatureSubscriptionParams {
175 pub signature: Signature,
176 pub commitment: CommitmentConfig,
177 pub enable_received_notification: bool,
178}
179
180#[derive(Clone)]
181pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
182pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);
183
184struct SubscriptionControlInner {
185 subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
186 next_id: AtomicU64,
187 max_active_subscriptions: usize,
188 sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
189 broadcast_sender: broadcast::Sender<RpcNotification>,
190 counter: TokenCounter,
191}
192
193impl SubscriptionControl {
194 pub fn new(
195 max_active_subscriptions: usize,
196 sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
197 broadcast_sender: broadcast::Sender<RpcNotification>,
198 ) -> Self {
199 Self(Arc::new(SubscriptionControlInner {
200 subscriptions: DashMap::new(),
201 next_id: AtomicU64::new(0),
202 max_active_subscriptions,
203 sender,
204 broadcast_sender,
205 counter: TokenCounter::new("rpc_pubsub_total_subscriptions"),
206 }))
207 }
208
209 pub fn broadcast_receiver(&self) -> broadcast::Receiver<RpcNotification> {
210 self.0.broadcast_sender.subscribe()
211 }
212
213 pub fn subscribe(&self, params: SubscriptionParams) -> Result<SubscriptionToken, Error> {
214 debug!(
215 "Total existing subscriptions: {}",
216 self.0.subscriptions.len()
217 );
218 let count = self.0.subscriptions.len();
219 let create_token_and_weak_ref = |id, params| {
220 let token = SubscriptionToken(
221 Arc::new(SubscriptionTokenInner {
222 control: Arc::clone(&self.0),
223 params,
224 id,
225 }),
226 self.0.counter.create_token(),
227 );
228 let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
229 (token, weak_ref)
230 };
231
232 match self.0.subscriptions.entry(params) {
233 DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
234 Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
235 None => {
239 let (token, weak_ref) =
240 create_token_and_weak_ref(entry.get().1, entry.key().clone());
241 entry.insert(weak_ref);
242 Ok(token)
243 }
244 },
245 DashEntry::Vacant(entry) => {
246 if count >= self.0.max_active_subscriptions {
247 inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
248 return Err(Error::TooManySubscriptions);
249 }
250 let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
251 let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
252 let _ = self
253 .0
254 .sender
255 .send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
256 entry.insert(weak_ref);
257 datapoint_info!(
258 "rpc-subscription",
259 ("total", self.0.subscriptions.len(), i64)
260 );
261 Ok(token)
262 }
263 }
264 }
265
266 pub fn total(&self) -> usize {
267 self.0.subscriptions.len()
268 }
269
270 #[cfg(test)]
271 pub fn assert_subscribed(&self, params: &SubscriptionParams) {
272 assert!(self.0.subscriptions.contains_key(params));
273 }
274
275 #[cfg(test)]
276 pub fn assert_unsubscribed(&self, params: &SubscriptionParams) {
277 assert!(!self.0.subscriptions.contains_key(params));
278 }
279
280 #[cfg(test)]
281 pub fn account_subscribed(&self, pubkey: &Pubkey) -> bool {
282 self.0.subscriptions.iter().any(|item| {
283 if let SubscriptionParams::Account(params) = item.key() {
284 ¶ms.pubkey == pubkey
285 } else {
286 false
287 }
288 })
289 }
290
291 #[cfg(test)]
292 pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool {
293 self.0.subscriptions.iter().any(|item| {
294 if let SubscriptionParams::Logs(params) = item.key() {
295 let subscribed_pubkey = match ¶ms.kind {
296 LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None,
297 LogsSubscriptionKind::Single(pubkey) => Some(pubkey),
298 };
299 subscribed_pubkey == pubkey
300 } else {
301 false
302 }
303 })
304 }
305
306 #[cfg(test)]
307 pub fn signature_subscribed(&self, signature: &Signature) -> bool {
308 self.0.subscriptions.iter().any(|item| {
309 if let SubscriptionParams::Signature(params) = item.key() {
310 ¶ms.signature == signature
311 } else {
312 false
313 }
314 })
315 }
316}
317
318#[derive(Debug)]
319pub struct SubscriptionInfo {
320 id: SubscriptionId,
321 params: SubscriptionParams,
322 method: &'static str,
323 pub last_notified_slot: RwLock<Slot>,
324 commitment: Option<CommitmentConfig>,
325}
326
327impl SubscriptionInfo {
328 pub fn id(&self) -> SubscriptionId {
329 self.id
330 }
331
332 pub fn method(&self) -> &'static str {
333 self.method
334 }
335
336 pub fn params(&self) -> &SubscriptionParams {
337 &self.params
338 }
339
340 pub fn commitment(&self) -> Option<CommitmentConfig> {
341 self.commitment
342 }
343}
344
345#[derive(Debug, Error)]
346pub enum Error {
347 #[error("node subscription limit reached")]
348 TooManySubscriptions,
349}
350
351struct LogsSubscriptionsIndex {
352 all_count: usize,
353 all_with_votes_count: usize,
354 single_count: HashMap<Pubkey, usize>,
355
356 bank_forks: Arc<RwLock<BankForks>>,
357}
358
359impl LogsSubscriptionsIndex {
360 fn add(&mut self, params: &LogsSubscriptionParams) {
361 match params.kind {
362 LogsSubscriptionKind::All => self.all_count += 1,
363 LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count += 1,
364 LogsSubscriptionKind::Single(key) => {
365 *self.single_count.entry(key).or_default() += 1;
366 }
367 }
368 self.update_config();
369 }
370
371 fn remove(&mut self, params: &LogsSubscriptionParams) {
372 match params.kind {
373 LogsSubscriptionKind::All => self.all_count -= 1,
374 LogsSubscriptionKind::AllWithVotes => self.all_with_votes_count -= 1,
375 LogsSubscriptionKind::Single(key) => match self.single_count.entry(key) {
376 Entry::Occupied(mut entry) => {
377 *entry.get_mut() -= 1;
378 if *entry.get() == 0 {
379 entry.remove();
380 }
381 }
382 Entry::Vacant(_) => error!("missing entry in single_count"),
383 },
384 }
385 self.update_config();
386 }
387
388 fn update_config(&self) {
389 let mentioned_addresses = self.single_count.keys().copied().collect();
390 let config = if self.all_with_votes_count > 0 {
391 TransactionLogCollectorConfig {
392 filter: TransactionLogCollectorFilter::AllWithVotes,
393 mentioned_addresses,
394 }
395 } else if self.all_count > 0 {
396 TransactionLogCollectorConfig {
397 filter: TransactionLogCollectorFilter::All,
398 mentioned_addresses,
399 }
400 } else {
401 TransactionLogCollectorConfig {
402 filter: TransactionLogCollectorFilter::OnlyMentionedAddresses,
403 mentioned_addresses,
404 }
405 };
406
407 *self
408 .bank_forks
409 .read()
410 .unwrap()
411 .root_bank()
412 .transaction_log_collector_config
413 .write()
414 .unwrap() = config;
415 }
416}
417
418pub struct SubscriptionsTracker {
419 logs_subscriptions_index: LogsSubscriptionsIndex,
420 by_signature: HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>>,
421 commitment_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
423 gossip_watchers: HashMap<SubscriptionId, Arc<SubscriptionInfo>>,
425 node_progress_watchers: HashMap<SubscriptionParams, Arc<SubscriptionInfo>>,
427}
428
429impl SubscriptionsTracker {
430 pub fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
431 SubscriptionsTracker {
432 logs_subscriptions_index: LogsSubscriptionsIndex {
433 all_count: 0,
434 all_with_votes_count: 0,
435 single_count: HashMap::new(),
436 bank_forks,
437 },
438 by_signature: HashMap::new(),
439 commitment_watchers: HashMap::new(),
440 gossip_watchers: HashMap::new(),
441 node_progress_watchers: HashMap::new(),
442 }
443 }
444
445 pub fn subscribe(
446 &mut self,
447 params: SubscriptionParams,
448 id: SubscriptionId,
449 last_notified_slot: impl FnOnce() -> Slot,
450 ) {
451 let info = Arc::new(SubscriptionInfo {
452 last_notified_slot: RwLock::new(last_notified_slot()),
453 id,
454 commitment: params.commitment(),
455 method: params.method(),
456 params: params.clone(),
457 });
458 match ¶ms {
459 SubscriptionParams::Logs(params) => {
460 self.logs_subscriptions_index.add(params);
461 }
462 SubscriptionParams::Signature(params) => {
463 self.by_signature
464 .entry(params.signature)
465 .or_default()
466 .insert(id, Arc::clone(&info));
467 }
468 _ => {}
469 }
470 if info.params.is_commitment_watcher() {
471 self.commitment_watchers.insert(id, Arc::clone(&info));
472 }
473 if info.params.is_gossip_watcher() {
474 self.gossip_watchers.insert(id, Arc::clone(&info));
475 }
476 if info.params.is_node_progress_watcher() {
477 self.node_progress_watchers
478 .insert(info.params.clone(), Arc::clone(&info));
479 }
480 }
481
482 #[allow(clippy::collapsible_if)]
483 pub fn unsubscribe(&mut self, params: SubscriptionParams, id: SubscriptionId) {
484 match ¶ms {
485 SubscriptionParams::Logs(params) => {
486 self.logs_subscriptions_index.remove(params);
487 }
488 SubscriptionParams::Signature(params) => {
489 if let Entry::Occupied(mut entry) = self.by_signature.entry(params.signature) {
490 if entry.get_mut().remove(&id).is_none() {
491 warn!("Subscriptions inconsistency (missing entry in by_signature)");
492 }
493 if entry.get_mut().is_empty() {
494 entry.remove();
495 }
496 } else {
497 warn!("Subscriptions inconsistency (missing entry in by_signature)");
498 }
499 }
500 _ => {}
501 }
502 if params.is_commitment_watcher() {
503 if self.commitment_watchers.remove(&id).is_none() {
504 warn!("Subscriptions inconsistency (missing entry in commitment_watchers)");
505 }
506 }
507 if params.is_gossip_watcher() {
508 if self.gossip_watchers.remove(&id).is_none() {
509 warn!("Subscriptions inconsistency (missing entry in gossip_watchers)");
510 }
511 }
512 if params.is_node_progress_watcher() {
513 if self.node_progress_watchers.remove(¶ms).is_none() {
514 warn!("Subscriptions inconsistency (missing entry in node_progress_watchers)");
515 }
516 }
517 }
518
519 pub fn by_signature(
520 &self,
521 ) -> &HashMap<Signature, HashMap<SubscriptionId, Arc<SubscriptionInfo>>> {
522 &self.by_signature
523 }
524
525 pub fn commitment_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
526 &self.commitment_watchers
527 }
528
529 pub fn gossip_watchers(&self) -> &HashMap<SubscriptionId, Arc<SubscriptionInfo>> {
530 &self.gossip_watchers
531 }
532
533 pub fn node_progress_watchers(&self) -> &HashMap<SubscriptionParams, Arc<SubscriptionInfo>> {
534 &self.node_progress_watchers
535 }
536}
537
538struct SubscriptionTokenInner {
539 control: Arc<SubscriptionControlInner>,
540 params: SubscriptionParams,
541 id: SubscriptionId,
542}
543
544impl fmt::Debug for SubscriptionTokenInner {
545 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
546 f.debug_struct("SubscriptionTokenInner")
547 .field("id", &self.id)
548 .finish()
549 }
550}
551
552impl Drop for SubscriptionTokenInner {
553 #[allow(clippy::collapsible_if)]
554 fn drop(&mut self) {
555 match self.control.subscriptions.entry(self.params.clone()) {
556 DashEntry::Vacant(_) => {
557 warn!("Subscriptions inconsistency (missing entry in by_params)");
558 }
559 DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
562 let _ = self
563 .control
564 .sender
565 .send(NotificationEntry::Unsubscribed(self.params.clone(), self.id).into());
566 entry.remove();
567 datapoint_info!(
568 "rpc-subscription",
569 ("total", self.control.subscriptions.len(), i64)
570 );
571 }
572 DashEntry::Occupied(_entry) => (),
575 }
576 }
577}
578
579#[allow(dead_code)]
582#[derive(Clone)]
583pub struct SubscriptionToken(Arc<SubscriptionTokenInner>, CounterToken);
584
585impl SubscriptionToken {
586 pub fn id(&self) -> SubscriptionId {
587 self.0.id
588 }
589
590 pub fn params(&self) -> &SubscriptionParams {
591 &self.0.params
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use {
598 super::*,
599 crate::rpc_pubsub_service::PubSubConfig,
600 solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
601 solana_runtime::bank::Bank,
602 };
603
604 struct ControlWrapper {
605 control: SubscriptionControl,
606 receiver: crossbeam_channel::Receiver<TimestampedNotificationEntry>,
607 }
608
609 impl ControlWrapper {
610 fn new() -> Self {
611 let (sender, receiver) = crossbeam_channel::unbounded();
612 let (broadcast_sender, _broadcast_receiver) = broadcast::channel(42);
613
614 let control = SubscriptionControl::new(
615 PubSubConfig::default().max_active_subscriptions,
616 sender,
617 broadcast_sender,
618 );
619 Self { control, receiver }
620 }
621
622 fn assert_subscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
623 if let NotificationEntry::Subscribed(params, id) = self.receiver.recv().unwrap().entry {
624 assert_eq!(¶ms, expected_params);
625 assert_eq!(id, SubscriptionId::from(expected_id));
626 } else {
627 panic!("unexpected notification");
628 }
629 self.assert_silence();
630 }
631
632 fn assert_unsubscribed(&self, expected_params: &SubscriptionParams, expected_id: u64) {
633 if let NotificationEntry::Unsubscribed(params, id) = self.receiver.recv().unwrap().entry
634 {
635 assert_eq!(¶ms, expected_params);
636 assert_eq!(id, SubscriptionId::from(expected_id));
637 } else {
638 panic!("unexpected notification");
639 }
640 self.assert_silence();
641 }
642
643 fn assert_silence(&self) {
644 assert!(self.receiver.try_recv().is_err());
645 }
646 }
647
648 #[test]
649 fn notify_subscribe() {
650 let control = ControlWrapper::new();
651 let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
652 control.assert_subscribed(&SubscriptionParams::Slot, 0);
653 drop(token1);
654 control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
655 }
656
657 #[test]
658 fn notify_subscribe_multiple() {
659 let control = ControlWrapper::new();
660 let token1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
661 control.assert_subscribed(&SubscriptionParams::Slot, 0);
662 let token2 = token1.clone();
663 drop(token1);
664 let token3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
665 drop(token3);
666 control.assert_silence();
667 drop(token2);
668 control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
669 }
670
671 #[test]
672 fn notify_subscribe_two_subscriptions() {
673 let control = ControlWrapper::new();
674 let token_slot1 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
675 control.assert_subscribed(&SubscriptionParams::Slot, 0);
676
677 let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
678 signature: Signature::default(),
679 commitment: CommitmentConfig::processed(),
680 enable_received_notification: false,
681 });
682 let token_signature1 = control.control.subscribe(signature_params.clone()).unwrap();
683 control.assert_subscribed(&signature_params, 1);
684
685 let token_slot2 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
686 let token_signature2 = control.control.subscribe(signature_params.clone()).unwrap();
687 drop(token_slot1);
688 control.assert_silence();
689 drop(token_slot2);
690 control.assert_unsubscribed(&SubscriptionParams::Slot, 0);
691 drop(token_signature2);
692 control.assert_silence();
693 drop(token_signature1);
694 control.assert_unsubscribed(&signature_params, 1);
695
696 let token_slot3 = control.control.subscribe(SubscriptionParams::Slot).unwrap();
697 control.assert_subscribed(&SubscriptionParams::Slot, 2);
698 drop(token_slot3);
699 control.assert_unsubscribed(&SubscriptionParams::Slot, 2);
700 }
701
702 #[test]
703 fn subscription_info() {
704 let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
705 let bank = Bank::new_for_tests(&genesis_config);
706 let bank_forks = BankForks::new_rw_arc(bank);
707 let mut tracker = SubscriptionsTracker::new(bank_forks);
708
709 tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
710 let info = tracker
711 .node_progress_watchers
712 .get(&SubscriptionParams::Slot)
713 .unwrap();
714 assert_eq!(info.commitment, None);
715 assert_eq!(info.params, SubscriptionParams::Slot);
716 assert_eq!(info.method, SubscriptionParams::Slot.method());
717 assert_eq!(info.id, SubscriptionId::from(0));
718 assert_eq!(*info.last_notified_slot.read().unwrap(), 0);
719
720 let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
721 pubkey: spl_generic_token::token::id(),
722 commitment: CommitmentConfig::finalized(),
723 encoding: UiAccountEncoding::Base64Zstd,
724 data_slice: None,
725 });
726 tracker.subscribe(account_params.clone(), 1.into(), || 42);
727
728 let info = tracker
729 .commitment_watchers
730 .get(&SubscriptionId::from(1))
731 .unwrap();
732 assert_eq!(info.commitment, Some(CommitmentConfig::finalized()));
733 assert_eq!(info.params, account_params);
734 assert_eq!(info.method, account_params.method());
735 assert_eq!(info.id, SubscriptionId::from(1));
736 assert_eq!(*info.last_notified_slot.read().unwrap(), 42);
737 }
738
739 #[test]
740 fn subscription_indexes() {
741 fn counts(tracker: &SubscriptionsTracker) -> (usize, usize, usize, usize) {
742 (
743 tracker.by_signature.len(),
744 tracker.commitment_watchers.len(),
745 tracker.gossip_watchers.len(),
746 tracker.node_progress_watchers.len(),
747 )
748 }
749
750 let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
751 let bank = Bank::new_for_tests(&genesis_config);
752 let bank_forks = BankForks::new_rw_arc(bank);
753 let mut tracker = SubscriptionsTracker::new(bank_forks);
754
755 tracker.subscribe(SubscriptionParams::Slot, 0.into(), || 0);
756 assert_eq!(counts(&tracker), (0, 0, 0, 1));
757 tracker.unsubscribe(SubscriptionParams::Slot, 0.into());
758 assert_eq!(counts(&tracker), (0, 0, 0, 0));
759
760 let account_params = SubscriptionParams::Account(AccountSubscriptionParams {
761 pubkey: spl_generic_token::token::id(),
762 commitment: CommitmentConfig::finalized(),
763 encoding: UiAccountEncoding::Base64Zstd,
764 data_slice: None,
765 });
766 tracker.subscribe(account_params.clone(), 1.into(), || 0);
767 assert_eq!(counts(&tracker), (0, 1, 0, 0));
768 tracker.unsubscribe(account_params, 1.into());
769 assert_eq!(counts(&tracker), (0, 0, 0, 0));
770
771 let account_params2 = SubscriptionParams::Account(AccountSubscriptionParams {
772 pubkey: spl_generic_token::token::id(),
773 commitment: CommitmentConfig::confirmed(),
774 encoding: UiAccountEncoding::Base64Zstd,
775 data_slice: None,
776 });
777 tracker.subscribe(account_params2.clone(), 2.into(), || 0);
778 assert_eq!(counts(&tracker), (0, 0, 1, 0));
779 tracker.unsubscribe(account_params2, 2.into());
780 assert_eq!(counts(&tracker), (0, 0, 0, 0));
781
782 let signature_params = SubscriptionParams::Signature(SignatureSubscriptionParams {
783 signature: Signature::default(),
784 commitment: CommitmentConfig::processed(),
785 enable_received_notification: false,
786 });
787 tracker.subscribe(signature_params.clone(), 3.into(), || 0);
788 assert_eq!(counts(&tracker), (1, 1, 0, 0));
789 tracker.unsubscribe(signature_params, 3.into());
790 assert_eq!(counts(&tracker), (0, 0, 0, 0));
791 }
792}