solana_rpc/
optimistically_confirmed_bank_tracker.rs

//! The `optimistically_confirmed_bank_tracker` module implements a threaded service to track the
//! most recent optimistically confirmed bank for use in rpc services, and triggers gossip
//! subscription notifications.
//! This module also supports notifying subscribers of slot status through the
//! `SlotNotificationSender`. It receives `BankNotification` events, transforms them into
//! `SlotNotification`s and sends them via `SlotNotificationSender` in the following way:
//! BankNotification::OptimisticallyConfirmed --> SlotNotification::OptimisticallyConfirmed
//! BankNotification::Frozen --> SlotNotification::Frozen
//! BankNotification::NewRootedChain --> SlotNotification::Root for the roots in the chain.
10
11use {
12    crate::rpc_subscriptions::RpcSubscriptions,
13    crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
14    solana_clock::Slot,
15    solana_rpc_client_api::response::{SlotTransactionStats, SlotUpdate},
16    solana_runtime::{
17        bank::Bank, bank_forks::BankForks, dependency_tracker::DependencyTracker,
18        prioritization_fee_cache::PrioritizationFeeCache,
19    },
20    solana_time_utils::timestamp,
21    std::{
22        collections::HashSet,
23        sync::{
24            atomic::{AtomicBool, Ordering},
25            Arc, RwLock,
26        },
27        thread::{self, Builder, JoinHandle},
28        time::Duration,
29    },
30};
31
/// Holder for the bank currently treated as the most recent optimistically confirmed one.
pub struct OptimisticallyConfirmedBank {
    /// The tracked bank. It is seeded from the `BankForks` root by
    /// `locked_from_bank_forks_root` and replaced by the tracker as newer banks qualify.
    pub bank: Arc<Bank>,
}
35
36impl OptimisticallyConfirmedBank {
37    pub fn locked_from_bank_forks_root(bank_forks: &Arc<RwLock<BankForks>>) -> Arc<RwLock<Self>> {
38        Arc::new(RwLock::new(Self {
39            bank: bank_forks.read().unwrap().root_bank(),
40        }))
41    }
42}
43
/// Bank events consumed by `OptimisticallyConfirmedBankTracker::process_notification`.
#[derive(Clone)]
pub enum BankNotification {
    /// Carries the slot reported as optimistically confirmed.
    OptimisticallyConfirmed(Slot),
    /// Carries the bank reported as frozen.
    Frozen(Arc<Bank>),
    /// Carries the bank reported as the new root.
    NewRootBank(Arc<Bank>),
    /// The newly rooted slot chain including the parent slot of the oldest bank in the rooted chain.
    NewRootedChain(Vec<Slot>),
}
52
/// Slot status events sent to the registered `SlotNotificationSender` subscribers.
#[derive(Clone, Debug)]
pub enum SlotNotification {
    /// The slot whose optimistic confirmation is being announced.
    OptimisticallyConfirmed(Slot),
    /// The (Slot, Parent Slot) pair for the slot frozen
    Frozen((Slot, Slot)),
    /// The (Slot, Parent Slot) pair for the root slot
    Root((Slot, Slot)),
}
61
62impl std::fmt::Debug for BankNotification {
63    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
64        match self {
65            BankNotification::OptimisticallyConfirmed(slot) => {
66                write!(f, "OptimisticallyConfirmed({slot:?})")
67            }
68            BankNotification::Frozen(bank) => write!(f, "Frozen({})", bank.slot()),
69            BankNotification::NewRootBank(bank) => write!(f, "Root({})", bank.slot()),
70            BankNotification::NewRootedChain(chain) => write!(f, "RootedChain({chain:?})"),
71        }
72    }
73}
74
/// A bank notification paired with an optional dependency work sequence number.
///
/// When the sequence number is present (and a dependency tracker is configured),
/// `process_notification` passes it to the tracker's `wait_for_dependency` before handling
/// the notification.
pub type BankNotificationWithEventSequence = (
    BankNotification,
    Option<u64>, // dependency work sequence number
);
79
/// Receiving end of the channel carrying `BankNotificationWithEventSequence` items.
pub type BankNotificationReceiver = Receiver<BankNotificationWithEventSequence>;
/// Sending end of the channel carrying `BankNotificationWithEventSequence` items.
pub type BankNotificationSender = Sender<BankNotificationWithEventSequence>;
82
/// Bundles a `BankNotificationSender` with settings for the side that produces notifications.
#[derive(Clone)]
pub struct BankNotificationSenderConfig {
    /// Channel on which bank notifications are sent.
    pub sender: BankNotificationSender,
    /// NOTE(review): presumably tells the producer whether to also send notifications for
    /// parent banks; the field is not read in the code visible here -- confirm at the call sites.
    pub should_send_parents: bool,
    /// Optional dependency tracker handed to the producer.
    /// NOTE(review): not read in the code visible here -- confirm how producers use it.
    pub dependency_tracker: Option<Arc<DependencyTracker>>,
}
89
/// Receiving end of a channel carrying `SlotNotification` items.
pub type SlotNotificationReceiver = Receiver<SlotNotification>;
/// Sending end of a channel carrying `SlotNotification` items.
pub type SlotNotificationSender = Sender<SlotNotification>;
92
/// Handle to the background thread that receives and processes bank notifications.
pub struct OptimisticallyConfirmedBankTracker {
    /// Join handle of the `solOpConfBnkTrk` thread spawned by `new`.
    thread_hdl: JoinHandle<()>,
}
96
97impl OptimisticallyConfirmedBankTracker {
98    pub fn new(
99        receiver: BankNotificationReceiver,
100        exit: Arc<AtomicBool>,
101        bank_forks: Arc<RwLock<BankForks>>,
102        optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
103        subscriptions: Arc<RpcSubscriptions>,
104        slot_notification_subscribers: Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
105        prioritization_fee_cache: Arc<PrioritizationFeeCache>,
106        dependency_tracker: Option<Arc<DependencyTracker>>,
107    ) -> Self {
108        let mut pending_optimistically_confirmed_banks = HashSet::new();
109        let mut last_notified_confirmed_slot: Slot = 0;
110        let mut highest_confirmed_slot: Slot = 0;
111        let mut newest_root_slot: Slot = 0;
112        let thread_hdl = Builder::new()
113            .name("solOpConfBnkTrk".to_string())
114            .spawn(move || loop {
115                if exit.load(Ordering::Relaxed) {
116                    break;
117                }
118
119                if let Err(RecvTimeoutError::Disconnected) = Self::recv_notification(
120                    &receiver,
121                    &bank_forks,
122                    &optimistically_confirmed_bank,
123                    &subscriptions,
124                    &mut pending_optimistically_confirmed_banks,
125                    &mut last_notified_confirmed_slot,
126                    &mut highest_confirmed_slot,
127                    &mut newest_root_slot,
128                    &slot_notification_subscribers,
129                    &prioritization_fee_cache,
130                    &dependency_tracker,
131                ) {
132                    break;
133                }
134            })
135            .unwrap();
136        Self { thread_hdl }
137    }
138
139    #[allow(clippy::too_many_arguments)]
140    fn recv_notification(
141        receiver: &Receiver<BankNotificationWithEventSequence>,
142        bank_forks: &RwLock<BankForks>,
143        optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
144        subscriptions: &RpcSubscriptions,
145        pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
146        last_notified_confirmed_slot: &mut Slot,
147        highest_confirmed_slot: &mut Slot,
148        newest_root_slot: &mut Slot,
149        slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
150        prioritization_fee_cache: &PrioritizationFeeCache,
151        dependency_tracker: &Option<Arc<DependencyTracker>>,
152    ) -> Result<(), RecvTimeoutError> {
153        let notification = receiver.recv_timeout(Duration::from_secs(1))?;
154        Self::process_notification(
155            notification,
156            bank_forks,
157            optimistically_confirmed_bank,
158            subscriptions,
159            pending_optimistically_confirmed_banks,
160            last_notified_confirmed_slot,
161            highest_confirmed_slot,
162            newest_root_slot,
163            slot_notification_subscribers,
164            prioritization_fee_cache,
165            dependency_tracker,
166        );
167        Ok(())
168    }
169
170    fn notify_slot_status(
171        slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
172        notification: SlotNotification,
173    ) {
174        if let Some(slot_notification_subscribers) = slot_notification_subscribers {
175            for sender in slot_notification_subscribers.read().unwrap().iter() {
176                match sender.send(notification.clone()) {
177                    Ok(_) => {}
178                    Err(err) => {
179                        info!("Failed to send notification {notification:?}, error: {err:?}");
180                    }
181                }
182            }
183        }
184    }
185
186    fn notify_or_defer(
187        subscriptions: &RpcSubscriptions,
188        bank_forks: &RwLock<BankForks>,
189        bank: &Bank,
190        last_notified_confirmed_slot: &mut Slot,
191        pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
192        slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
193        prioritization_fee_cache: &PrioritizationFeeCache,
194    ) {
195        if bank.is_frozen() {
196            if bank.slot() > *last_notified_confirmed_slot {
197                debug!(
198                    "notify_or_defer notifying via notify_gossip_subscribers for slot {:?}",
199                    bank.slot()
200                );
201                subscriptions.notify_gossip_subscribers(bank.slot());
202                *last_notified_confirmed_slot = bank.slot();
203                Self::notify_slot_status(
204                    slot_notification_subscribers,
205                    SlotNotification::OptimisticallyConfirmed(bank.slot()),
206                );
207
208                // finalize block's minimum prioritization fee cache for this bank
209                prioritization_fee_cache.finalize_priority_fee(bank.slot(), bank.bank_id());
210            }
211        } else if bank.slot() > bank_forks.read().unwrap().root() {
212            pending_optimistically_confirmed_banks.insert(bank.slot());
213            debug!("notify_or_defer defer notifying for slot {:?}", bank.slot());
214        }
215    }
216
217    fn notify_or_defer_confirmed_banks(
218        subscriptions: &RpcSubscriptions,
219        bank_forks: &RwLock<BankForks>,
220        bank: Arc<Bank>,
221        slot_threshold: Slot,
222        last_notified_confirmed_slot: &mut Slot,
223        pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
224        slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
225        prioritization_fee_cache: &PrioritizationFeeCache,
226    ) {
227        for confirmed_bank in bank.parents_inclusive().iter().rev() {
228            if confirmed_bank.slot() > slot_threshold {
229                debug!(
230                    "Calling notify_or_defer for confirmed_bank {:?}",
231                    confirmed_bank.slot()
232                );
233                Self::notify_or_defer(
234                    subscriptions,
235                    bank_forks,
236                    confirmed_bank,
237                    last_notified_confirmed_slot,
238                    pending_optimistically_confirmed_banks,
239                    slot_notification_subscribers,
240                    prioritization_fee_cache,
241                );
242            }
243        }
244    }
245
246    fn notify_new_root_slots(
247        roots: &mut [Slot],
248        newest_root_slot: &mut Slot,
249        slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
250    ) {
251        if slot_notification_subscribers.is_none() {
252            return;
253        }
254        roots.sort_unstable();
255        // The chain are sorted already and must contain at least the parent of a newly rooted slot as the first element
256        assert!(roots.len() >= 2);
257        for i in 1..roots.len() {
258            let root = roots[i];
259            if root > *newest_root_slot {
260                let parent = roots[i - 1];
261                debug!("Doing SlotNotification::Root for root {root}, parent: {parent}");
262                Self::notify_slot_status(
263                    slot_notification_subscribers,
264                    SlotNotification::Root((root, parent)),
265                );
266                *newest_root_slot = root;
267            }
268        }
269    }
270
271    #[allow(clippy::too_many_arguments)]
272    pub fn process_notification(
273        (notification, dependency_work): BankNotificationWithEventSequence,
274        bank_forks: &RwLock<BankForks>,
275        optimistically_confirmed_bank: &RwLock<OptimisticallyConfirmedBank>,
276        subscriptions: &RpcSubscriptions,
277        pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
278        last_notified_confirmed_slot: &mut Slot,
279        highest_confirmed_slot: &mut Slot,
280        newest_root_slot: &mut Slot,
281        slot_notification_subscribers: &Option<Arc<RwLock<Vec<SlotNotificationSender>>>>,
282        prioritization_fee_cache: &PrioritizationFeeCache,
283        dependency_tracker: &Option<Arc<DependencyTracker>>,
284    ) {
285        debug!("received bank notification: {notification:?} event: {dependency_work:?}");
286
287        if let Some(tracker) = dependency_tracker.as_ref() {
288            if let Some(dependency_work) = dependency_work {
289                tracker.wait_for_dependency(dependency_work);
290            }
291        }
292        match notification {
293            BankNotification::OptimisticallyConfirmed(slot) => {
294                let bank = bank_forks.read().unwrap().get(slot);
295                if let Some(bank) = bank {
296                    let mut w_optimistically_confirmed_bank =
297                        optimistically_confirmed_bank.write().unwrap();
298
299                    if bank.slot() > w_optimistically_confirmed_bank.bank.slot() && bank.is_frozen()
300                    {
301                        w_optimistically_confirmed_bank.bank = bank.clone();
302                    }
303
304                    if slot > *highest_confirmed_slot {
305                        Self::notify_or_defer_confirmed_banks(
306                            subscriptions,
307                            bank_forks,
308                            bank,
309                            *highest_confirmed_slot,
310                            last_notified_confirmed_slot,
311                            pending_optimistically_confirmed_banks,
312                            slot_notification_subscribers,
313                            prioritization_fee_cache,
314                        );
315
316                        *highest_confirmed_slot = slot;
317                    }
318                    drop(w_optimistically_confirmed_bank);
319                } else if slot > bank_forks.read().unwrap().root() {
320                    pending_optimistically_confirmed_banks.insert(slot);
321                } else {
322                    inc_new_counter_info!("dropped-already-rooted-optimistic-bank-notification", 1);
323                }
324
325                // Send slot notification regardless of whether the bank is replayed
326                subscriptions.notify_slot_update(SlotUpdate::OptimisticConfirmation {
327                    slot,
328                    timestamp: timestamp(),
329                });
330                // NOTE: replay of `slot` may or may not be complete. Therefore, most new
331                // functionality to be triggered on optimistic confirmation should go in
332                // `notify_or_defer()` under the `bank.is_frozen()` case instead of here.
333            }
334            BankNotification::Frozen(bank) => {
335                let frozen_slot = bank.slot();
336                if let Some(parent) = bank.parent() {
337                    let num_successful_transactions = bank
338                        .transaction_count()
339                        .saturating_sub(parent.transaction_count());
340                    subscriptions.notify_slot_update(SlotUpdate::Frozen {
341                        slot: frozen_slot,
342                        timestamp: timestamp(),
343                        stats: SlotTransactionStats {
344                            num_transaction_entries: bank.transaction_entries_count(),
345                            num_successful_transactions,
346                            num_failed_transactions: bank.transaction_error_count(),
347                            max_transactions_per_entry: bank.transactions_per_entry_max(),
348                        },
349                    });
350
351                    Self::notify_slot_status(
352                        slot_notification_subscribers,
353                        SlotNotification::Frozen((bank.slot(), bank.parent_slot())),
354                    );
355                }
356
357                if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
358                    debug!(
359                        "Calling notify_gossip_subscribers to send deferred notification \
360                         {frozen_slot:?}"
361                    );
362
363                    Self::notify_or_defer_confirmed_banks(
364                        subscriptions,
365                        bank_forks,
366                        bank.clone(),
367                        *last_notified_confirmed_slot,
368                        last_notified_confirmed_slot,
369                        pending_optimistically_confirmed_banks,
370                        slot_notification_subscribers,
371                        prioritization_fee_cache,
372                    );
373
374                    let mut w_optimistically_confirmed_bank =
375                        optimistically_confirmed_bank.write().unwrap();
376                    if frozen_slot > w_optimistically_confirmed_bank.bank.slot() {
377                        w_optimistically_confirmed_bank.bank = bank;
378                    }
379                    drop(w_optimistically_confirmed_bank);
380                }
381            }
382            BankNotification::NewRootBank(bank) => {
383                let root_slot = bank.slot();
384                let mut w_optimistically_confirmed_bank =
385                    optimistically_confirmed_bank.write().unwrap();
386                if root_slot > w_optimistically_confirmed_bank.bank.slot() {
387                    w_optimistically_confirmed_bank.bank = bank;
388                }
389                drop(w_optimistically_confirmed_bank);
390
391                pending_optimistically_confirmed_banks.retain(|&s| s > root_slot);
392            }
393            BankNotification::NewRootedChain(mut roots) => {
394                Self::notify_new_root_slots(
395                    &mut roots,
396                    newest_root_slot,
397                    slot_notification_subscribers,
398                );
399            }
400        }
401    }
402
403    pub fn close(self) -> thread::Result<()> {
404        self.join()
405    }
406
407    pub fn join(self) -> thread::Result<()> {
408        self.thread_hdl.join()
409    }
410}
411
412#[cfg(test)]
413mod tests {
414    use {
415        super::*,
416        crossbeam_channel::unbounded,
417        solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
418        solana_pubkey::Pubkey,
419        solana_runtime::{commitment::BlockCommitmentCache, dependency_tracker},
420        std::sync::atomic::AtomicU64,
421    };
422
423    /// Receive the Root notifications from the channel, if no item received within 100 ms, break and return all
424    /// of those received.
425    fn get_root_notifications(receiver: &Receiver<SlotNotification>) -> Vec<SlotNotification> {
426        let mut notifications = Vec::new();
427        while let Ok(notification) = receiver.recv_timeout(Duration::from_millis(100)) {
428            notifications.push(notification);
429        }
430        notifications
431    }
432
433    #[test]
434    fn test_process_notification() {
435        let exit = Arc::new(AtomicBool::new(false));
436        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
437        let bank = Bank::new_for_tests(&genesis_config);
438        let bank_forks = BankForks::new_rw_arc(bank);
439        let bank0 = bank_forks.read().unwrap().get(0).unwrap();
440        let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
441        bank_forks.write().unwrap().insert(bank1);
442        let bank1 = bank_forks.read().unwrap().get(1).unwrap();
443        let bank2 = Bank::new_from_parent(bank1, &Pubkey::default(), 2);
444        bank_forks.write().unwrap().insert(bank2);
445        let bank2 = bank_forks.read().unwrap().get(2).unwrap();
446        let bank3 = Bank::new_from_parent(bank2, &Pubkey::default(), 3);
447        bank_forks.write().unwrap().insert(bank3);
448
449        let optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>> =
450            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
451
452        let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
453        let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
454        let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
455            exit,
456            max_complete_transaction_status_slot,
457            bank_forks.clone(),
458            block_commitment_cache,
459            optimistically_confirmed_bank.clone(),
460        ));
461        let mut pending_optimistically_confirmed_banks: HashSet<u64> = HashSet::new();
462
463        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);
464
465        let mut highest_confirmed_slot: Slot = 0;
466        let mut newest_root_slot: Slot = 0;
467
468        let mut last_notified_confirmed_slot: Slot = 0;
469        OptimisticallyConfirmedBankTracker::process_notification(
470            (
471                BankNotification::OptimisticallyConfirmed(2),
472                None, /* no work sequence */
473            ),
474            &bank_forks,
475            &optimistically_confirmed_bank,
476            &subscriptions,
477            &mut pending_optimistically_confirmed_banks,
478            &mut last_notified_confirmed_slot,
479            &mut highest_confirmed_slot,
480            &mut newest_root_slot,
481            &None,
482            &PrioritizationFeeCache::default(),
483            &None, // No dependency tracker
484        );
485        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
486        assert_eq!(highest_confirmed_slot, 2);
487
488        // Test max optimistically confirmed bank remains in the cache
489        OptimisticallyConfirmedBankTracker::process_notification(
490            (
491                BankNotification::OptimisticallyConfirmed(1),
492                None, /* no work sequence */
493            ),
494            &bank_forks,
495            &optimistically_confirmed_bank,
496            &subscriptions,
497            &mut pending_optimistically_confirmed_banks,
498            &mut last_notified_confirmed_slot,
499            &mut highest_confirmed_slot,
500            &mut newest_root_slot,
501            &None,
502            &PrioritizationFeeCache::default(),
503            &None, // No dependency tracker
504        );
505        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
506        assert_eq!(highest_confirmed_slot, 2);
507
508        // Test bank will only be cached when frozen
509        OptimisticallyConfirmedBankTracker::process_notification(
510            (
511                BankNotification::OptimisticallyConfirmed(3),
512                None, /* no work sequence */
513            ),
514            &bank_forks,
515            &optimistically_confirmed_bank,
516            &subscriptions,
517            &mut pending_optimistically_confirmed_banks,
518            &mut last_notified_confirmed_slot,
519            &mut highest_confirmed_slot,
520            &mut newest_root_slot,
521            &None,
522            &PrioritizationFeeCache::default(),
523            &None, // No dependency tracker
524        );
525        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
526        assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
527        assert!(pending_optimistically_confirmed_banks.contains(&3));
528        assert_eq!(highest_confirmed_slot, 3);
529
530        // Test bank will only be cached when frozen
531        let bank3 = bank_forks.read().unwrap().get(3).unwrap();
532        bank3.freeze();
533
534        OptimisticallyConfirmedBankTracker::process_notification(
535            (
536                BankNotification::Frozen(bank3),
537                None, /* no work sequence */
538            ),
539            &bank_forks,
540            &optimistically_confirmed_bank,
541            &subscriptions,
542            &mut pending_optimistically_confirmed_banks,
543            &mut last_notified_confirmed_slot,
544            &mut highest_confirmed_slot,
545            &mut newest_root_slot,
546            &None,
547            &PrioritizationFeeCache::default(),
548            &None, // No dependency tracker
549        );
550        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
551        assert_eq!(highest_confirmed_slot, 3);
552        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
553
554        // Test higher root will be cached and clear pending_optimistically_confirmed_banks
555        let bank3 = bank_forks.read().unwrap().get(3).unwrap();
556        let bank4 = Bank::new_from_parent(bank3, &Pubkey::default(), 4);
557        bank_forks.write().unwrap().insert(bank4);
558        OptimisticallyConfirmedBankTracker::process_notification(
559            (
560                BankNotification::OptimisticallyConfirmed(4),
561                None, /* no work sequence */
562            ),
563            &bank_forks,
564            &optimistically_confirmed_bank,
565            &subscriptions,
566            &mut pending_optimistically_confirmed_banks,
567            &mut last_notified_confirmed_slot,
568            &mut highest_confirmed_slot,
569            &mut newest_root_slot,
570            &None,
571            &PrioritizationFeeCache::default(),
572            &None, // No dependency tracker
573        );
574        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
575        assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
576        assert!(pending_optimistically_confirmed_banks.contains(&4));
577        assert_eq!(highest_confirmed_slot, 4);
578
579        let bank4 = bank_forks.read().unwrap().get(4).unwrap();
580        let bank5 = Bank::new_from_parent(bank4, &Pubkey::default(), 5);
581        bank_forks.write().unwrap().insert(bank5);
582        let bank5 = bank_forks.read().unwrap().get(5).unwrap();
583
584        let mut bank_notification_senders = Vec::new();
585        let (sender, receiver) = unbounded();
586        bank_notification_senders.push(sender);
587
588        let subscribers = Some(Arc::new(RwLock::new(bank_notification_senders)));
589        let parent_roots = bank5.ancestors.keys();
590
591        OptimisticallyConfirmedBankTracker::process_notification(
592            (
593                BankNotification::NewRootBank(bank5),
594                None, /* no work sequence */
595            ),
596            &bank_forks,
597            &optimistically_confirmed_bank,
598            &subscriptions,
599            &mut pending_optimistically_confirmed_banks,
600            &mut last_notified_confirmed_slot,
601            &mut highest_confirmed_slot,
602            &mut newest_root_slot,
603            &subscribers,
604            &PrioritizationFeeCache::default(),
605            &None, // No dependency tracker
606        );
607        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
608        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
609        assert!(!pending_optimistically_confirmed_banks.contains(&4));
610        assert_eq!(highest_confirmed_slot, 4);
611        // The newest_root_slot is updated via NewRootedChain only
612        assert_eq!(newest_root_slot, 0);
613
614        OptimisticallyConfirmedBankTracker::process_notification(
615            (
616                BankNotification::NewRootedChain(parent_roots),
617                None, /* no work sequence */
618            ),
619            &bank_forks,
620            &optimistically_confirmed_bank,
621            &subscriptions,
622            &mut pending_optimistically_confirmed_banks,
623            &mut last_notified_confirmed_slot,
624            &mut highest_confirmed_slot,
625            &mut newest_root_slot,
626            &subscribers,
627            &PrioritizationFeeCache::default(),
628            &None, // No dependency tracker
629        );
630
631        assert_eq!(newest_root_slot, 5);
632
633        // Obtain the root notifications, we expect 5, including that for bank5.
634        let notifications = get_root_notifications(&receiver);
635        assert_eq!(notifications.len(), 5);
636
637        // Banks <= root do not get added to pending list, even if not frozen
638        let bank5 = bank_forks.read().unwrap().get(5).unwrap();
639        let bank6 = Bank::new_from_parent(bank5, &Pubkey::default(), 6);
640        bank_forks.write().unwrap().insert(bank6);
641        let bank5 = bank_forks.read().unwrap().get(5).unwrap();
642        let bank7 = Bank::new_from_parent(bank5, &Pubkey::default(), 7);
643        bank_forks.write().unwrap().insert(bank7);
644        bank_forks.write().unwrap().set_root(7, None, None).unwrap();
645        OptimisticallyConfirmedBankTracker::process_notification(
646            (
647                BankNotification::OptimisticallyConfirmed(6),
648                None, /* no work sequence */
649            ),
650            &bank_forks,
651            &optimistically_confirmed_bank,
652            &subscriptions,
653            &mut pending_optimistically_confirmed_banks,
654            &mut last_notified_confirmed_slot,
655            &mut highest_confirmed_slot,
656            &mut newest_root_slot,
657            &None,
658            &PrioritizationFeeCache::default(),
659            &None, // No dependency tracker
660        );
661        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
662        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
663        assert!(!pending_optimistically_confirmed_banks.contains(&6));
664        assert_eq!(highest_confirmed_slot, 4);
665        assert_eq!(newest_root_slot, 5);
666
667        let bank7 = bank_forks.read().unwrap().get(7).unwrap();
668        let parent_roots = bank7.ancestors.keys();
669
670        OptimisticallyConfirmedBankTracker::process_notification(
671            (
672                BankNotification::NewRootBank(bank7),
673                None, /* no work sequence */
674            ),
675            &bank_forks,
676            &optimistically_confirmed_bank,
677            &subscriptions,
678            &mut pending_optimistically_confirmed_banks,
679            &mut last_notified_confirmed_slot,
680            &mut highest_confirmed_slot,
681            &mut newest_root_slot,
682            &subscribers,
683            &PrioritizationFeeCache::default(),
684            &None, // No dependency tracker
685        );
686        assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 7);
687        assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
688        assert!(!pending_optimistically_confirmed_banks.contains(&6));
689        assert_eq!(highest_confirmed_slot, 4);
690        assert_eq!(newest_root_slot, 5);
691
692        OptimisticallyConfirmedBankTracker::process_notification(
693            (
694                BankNotification::NewRootedChain(parent_roots),
695                None, /* no work sequence */
696            ),
697            &bank_forks,
698            &optimistically_confirmed_bank,
699            &subscriptions,
700            &mut pending_optimistically_confirmed_banks,
701            &mut last_notified_confirmed_slot,
702            &mut highest_confirmed_slot,
703            &mut newest_root_slot,
704            &subscribers,
705            &PrioritizationFeeCache::default(),
706            &None, // No dependency tracker
707        );
708
709        assert_eq!(newest_root_slot, 7);
710
711        // Obtain the root notifications, we expect 1, which is for bank7 only as its parent bank5 is already notified.
712        let notifications = get_root_notifications(&receiver);
713        assert_eq!(notifications.len(), 1);
714    }
715
716    #[test]
717    fn test_event_synchronization() {
718        let exit = Arc::new(AtomicBool::new(false));
719        let dependency_tracker: Arc<DependencyTracker> =
720            Arc::new(dependency_tracker::DependencyTracker::default());
721        let work_sequence_1 = 345;
722        let work_sequence_2 = 678;
723        let tracker_clone = dependency_tracker.clone();
724        let handle = thread::spawn(move || {
725            let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100);
726            let bank = Bank::new_for_tests(&genesis_config);
727            let bank_forks = BankForks::new_rw_arc(bank);
728
729            // Test bank will only be cached when frozen
730            let bank0 = bank_forks.read().unwrap().get(0).unwrap();
731            let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
732            bank_forks.write().unwrap().insert(bank1);
733
734            let mut pending_optimistically_confirmed_banks: HashSet<u64> = HashSet::new();
735            let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
736
737            let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
738
739            let mut highest_confirmed_slot: Slot = 0;
740            let mut newest_root_slot: Slot = 0;
741
742            let mut last_notified_confirmed_slot: Slot = 0;
743
744            let optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>> =
745                OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
746
747            let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
748                exit,
749                max_complete_transaction_status_slot,
750                bank_forks.clone(),
751                block_commitment_cache,
752                optimistically_confirmed_bank.clone(),
753            ));
754
755            // confirmed without fronzen received
756            OptimisticallyConfirmedBankTracker::process_notification(
757                (
758                    BankNotification::OptimisticallyConfirmed(1),
759                    Some(work_sequence_1), /* dependency work sequence */
760                ),
761                &bank_forks,
762                &optimistically_confirmed_bank,
763                &subscriptions,
764                &mut pending_optimistically_confirmed_banks,
765                &mut last_notified_confirmed_slot,
766                &mut highest_confirmed_slot,
767                &mut newest_root_slot,
768                &None,
769                &PrioritizationFeeCache::default(),
770                &Some(tracker_clone.clone()),
771            );
772
773            assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);
774            // highest_confirmed_slot is updated even when we have not received the frozen event
775            assert_eq!(highest_confirmed_slot, 1);
776            assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
777
778            let bank1 = bank_forks.read().unwrap().get(1).unwrap();
779            bank1.freeze();
780
781            OptimisticallyConfirmedBankTracker::process_notification(
782                (
783                    BankNotification::Frozen(bank1),
784                    Some(work_sequence_2), /* dependency work sequence */
785                ),
786                &bank_forks,
787                &optimistically_confirmed_bank,
788                &subscriptions,
789                &mut pending_optimistically_confirmed_banks,
790                &mut last_notified_confirmed_slot,
791                &mut highest_confirmed_slot,
792                &mut newest_root_slot,
793                &None,
794                &PrioritizationFeeCache::default(),
795                &Some(tracker_clone),
796            );
797
798            assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 1);
799            assert_eq!(highest_confirmed_slot, 1);
800            assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
801        });
802
803        dependency_tracker.mark_this_and_all_previous_work_processed(work_sequence_1);
804        dependency_tracker.mark_this_and_all_previous_work_processed(work_sequence_2);
805
806        handle.join().unwrap();
807    }
808}