1use {
12 crate::rpc_subscriptions::RpcSubscriptions,
13 crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
14 solana_clock::Slot,
15 solana_rpc_client_api::response::{SlotTransactionStats, SlotUpdate},
16 solana_runtime::{
17 bank::Bank, bank_forks::BankForks, dependency_tracker::DependencyTracker,
18 prioritization_fee_cache::PrioritizationFeeCache,
19 },
20 solana_time_utils::timestamp,
21 std::{
22 collections::HashSet,
23 sync::{
24 atomic::{AtomicBool, Ordering},
25 Arc, RwLock,
26 },
27 thread::{self, Builder, JoinHandle},
28 time::Duration,
29 },
30};
31
32pub struct OptimisticallyConfirmedBank {
33 pub bank: Arc<Bank>,
34}
35
36impl OptimisticallyConfirmedBank {
37 pub fn locked_from_bank_forks_root(bank_forks: &Arc<RwLock<BankForks>>) -> Arc<RwLock<Self>> {
38 Arc::new(RwLock::new(Self {
39 bank: bank_forks.read().unwrap().root_bank(),
40 }))
41 }
42}
43
44#[derive(Clone)]
45pub enum BankNotification {
46 OptimisticallyConfirmed(Slot),
47 Frozen(Arc<Bank>),
48 NewRootBank(Arc<Bank>),
49 NewRootedChain(Vec<Slot>),
51}
52
53#[derive(Clone, Debug)]
54pub enum SlotNotification {
55 OptimisticallyConfirmed(Slot),
56 Frozen((Slot, Slot)),
58 Root((Slot, Slot)),
60}
61
62impl std::fmt::Debug for BankNotification {
63 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64 match self {
65 BankNotification::OptimisticallyConfirmed(slot) => {
66 write!(f, "OptimisticallyConfirmed({slot:?})")
67 }
68 BankNotification::Frozen(bank) => write!(f, "Frozen({})", bank.slot()),
69 BankNotification::NewRootBank(bank) => write!(f, "Root({})", bank.slot()),
70 BankNotification::NewRootedChain(chain) => write!(f, "RootedChain({chain:?})"),
71 }
72 }
73}
74
75pub type BankNotificationWithEventSequence = (
76 BankNotification,
77 Option<u64>, );
79
80pub type BankNotificationReceiver = Receiver<BankNotificationWithEventSequence>;
81pub type BankNotificationSender = Sender<BankNotificationWithEventSequence>;
82
83#[derive(Clone)]
84pub struct BankNotificationSenderConfig {
85 pub sender: BankNotificationSender,
86 pub should_send_parents: bool,
87 pub dependency_tracker: Option<Arc<DependencyTracker>>,
88}
89
90pub type SlotNotificationReceiver = Receiver<SlotNotification>;
91pub type SlotNotificationSender = Sender<SlotNotification>;
92
/// Owns the background thread that consumes bank notifications and keeps the
/// optimistically confirmed bank and its subscribers up to date.
pub struct OptimisticallyConfirmedBankTracker {
    /// Handle of the `solOpConfBnkTrk` worker thread.
    thread_hdl: JoinHandle<()>,
}
96
97impl OptimisticallyConfirmedBankTracker {
98 pub fn new(
99 receiver: BankNotificationReceiver,
100 exit: Arc<AtomicBool>,
101 bank_forks: Arc<RwLock<BankForks>>,
102 optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
103 subscriptions: Arc<RpcSubscriptions>,
104 slot_notification_subscribers: Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
105 prioritization_fee_cache: Arc<PrioritizationFeeCache>,
106 dependency_tracker: Option<Arc<DependencyTracker>>,
107 ) -> Self {
108 let mut pending_optimistically_confirmed_banks = HashSet::new();
109 let mut last_notified_confirmed_slot: Slot = 0;
110 let mut highest_confirmed_slot: Slot = 0;
111 let mut newest_root_slot: Slot = 0;
112 let thread_hdl = Builder::new()
113 .name("solOpConfBnkTrk".to_string())
114 .spawn(move || loop {
115 if exit.load(Ordering::Relaxed) {
116 break;
117 }
118
119 if let Err(RecvTimeoutError::Disconnected) = Self::recv_notification(
120 &receiver,
121 &bank_forks,
122 &optimistically_confirmed_bank,
123 &subscriptions,
124 &mut pending_optimistically_confirmed_banks,
125 &mut last_notified_confirmed_slot,
126 &mut highest_confirmed_slot,
127 &mut newest_root_slot,
128 &slot_notification_subscribers,
129 &prioritization_fee_cache,
130 &dependency_tracker,
131 ) {
132 break;
133 }
134 })
135 .unwrap();
136 Self { thread_hdl }
137 }
138
139 #[allow(clippy::too_many_arguments)]
140 fn recv_notification(
141 receiver: &Receiver<BankNotificationWithEventSequence>,
142 bank_forks: &RwLock<BankForks>,
143 optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
144 subscriptions: &RpcSubscriptions,
145 pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
146 last_notified_confirmed_slot: &mut Slot,
147 highest_confirmed_slot: &mut Slot,
148 newest_root_slot: &mut Slot,
149 slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
150 prioritization_fee_cache: &PrioritizationFeeCache,
151 dependency_tracker: &Option<Arc<DependencyTracker>>,
152 ) -> Result<(), RecvTimeoutError> {
153 let notification = receiver.recv_timeout(Duration::from_secs(1))?;
154 Self::process_notification(
155 notification,
156 bank_forks,
157 optimistically_confirmed_bank,
158 subscriptions,
159 pending_optimistically_confirmed_banks,
160 last_notified_confirmed_slot,
161 highest_confirmed_slot,
162 newest_root_slot,
163 slot_notification_subscribers,
164 prioritization_fee_cache,
165 dependency_tracker,
166 );
167 Ok(())
168 }
169
170 fn notify_slot_status(
171 slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
172 notification: SlotNotification,
173 ) {
174 if let Some(slot_notification_subscribers) = slot_notification_subscribers {
175 for sender in slot_notification_subscribers.read().unwrap().iter() {
176 match sender.send(notification.clone()) {
177 Ok(_) => {}
178 Err(err) => {
179 info!("Failed to send notification {notification:?}, error: {err:?}");
180 }
181 }
182 }
183 }
184 }
185
186 fn notify_or_defer(
187 subscriptions: &RpcSubscriptions,
188 bank_forks: &RwLock<BankForks>,
189 bank: &Bank,
190 last_notified_confirmed_slot: &mut Slot,
191 pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
192 slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
193 prioritization_fee_cache: &PrioritizationFeeCache,
194 ) {
195 if bank.is_frozen() {
196 if bank.slot() > *last_notified_confirmed_slot {
197 debug!(
198 "notify_or_defer notifying via notify_gossip_subscribers for slot {:?}",
199 bank.slot()
200 );
201 subscriptions.notify_gossip_subscribers(bank.slot());
202 *last_notified_confirmed_slot = bank.slot();
203 Self::notify_slot_status(
204 slot_notification_subscribers,
205 SlotNotification::OptimisticallyConfirmed(bank.slot()),
206 );
207
208 prioritization_fee_cache.finalize_priority_fee(bank.slot(), bank.bank_id());
210 }
211 } else if bank.slot() > bank_forks.read().unwrap().root() {
212 pending_optimistically_confirmed_banks.insert(bank.slot());
213 debug!("notify_or_defer defer notifying for slot {:?}", bank.slot());
214 }
215 }
216
217 fn notify_or_defer_confirmed_banks(
218 subscriptions: &RpcSubscriptions,
219 bank_forks: &RwLock<BankForks>,
220 bank: Arc<Bank>,
221 slot_threshold: Slot,
222 last_notified_confirmed_slot: &mut Slot,
223 pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
224 slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
225 prioritization_fee_cache: &PrioritizationFeeCache,
226 ) {
227 for confirmed_bank in bank.parents_inclusive().iter().rev() {
228 if confirmed_bank.slot() > slot_threshold {
229 debug!(
230 "Calling notify_or_defer for confirmed_bank {:?}",
231 confirmed_bank.slot()
232 );
233 Self::notify_or_defer(
234 subscriptions,
235 bank_forks,
236 confirmed_bank,
237 last_notified_confirmed_slot,
238 pending_optimistically_confirmed_banks,
239 slot_notification_subscribers,
240 prioritization_fee_cache,
241 );
242 }
243 }
244 }
245
246 fn notify_new_root_slots(
247 roots: &mut [Slot],
248 newest_root_slot: &mut Slot,
249 slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
250 ) {
251 if slot_notification_subscribers.is_none() {
252 return;
253 }
254 roots.sort_unstable();
255 assert!(roots.len() >= 2);
257 for i in 1..roots.len() {
258 let root = roots[i];
259 if root > *newest_root_slot {
260 let parent = roots[i - 1];
261 debug!("Doing SlotNotification::Root for root {root}, parent: {parent}");
262 Self::notify_slot_status(
263 slot_notification_subscribers,
264 SlotNotification::Root((root, parent)),
265 );
266 *newest_root_slot = root;
267 }
268 }
269 }
270
271 #[allow(clippy::too_many_arguments)]
272 pub fn process_notification(
273 (notification, dependency_work): BankNotificationWithEventSequence,
274 bank_forks: &RwLock<BankForks>,
275 optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
276 subscriptions: &RpcSubscriptions,
277 pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
278 last_notified_confirmed_slot: &mut Slot,
279 highest_confirmed_slot: &mut Slot,
280 newest_root_slot: &mut Slot,
281 slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
282 prioritization_fee_cache: &PrioritizationFeeCache,
283 dependency_tracker: &Option<Arc<DependencyTracker>>,
284 ) {
285 debug!("received bank notification: {notification:?} event: {dependency_work:?}");
286
287 if let Some(tracker) = dependency_tracker.as_ref() {
288 if let Some(dependency_work) = dependency_work {
289 tracker.wait_for_dependency(dependency_work);
290 }
291 }
292 match notification {
293 BankNotification::OptimisticallyConfirmed(slot) => {
294 let bank = bank_forks.read().unwrap().get(slot);
295 if let Some(bank) = bank {
296 let mut w_optimistically_confirmed_bank =
297 optimistically_confirmed_bank.write().unwrap();
298
299 if bank.slot() > w_optimistically_confirmed_bank.bank.slot() && bank.is_frozen()
300 {
301 w_optimistically_confirmed_bank.bank = bank.clone();
302 }
303
304 if slot > *highest_confirmed_slot {
305 Self::notify_or_defer_confirmed_banks(
306 subscriptions,
307 bank_forks,
308 bank,
309 *highest_confirmed_slot,
310 last_notified_confirmed_slot,
311 pending_optimistically_confirmed_banks,
312 slot_notification_subscribers,
313 prioritization_fee_cache,
314 );
315
316 *highest_confirmed_slot = slot;
317 }
318 drop(w_optimistically_confirmed_bank);
319 } else if slot > bank_forks.read().unwrap().root() {
320 pending_optimistically_confirmed_banks.insert(slot);
321 } else {
322 inc_new_counter_info!("dropped-already-rooted-optimistic-bank-notification", 1);
323 }
324
325 subscriptions.notify_slot_update(SlotUpdate::OptimisticConfirmation {
327 slot,
328 timestamp: timestamp(),
329 });
330 }
334 BankNotification::Frozen(bank) => {
335 let frozen_slot = bank.slot();
336 if let Some(parent) = bank.parent() {
337 let num_successful_transactions = bank
338 .transaction_count()
339 .saturating_sub(parent.transaction_count());
340 subscriptions.notify_slot_update(SlotUpdate::Frozen {
341 slot: frozen_slot,
342 timestamp: timestamp(),
343 stats: SlotTransactionStats {
344 num_transaction_entries: bank.transaction_entries_count(),
345 num_successful_transactions,
346 num_failed_transactions: bank.transaction_error_count(),
347 max_transactions_per_entry: bank.transactions_per_entry_max(),
348 },
349 });
350
351 Self::notify_slot_status(
352 slot_notification_subscribers,
353 SlotNotification::Frozen((bank.slot(), bank.parent_slot())),
354 );
355 }
356
357 if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
358 debug!(
359 "Calling notify_gossip_subscribers to send deferred notification \
360 {frozen_slot:?}"
361 );
362
363 Self::notify_or_defer_confirmed_banks(
364 subscriptions,
365 bank_forks,
366 bank.clone(),
367 *last_notified_confirmed_slot,
368 last_notified_confirmed_slot,
369 pending_optimistically_confirmed_banks,
370 slot_notification_subscribers,
371 prioritization_fee_cache,
372 );
373
374 let mut w_optimistically_confirmed_bank =
375 optimistically_confirmed_bank.write().unwrap();
376 if frozen_slot > w_optimistically_confirmed_bank.bank.slot() {
377 w_optimistically_confirmed_bank.bank = bank;
378 }
379 drop(w_optimistically_confirmed_bank);
380 }
381 }
382 BankNotification::NewRootBank(bank) => {
383 let root_slot = bank.slot();
384 let mut w_optimistically_confirmed_bank =
385 optimistically_confirmed_bank.write().unwrap();
386 if root_slot > w_optimistically_confirmed_bank.bank.slot() {
387 w_optimistically_confirmed_bank.bank = bank;
388 }
389 drop(w_optimistically_confirmed_bank);
390
391 pending_optimistically_confirmed_banks.retain(|&s| s > root_slot);
392 }
393 BankNotification::NewRootedChain(mut roots) => {
394 Self::notify_new_root_slots(
395 &mut roots,
396 newest_root_slot,
397 slot_notification_subscribers,
398 );
399 }
400 }
401 }
402
403 pub fn close(self) -> thread::Result<()> {
404 self.join()
405 }
406
407 pub fn join(self) -> thread::Result<()> {
408 self.thread_hdl.join()
409 }
410}
411
#[cfg(test)]
mod tests {
    use {
        super::*,
        crossbeam_channel::unbounded,
        solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
        solana_pubkey::Pubkey,
        solana_runtime::{commitment::BlockCommitmentCache, dependency_tracker},
        std::sync::atomic::AtomicU64,
    };

    /// Drains `receiver`, collecting notifications until none arrives within
    /// 100 ms.
    fn get_root_notifications(receiver: &Receiver<SlotNotification>) -> Vec<SlotNotification> {
        std::iter::from_fn(|| receiver.recv_timeout(Duration::from_millis(100)).ok()).collect()
    }

    /// Creates a bank for `slot` on top of the bank at `parent_slot` and
    /// inserts it into `bank_forks`.
    fn insert_child_bank(bank_forks: &RwLock<BankForks>, parent_slot: Slot, slot: Slot) {
        // Fetch the parent first so the read guard is gone before the write
        // lock is taken.
        let parent = bank_forks.read().unwrap().get(parent_slot).unwrap();
        let child = Bank::new_from_parent(parent, &Pubkey::default(), slot);
        bank_forks.write().unwrap().insert(child);
    }

    #[test]
    fn test_process_notification() {
        let exit = Arc::new(AtomicBool::new(false));
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
        let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));

        // Build the chain 0 <- 1 <- 2 <- 3.
        insert_child_bank(&bank_forks, 0, 1);
        insert_child_bank(&bank_forks, 1, 2);
        insert_child_bank(&bank_forks, 2, 3);

        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
            exit,
            max_complete_transaction_status_slot,
            bank_forks.clone(),
            block_commitment_cache,
            optimistically_confirmed_bank.clone(),
        ));
        let mut pending_optimistically_confirmed_banks: HashSet<u64> = HashSet::new();

        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);

        let mut highest_confirmed_slot: Slot = 0;
        let mut newest_root_slot: Slot = 0;
        let mut last_notified_confirmed_slot: Slot = 0;

        // Slot of the bank currently held as optimistically confirmed.
        let confirmed_slot = || optimistically_confirmed_bank.read().unwrap().bank.slot();

        // Feeds one notification (without an event sequence or dependency
        // tracker) through `process_notification` using this test's state.
        macro_rules! notify {
            ($notification:expr, $subscribers:expr) => {
                OptimisticallyConfirmedBankTracker::process_notification(
                    ($notification, None),
                    &bank_forks,
                    &optimistically_confirmed_bank,
                    &subscriptions,
                    &mut pending_optimistically_confirmed_banks,
                    &mut last_notified_confirmed_slot,
                    &mut highest_confirmed_slot,
                    &mut newest_root_slot,
                    $subscribers,
                    &PrioritizationFeeCache::default(),
                    &None,
                )
            };
        }

        // Confirming slot 2 advances both the tracked bank and the highest
        // confirmed slot.
        notify!(BankNotification::OptimisticallyConfirmed(2), &None);
        assert_eq!(confirmed_slot(), 2);
        assert_eq!(highest_confirmed_slot, 2);

        // Confirming an older slot must not move anything backwards.
        notify!(BankNotification::OptimisticallyConfirmed(1), &None);
        assert_eq!(confirmed_slot(), 2);
        assert_eq!(highest_confirmed_slot, 2);

        // Slot 3 is confirmed but the tracked bank stays at 2; the slot is
        // parked as pending.
        notify!(BankNotification::OptimisticallyConfirmed(3), &None);
        assert_eq!(confirmed_slot(), 2);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
        assert!(pending_optimistically_confirmed_banks.contains(&3));
        assert_eq!(highest_confirmed_slot, 3);

        // Freezing bank 3 resolves the pending entry.
        let bank3 = bank_forks.read().unwrap().get(3).unwrap();
        bank3.freeze();

        notify!(BankNotification::Frozen(bank3), &None);
        assert_eq!(confirmed_slot(), 3);
        assert_eq!(highest_confirmed_slot, 3);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);

        // Slot 4 is confirmed before it is frozen, so it becomes pending.
        insert_child_bank(&bank_forks, 3, 4);
        notify!(BankNotification::OptimisticallyConfirmed(4), &None);
        assert_eq!(confirmed_slot(), 3);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
        assert!(pending_optimistically_confirmed_banks.contains(&4));
        assert_eq!(highest_confirmed_slot, 4);

        insert_child_bank(&bank_forks, 4, 5);
        let bank5 = bank_forks.read().unwrap().get(5).unwrap();

        let mut bank_notification_senders = Vec::new();
        let (sender, receiver) = unbounded();
        bank_notification_senders.push(sender);

        let subscribers = Some(Arc::new(RwLock::new(bank_notification_senders)));
        let parent_roots = bank5.ancestors.keys();

        // A new root at slot 5 replaces the tracked bank and clears pending
        // slots, but does not touch the highest confirmed / newest root slot.
        notify!(BankNotification::NewRootBank(bank5), &subscribers);
        assert_eq!(confirmed_slot(), 5);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
        assert!(!pending_optimistically_confirmed_banks.contains(&4));
        assert_eq!(highest_confirmed_slot, 4);
        assert_eq!(newest_root_slot, 0);

        // The rooted chain produces five root notifications and advances the
        // newest root slot to 5.
        notify!(BankNotification::NewRootedChain(parent_roots), &subscribers);

        assert_eq!(newest_root_slot, 5);

        let notifications = get_root_notifications(&receiver);
        assert_eq!(notifications.len(), 5);

        // Fork at slot 5 into 6 and 7, then root slot 7.
        insert_child_bank(&bank_forks, 5, 6);
        insert_child_bank(&bank_forks, 5, 7);
        bank_forks.write().unwrap().set_root(7, None, None).unwrap();

        // A confirmation for slot 6 after rooting slot 7 changes nothing.
        notify!(BankNotification::OptimisticallyConfirmed(6), &None);
        assert_eq!(confirmed_slot(), 5);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
        assert!(!pending_optimistically_confirmed_banks.contains(&6));
        assert_eq!(highest_confirmed_slot, 4);
        assert_eq!(newest_root_slot, 5);

        let bank7 = bank_forks.read().unwrap().get(7).unwrap();
        let parent_roots = bank7.ancestors.keys();

        notify!(BankNotification::NewRootBank(bank7), &subscribers);
        assert_eq!(confirmed_slot(), 7);
        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
        assert!(!pending_optimistically_confirmed_banks.contains(&6));
        assert_eq!(highest_confirmed_slot, 4);
        assert_eq!(newest_root_slot, 5);

        // Only one new root notification is expected for the second chain.
        notify!(BankNotification::NewRootedChain(parent_roots), &subscribers);

        assert_eq!(newest_root_slot, 7);

        let notifications = get_root_notifications(&receiver);
        assert_eq!(notifications.len(), 1);
    }

    #[test]
    fn test_event_synchronization() {
        let exit = Arc::new(AtomicBool::new(false));
        let dependency_tracker: Arc<DependencyTracker> =
            Arc::new(dependency_tracker::DependencyTracker::default());
        let work_sequence_1 = 345;
        let work_sequence_2 = 678;
        let tracker_clone = dependency_tracker.clone();

        // The worker processes notifications tagged with event sequences; the
        // main thread marks those sequences as processed below.
        let handle = thread::spawn(move || {
            let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
            let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&genesis_config));
            insert_child_bank(&bank_forks, 0, 1);

            let mut pending_optimistically_confirmed_banks: HashSet<u64> = HashSet::new();
            let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
            let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));

            let mut highest_confirmed_slot: Slot = 0;
            let mut newest_root_slot: Slot = 0;
            let mut last_notified_confirmed_slot: Slot = 0;

            let optimistically_confirmed_bank =
                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

            let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
                exit,
                max_complete_transaction_status_slot,
                bank_forks.clone(),
                block_commitment_cache,
                optimistically_confirmed_bank.clone(),
            ));

            // Feeds one sequenced notification through `process_notification`
            // with the given dependency tracker.
            macro_rules! notify {
                ($notification:expr, $sequence:expr, $tracker:expr) => {
                    OptimisticallyConfirmedBankTracker::process_notification(
                        ($notification, Some($sequence)),
                        &bank_forks,
                        &optimistically_confirmed_bank,
                        &subscriptions,
                        &mut pending_optimistically_confirmed_banks,
                        &mut last_notified_confirmed_slot,
                        &mut highest_confirmed_slot,
                        &mut newest_root_slot,
                        &None,
                        &PrioritizationFeeCache::default(),
                        &Some($tracker),
                    )
                };
            }

            // Slot 1 is confirmed while still unfrozen: it is parked as
            // pending and the tracked bank stays at slot 0.
            notify!(
                BankNotification::OptimisticallyConfirmed(1),
                work_sequence_1,
                tracker_clone.clone()
            );

            assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);
            assert_eq!(highest_confirmed_slot, 1);
            assert_eq!(pending_optimistically_confirmed_banks.len(), 1);

            let bank1 = bank_forks.read().unwrap().get(1).unwrap();
            bank1.freeze();

            // Freezing slot 1 delivers the deferred confirmation.
            notify!(
                BankNotification::Frozen(bank1),
                work_sequence_2,
                tracker_clone
            );

            assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 1);
            assert_eq!(highest_confirmed_slot, 1);
            assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
        });

        dependency_tracker.mark_this_and_all_previous_work_processed(work_sequence_1);
        dependency_tracker.mark_this_and_all_previous_work_processed(work_sequence_2);

        handle.join().unwrap();
    }
}