solana_runtime/accounts_background_service.rs

//! Service to clean up dead slots in accounts_db
//!
//! This can be expensive since we have to walk the append vecs being cleaned up.
//!
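//! A minimal wiring sketch (illustrative only, hence the `ignore` example; `bank_forks`,
//! `snapshot_config`, and `exit` are assumed to be supplied by the caller):
//!
//! ```ignore
//! // Set up the bank-drop channel while `bank_forks` still holds only the root bank.
//! let pruned_banks_receiver =
//!     AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
//! let pruned_banks_request_handler = PrunedBanksRequestHandler {
//!     pruned_banks_receiver,
//! };
//!
//! // Snapshot requests come in on this channel (e.g. enqueued by `BankForks::set_root()` via
//! // an `AbsRequestSender`); finished accounts packages go out to the accounts hash verifier.
//! let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
//! let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
//! let snapshot_request_handler = SnapshotRequestHandler {
//!     snapshot_config,
//!     snapshot_request_sender: snapshot_request_sender.clone(),
//!     snapshot_request_receiver,
//!     accounts_package_sender,
//! };
//!
//! let request_handlers = AbsRequestHandlers {
//!     snapshot_request_handler,
//!     pruned_banks_request_handler,
//! };
//! let accounts_background_service =
//!     AccountsBackgroundService::new(bank_forks, &exit, request_handlers, false, None);
//! ```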
mod stats;
use {
    crate::{
        accounts_db::{AccountStorageEntry, CalcAccountsHashDataSource},
        accounts_hash::CalcAccountsHashConfig,
        bank::{Bank, BankSlotDelta, DropCallback},
        bank_forks::BankForks,
        snapshot_config::SnapshotConfig,
        snapshot_package::{self, AccountsPackage, AccountsPackageType, SnapshotType},
        snapshot_utils::{self, SnapshotError},
    },
    crossbeam_channel::{Receiver, SendError, Sender},
    log::*,
    rand::{thread_rng, Rng},
    snapshot_utils::MAX_BANK_SNAPSHOTS_TO_RETAIN,
    solana_measure::measure::Measure,
    solana_sdk::clock::{BankId, Slot},
    stats::StatsManager,
    std::{
        boxed::Box,
        fmt::{Debug, Formatter},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, RwLock,
        },
        thread::{self, sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

const INTERVAL_MS: u64 = 100;
const CLEAN_INTERVAL_BLOCKS: u64 = 100;

// This value is chosen to spread the dropping cost over 3 expiration checks.
// RecycleStores are fully populated for almost all of their lifetime, so without this
// spreading, up to MAX_RECYCLE_STORES mmaps could be dropped at once in the worst case.
// (The dropping itself happens outside the AccountsDb::recycle_stores lock and is done
// in this AccountsBackgroundService, so this shouldn't matter much.)
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;

pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// interval to report bank_drop queue events: 60s
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;

#[derive(Debug, Default)]
struct PrunedBankQueueLenReporter {
    last_report_time: AtomicU64,
}

impl PrunedBankQueueLenReporter {
    fn report(&self, q_len: usize) {
        let now = solana_sdk::timing::timestamp();
        let last_report_time = self.last_report_time.load(Ordering::Acquire);
        if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
            && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
        {
            datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
            self.last_report_time.store(now, Ordering::Release);
        }
    }
}

lazy_static! {
    static ref BANK_DROP_QUEUE_REPORTER: PrunedBankQueueLenReporter =
        PrunedBankQueueLenReporter::default();
}

#[derive(Clone)]
pub struct SendDroppedBankCallback {
    sender: DroppedSlotsSender,
}

impl DropCallback for SendDroppedBankCallback {
    fn callback(&self, bank: &Bank) {
        BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
        if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
            info!("bank DropCallback signal queue disconnected.");
        }
    }

    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
        Box::new(self.clone())
    }
}

impl Debug for SendDroppedBankCallback {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        write!(f, "SendDroppedBankCallback({self:p})")
    }
}

impl SendDroppedBankCallback {
    pub fn new(sender: DroppedSlotsSender) -> Self {
        Self { sender }
    }
}

pub struct SnapshotRequest {
    pub snapshot_root_bank: Arc<Bank>,
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
    pub request_type: SnapshotRequestType,

    /// The instant this request was sent to the queue.
    /// Used to track how long requests wait before processing.
    pub enqueued: Instant,
}

impl Debug for SnapshotRequest {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SnapshotRequest")
            .field("request type", &self.request_type)
            .field("bank slot", &self.snapshot_root_bank.slot())
            .finish()
    }
}

/// What type of request is this?
///
/// The snapshot request has been expanded to support more than just snapshots.  This is
/// confusing, but can be resolved by renaming this type; or better, by creating an enum with
/// variants that wrap the fields-of-interest for each request.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestType {
    Snapshot,
    EpochAccountsHash,
}

pub struct SnapshotRequestHandler {
    pub snapshot_config: SnapshotConfig,
    pub snapshot_request_sender: SnapshotRequestSender,
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    pub accounts_package_sender: Sender<AccountsPackage>,
}

impl SnapshotRequestHandler {
    // Returns the latest requested snapshot block height and storages
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        last_full_snapshot_slot: &mut Option<Slot>,
    ) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
        let (
            snapshot_request,
            accounts_package_type,
            num_outstanding_requests,
            num_re_enqueued_requests,
        ) = self.get_next_snapshot_request(*last_full_snapshot_slot)?;

        datapoint_info!(
            "handle_snapshot_requests",
            (
                "num-outstanding-requests",
                num_outstanding_requests as i64,
                i64
            ),
            (
                "num-re-enqueued-requests",
                num_re_enqueued_requests as i64,
                i64
            ),
            (
                "enqueued-time-us",
                snapshot_request.enqueued.elapsed().as_micros() as i64,
                i64
            ),
        );

        Some(self.handle_snapshot_request(
            test_hash_calculation,
            non_snapshot_time_us,
            last_full_snapshot_slot,
            snapshot_request,
            accounts_package_type,
        ))
    }

    /// Get the next snapshot request to handle
    ///
    /// Look through the snapshot request channel to find the highest priority one to handle next.
    /// If there are no snapshot requests in the channel, return None.  Otherwise return the
    /// highest priority one.  Unhandled snapshot requests with slots GREATER-THAN the handled one
    /// will be re-enqueued.  The remaining requests will be dropped.
    ///
    /// Also return the number of snapshot requests initially in the channel, and the number
    /// that were re-enqueued.
    fn get_next_snapshot_request(
        &self,
        last_full_snapshot_slot: Option<Slot>,
    ) -> Option<(
        SnapshotRequest,
        AccountsPackageType,
        /*num outstanding snapshot requests*/ usize,
        /*num re-enqueued snapshot requests*/ usize,
    )> {
        let mut requests: Vec<_> = self
            .snapshot_request_receiver
            .try_iter()
            .map(|request| {
                let accounts_package_type = new_accounts_package_type(
                    &request,
                    &self.snapshot_config,
                    last_full_snapshot_slot,
                );
                (request, accounts_package_type)
            })
            .collect();
        // `select_nth_unstable_by()` panics if the slice is empty, so return early if that's the case
        if requests.is_empty() {
            return None;
        }
        let requests_len = requests.len();
        debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
        let num_eah_requests = requests
            .iter()
            .filter(|(_, account_package_type)| {
                *account_package_type == AccountsPackageType::EpochAccountsHash
            })
            .count();
        assert!(
            num_eah_requests <= 1,
            "Only a single EAH request is allowed at a time! count: {num_eah_requests}"
        );

        // Get the highest priority request, and put it at the end, because we're going to pop it
        requests.select_nth_unstable_by(requests_len - 1, cmp_requests_by_priority);
        // SAFETY: We know `requests` is not empty, so its len is >= 1, therefore there is always
        // an element to pop.
        let (snapshot_request, accounts_package_type) = requests.pop().unwrap();
        let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
        // re-enqueue any remaining requests for slots GREATER-THAN the one that will be handled
        let num_re_enqueued_requests = requests
            .into_iter()
            .filter(|(snapshot_request, _)| {
                snapshot_request.snapshot_root_bank.slot() > handled_request_slot
            })
            .map(|(snapshot_request, _)| {
                self.snapshot_request_sender
                    .try_send(snapshot_request)
                    .expect("re-enqueue snapshot request")
            })
            .count();

        Some((
            snapshot_request,
            accounts_package_type,
            requests_len,
            num_re_enqueued_requests,
        ))
    }

    fn handle_snapshot_request(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        last_full_snapshot_slot: &mut Option<Slot>,
        snapshot_request: SnapshotRequest,
        accounts_package_type: AccountsPackageType,
    ) -> Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError> {
        debug!(
            "handling snapshot request: {:?}, {:?}",
            snapshot_request, accounts_package_type
        );
        let mut total_time = Measure::start("snapshot_request_receiver_total_time");
        let SnapshotRequest {
            snapshot_root_bank,
            status_cache_slot_deltas,
            request_type,
            enqueued: _,
        } = snapshot_request;

        // we should not rely on the state of this validator until startup verification is complete
        assert!(snapshot_root_bank.is_startup_verification_complete());

        if accounts_package_type == AccountsPackageType::Snapshot(SnapshotType::FullSnapshot) {
            *last_full_snapshot_slot = Some(snapshot_root_bank.slot());
        }

        let previous_accounts_hash = test_hash_calculation.then(|| {
            // We have to use the index version here.
            // We cannot calculate the non-index way because cache has not been flushed and stores don't match reality.
            snapshot_root_bank.update_accounts_hash(
                CalcAccountsHashDataSource::IndexForTests,
                false,
                false,
            )
        });

        let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
        // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
        // That's because `snapshot_root_bank.slot()` must be root at this point,
        // and contains relevant updates because each bank has at least 1 account update due
        // to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot
        snapshot_root_bank.force_flush_accounts_cache();
        // Ensure all roots <= `snapshot_root_bank.slot()` have been flushed.
        // Note `max_flush_root` could be larger than `snapshot_root_bank.slot()` if there are
        // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
        assert!(
            snapshot_root_bank.slot()
                <= snapshot_root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .accounts_cache
                    .fetch_max_flush_root()
        );
        flush_accounts_cache_time.stop();

        let accounts_hash_for_testing = previous_accounts_hash.map(|previous_accounts_hash| {
            let check_hash = false;

            let (this_accounts_hash, capitalization) = snapshot_root_bank
                .accounts()
                .accounts_db
                .calculate_accounts_hash(
                    CalcAccountsHashDataSource::Storages,
                    snapshot_root_bank.slot(),
                    &CalcAccountsHashConfig {
                        use_bg_thread_pool: true,
                        check_hash,
                        ancestors: None,
                        epoch_schedule: snapshot_root_bank.epoch_schedule(),
                        rent_collector: snapshot_root_bank.rent_collector(),
                        store_detailed_debug_info_on_failure: false,
                    },
                )
                .unwrap();
            assert_eq!(previous_accounts_hash, this_accounts_hash);
            assert_eq!(capitalization, snapshot_root_bank.capitalization());
            this_accounts_hash
        });

        let mut clean_time = Measure::start("clean_time");
        snapshot_root_bank.clean_accounts(*last_full_snapshot_slot);
        clean_time.stop();

        let mut shrink_time = Measure::start("shrink_time");
        snapshot_root_bank.shrink_candidate_slots();
        shrink_time.stop();

        // Snapshot the bank and send over an accounts package
        let mut snapshot_time = Measure::start("snapshot_time");
        let snapshot_storages = snapshot_utils::get_snapshot_storages(&snapshot_root_bank);
        let accounts_package = match request_type {
            SnapshotRequestType::Snapshot => {
                let bank_snapshot_info = snapshot_utils::add_bank_snapshot(
                    &self.snapshot_config.bank_snapshots_dir,
                    &snapshot_root_bank,
                    &snapshot_storages,
                    self.snapshot_config.snapshot_version,
                    status_cache_slot_deltas,
                )
                .expect("snapshot bank");
                AccountsPackage::new_for_snapshot(
                    accounts_package_type,
                    &snapshot_root_bank,
                    &bank_snapshot_info,
                    &self.snapshot_config.bank_snapshots_dir,
                    &self.snapshot_config.full_snapshot_archives_dir,
                    &self.snapshot_config.incremental_snapshot_archives_dir,
                    snapshot_storages.clone(),
                    self.snapshot_config.archive_format,
                    self.snapshot_config.snapshot_version,
                    accounts_hash_for_testing,
                )
                .expect("new accounts package for snapshot")
            }
            SnapshotRequestType::EpochAccountsHash => {
                // skip the bank snapshot, just make an accounts package to send to AHV
                AccountsPackage::new_for_epoch_accounts_hash(
                    accounts_package_type,
                    &snapshot_root_bank,
                    snapshot_storages.clone(),
                    accounts_hash_for_testing,
                )
            }
        };
        self.accounts_package_sender
            .send(accounts_package)
            .expect("send accounts package");
        snapshot_time.stop();
        info!(
            "Took bank snapshot. accounts package type: {:?}, slot: {}, bank hash: {}",
            accounts_package_type,
            snapshot_root_bank.slot(),
            snapshot_root_bank.hash(),
        );

        // Cleanup outdated snapshots
        let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
        snapshot_utils::purge_old_bank_snapshots(
            &self.snapshot_config.bank_snapshots_dir,
            MAX_BANK_SNAPSHOTS_TO_RETAIN,
        );
        purge_old_snapshots_time.stop();
        total_time.stop();

        datapoint_info!(
            "handle_snapshot_requests-timing",
            (
                "flush_accounts_cache_time",
                flush_accounts_cache_time.as_us(),
                i64
            ),
            ("shrink_time", shrink_time.as_us(), i64),
            ("clean_time", clean_time.as_us(), i64),
            ("snapshot_time", snapshot_time.as_us(), i64),
            (
                "purge_old_snapshots_time",
                purge_old_snapshots_time.as_us(),
                i64
            ),
            ("total_us", total_time.as_us(), i64),
            ("non_snapshot_time_us", non_snapshot_time_us, i64),
        );
        Ok((snapshot_root_bank.block_height(), snapshot_storages))
    }
}

#[derive(Default, Clone)]
pub struct AbsRequestSender {
    snapshot_request_sender: Option<SnapshotRequestSender>,
}

impl AbsRequestSender {
    pub fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
        Self {
            snapshot_request_sender: Some(snapshot_request_sender),
        }
    }

    pub fn is_snapshot_creation_enabled(&self) -> bool {
        self.snapshot_request_sender.is_some()
    }

    pub fn send_snapshot_request(
        &self,
        snapshot_request: SnapshotRequest,
    ) -> Result<(), SendError<SnapshotRequest>> {
        if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
            snapshot_request_sender.send(snapshot_request)
        } else {
            Ok(())
        }
    }
}

#[derive(Debug)]
pub struct PrunedBanksRequestHandler {
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}

impl PrunedBanksRequestHandler {
    pub fn handle_request(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize {
        let mut count = 0;
        for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
            count += 1;
            bank.rc.accounts.accounts_db.purge_slot(
                pruned_slot,
                pruned_bank_id,
                is_serialized_with_abs,
            );
        }

        count
    }

    fn remove_dead_slots(
        &self,
        bank: &Bank,
        removed_slots_count: &mut usize,
        total_remove_slots_time: &mut u64,
    ) {
        let mut remove_slots_time = Measure::start("remove_slots_time");
        *removed_slots_count += self.handle_request(bank, true);
        remove_slots_time.stop();
        *total_remove_slots_time += remove_slots_time.as_us();

        if *removed_slots_count >= 100 {
            datapoint_info!(
                "remove_slots_timing",
                ("remove_slots_time", *total_remove_slots_time, i64),
                ("removed_slots_count", *removed_slots_count, i64),
            );
            *total_remove_slots_time = 0;
            *removed_slots_count = 0;
        }
    }
}

pub struct AbsRequestHandlers {
    pub snapshot_request_handler: SnapshotRequestHandler,
    pub pruned_banks_request_handler: PrunedBanksRequestHandler,
}

impl AbsRequestHandlers {
    // Returns the latest requested snapshot block height, if one exists
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        last_full_snapshot_slot: &mut Option<Slot>,
    ) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
        self.snapshot_request_handler.handle_snapshot_requests(
            test_hash_calculation,
            non_snapshot_time_us,
            last_full_snapshot_slot,
        )
    }
}

pub struct AccountsBackgroundService {
    t_background: JoinHandle<()>,
}

impl AccountsBackgroundService {
    pub fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        exit: &Arc<AtomicBool>,
        request_handlers: AbsRequestHandlers,
        test_hash_calculation: bool,
        mut last_full_snapshot_slot: Option<Slot>,
    ) -> Self {
        info!("AccountsBackgroundService active");
        let exit = exit.clone();
        let mut last_cleaned_block_height = 0;
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        let mut last_expiration_check_time = Instant::now();
        let t_background = Builder::new()
            .name("solBgAccounts".to_string())
            .spawn(move || {
                let mut stats = StatsManager::new();
                let mut last_snapshot_end_time = None;

                // To support fastboot, we must ensure the storages used in the latest bank snapshot are
                // not recycled nor removed early.  Hold an Arc of their AppendVecs to prevent them from
                // expiring.
                let mut last_snapshot_storages: Option<Vec<Arc<AccountStorageEntry>>> = None;
                loop {
                    if exit.load(Ordering::Relaxed) {
                        break;
                    }
                    let start_time = Instant::now();

                    // Grab the current root bank
                    let bank = bank_forks.read().unwrap().root_bank().clone();

                    // Purge accounts of any dead slots
                    request_handlers
                        .pruned_banks_request_handler
                        .remove_dead_slots(
                            &bank,
                            &mut removed_slots_count,
                            &mut total_remove_slots_time,
                        );

                    Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);

                    let non_snapshot_time = last_snapshot_end_time
                        .map(|last_snapshot_end_time: Instant| {
                            last_snapshot_end_time.elapsed().as_micros()
                        })
                        .unwrap_or_default();

                    // Check to see if there were any requests for snapshotting banks
                    // < the current root bank `bank` above.

                    // Claim: Any snapshot request for slot `N` found here implies that the last cleanup
                    // slot `M` satisfies `M < N`
                    //
                    // Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
                    // but cleanup has already happened on some slot `M >= N`. Because the call to
                    // `bank.clean_accounts()` (in the code below) implies we only clean slots `<= bank - 1`,
                    // then that means in some *previous* iteration of this loop, we must have gotten a root
                    // bank for some slot `R` where `R > N`, but did not see the snapshot for `N` in the
                    // snapshot request channel.
                    //
                    // However, this is impossible because BankForks.set_root() will always flush the snapshot
                    // request for `N` to the snapshot request channel before setting a root `R > N`, and
                    // snapshot_request_handler.handle_snapshot_requests() will always look for the latest
                    // available snapshot in the channel.
                    //
                    // NOTE: We must wait for startup verification to complete before handling
                    // snapshot requests.  This is because startup verification and snapshot
                    // request handling can both kick off accounts hash calculations in background
                    // threads, and these must not happen concurrently.
                    let snapshot_handle_result = bank
                        .is_startup_verification_complete()
                        .then(|| {
                            request_handlers.handle_snapshot_requests(
                                test_hash_calculation,
                                non_snapshot_time,
                                &mut last_full_snapshot_slot,
                            )
                        })
                        .flatten();
                    if snapshot_handle_result.is_some() {
                        last_snapshot_end_time = Some(Instant::now());
                    }

                    // Note that the flush will do an internal clean of the
                    // cache up to bank.slot(), so should be safe as long
                    // as any later snapshots that are taken are of
                    // slots >= bank.slot()
                    bank.flush_accounts_cache_if_needed();

                    if let Some(snapshot_handle_result) = snapshot_handle_result {
                        // Safe, see proof above

                        if let Ok((snapshot_block_height, snapshot_storages)) =
                            snapshot_handle_result
                        {
                            assert!(last_cleaned_block_height <= snapshot_block_height);
                            last_cleaned_block_height = snapshot_block_height;
                            // Update the option, so the older one is released, causing the release of
                            // its reference counts of the appendvecs
                            last_snapshot_storages = Some(snapshot_storages);
                            debug!(
                                "Number of snapshot storages kept alive for fastboot: {}",
                                last_snapshot_storages
                                    .as_ref()
                                    .map(|storages| storages.len())
                                    .unwrap_or(0)
                            );
                        } else {
                            exit.store(true, Ordering::Relaxed);
                            return;
                        }
                    } else {
                        bank.shrink_candidate_slots();
                        if bank.block_height() - last_cleaned_block_height
                            > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
                        {
                            // Note that the flush will do an internal clean of the
                            // cache up to bank.slot(), so should be safe as long
                            // as any later snapshots that are taken are of
                            // slots >= bank.slot()
                            bank.force_flush_accounts_cache();
                            bank.clean_accounts(last_full_snapshot_slot);
                            last_cleaned_block_height = bank.block_height();
                        }
                    }
                    stats.record_and_maybe_submit(start_time.elapsed());
                    sleep(Duration::from_millis(INTERVAL_MS));
                }
                info!(
                    "ABS loop done.  Number of snapshot storages kept alive for fastboot: {}",
                    last_snapshot_storages
                        .map(|storages| storages.len())
                        .unwrap_or(0)
                );
            })
            .unwrap();

        Self { t_background }
    }

    /// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
    /// should only be one bank, the root bank, in `bank_forks`.
    /// All banks added to `bank_forks` will be descendants of the root bank, and thus will inherit
    /// the bank drop callback.
    pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
        assert_eq!(bank_forks.read().unwrap().banks().len(), 1);

        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        {
            let root_bank = bank_forks.read().unwrap().root_bank();
            root_bank.set_callback(Some(Box::new(
                root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .create_drop_bank_callback(pruned_banks_sender),
            )));
        }
        pruned_banks_receiver
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_background.join()
    }

    fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
        let now = Instant::now();
        if now.duration_since(*last_expiration_check_time).as_secs()
            > RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
        {
            bank.expire_old_recycle_stores();
            *last_expiration_check_time = now;
        }
    }
}

/// Get the AccountsPackageType from a given SnapshotRequest
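///
/// For example (using the interval values from the tests below, purely for illustration): with a
/// full snapshot archive interval of 80 and an incremental interval of 30, a `Snapshot` request
/// at block height 240 yields `AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)`; one at
/// block height 270 with `last_full_snapshot_slot == Some(240)` yields
/// `AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(240))`; and one at a block
/// height on neither interval yields `AccountsPackageType::AccountsHashVerifier`.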
#[must_use]
fn new_accounts_package_type(
    snapshot_request: &SnapshotRequest,
    snapshot_config: &SnapshotConfig,
    last_full_snapshot_slot: Option<Slot>,
) -> AccountsPackageType {
    let block_height = snapshot_request.snapshot_root_bank.block_height();
    match snapshot_request.request_type {
        SnapshotRequestType::EpochAccountsHash => AccountsPackageType::EpochAccountsHash,
        _ => {
            if snapshot_utils::should_take_full_snapshot(
                block_height,
                snapshot_config.full_snapshot_archive_interval_slots,
            ) {
                AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)
            } else if snapshot_utils::should_take_incremental_snapshot(
                block_height,
                snapshot_config.incremental_snapshot_archive_interval_slots,
                last_full_snapshot_slot,
            ) {
                AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(
                    last_full_snapshot_slot.unwrap(),
                ))
            } else {
                AccountsPackageType::AccountsHashVerifier
            }
        }
    }
}

/// Compare snapshot requests; used to pick the highest priority request to handle.
///
/// Priority, from highest to lowest:
/// - Epoch Accounts Hash
/// - Full Snapshot
/// - Incremental Snapshot
/// - Accounts Hash Verifier
///
/// If two requests of the same type are being compared, their bank slots are the tiebreaker.
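///
/// For example, under this ordering an Epoch Accounts Hash request compares as `Greater` than
/// (i.e. takes priority over) a Full Snapshot request regardless of their slots, while two
/// Accounts Hash Verifier requests are ordered by slot, so the later slot wins; this is how
/// `get_next_snapshot_request()` ends up picking the request at slot 303 over those at slots 301
/// and 302 in the test below.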
#[must_use]
fn cmp_requests_by_priority(
    a: &(SnapshotRequest, AccountsPackageType),
    b: &(SnapshotRequest, AccountsPackageType),
) -> std::cmp::Ordering {
    let (snapshot_request_a, accounts_package_type_a) = a;
    let (snapshot_request_b, accounts_package_type_b) = b;
    let slot_a = snapshot_request_a.snapshot_root_bank.slot();
    let slot_b = snapshot_request_b.snapshot_root_bank.slot();
    snapshot_package::cmp_accounts_package_types_by_priority(
        accounts_package_type_a,
        accounts_package_type_b,
    )
    .then(slot_a.cmp(&slot_b))
}

#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            epoch_accounts_hash::{self, EpochAccountsHash},
            genesis_utils::create_genesis_config,
        },
        crossbeam_channel::unbounded,
        solana_sdk::{
            account::AccountSharedData, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey,
        },
    };

    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account in slot 0
        let account_key = Pubkey::new_unique();
        bank0.store_account(
            &account_key,
            &AccountSharedData::new(264, 0, &Pubkey::default()),
        );
        assert!(bank0.get_account(&account_key).is_some());
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);

        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }

    /// Ensure that unhandled snapshot requests are properly re-enqueued or dropped
    ///
    /// The snapshot request handler should be flexible and handle re-queueing unhandled snapshot
    /// requests, if those unhandled requests are for slots GREATER-THAN the last request handled.
    /// This is needed if, for example, an Epoch Accounts Hash for slot X and a Full Snapshot for
    /// slot X+1 are both in the request channel.  The EAH needs to be handled first, but the full
    /// snapshot should also be handled afterwards, since future incremental snapshots will depend
    /// on it.
    #[test]
    fn test_get_next_snapshot_request() {
        // These constants were picked to ensure the desired snapshot requests were sent to the
        // channel.  With 400 slots per Epoch, the EAH start will be at slot 100.  Ensure there are
        // other requests before this slot, and then 2+ requests of each type afterwards (to
        // further test the prioritization logic).
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval_slots: FULL_SNAPSHOT_INTERVAL,
            incremental_snapshot_archive_interval_slots: INCREMENTAL_SNAPSHOT_INTERVAL,
            ..SnapshotConfig::default()
        };

        let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_config,
            snapshot_request_sender: snapshot_request_sender.clone(),
            snapshot_request_receiver,
            accounts_package_sender,
        };

        let send_snapshot_request = |snapshot_root_bank, request_type| {
            let snapshot_request = SnapshotRequest {
                snapshot_root_bank,
                status_cache_slot_deltas: Vec::default(),
                request_type,
                enqueued: Instant::now(),
            };
            snapshot_request_sender.send(snapshot_request).unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
        bank.set_startup_verification_complete();
        // Need to set the EAH to Valid so that `Bank::new_from_parent()` doesn't panic during
        // freeze when parent is in the EAH calculation window.
        bank.rc
            .accounts
            .accounts_db
            .epoch_accounts_hash_manager
            .set_valid(EpochAccountsHash::new(Hash::new_unique()), 0);

        // Create new banks and send snapshot requests so that the following requests will be in
        // the channel before handling the requests:
        //
        // fss  80
        // iss  90
        // eah 100 <-- handled 1st
        // iss 120
        // iss 150
        // fss 160
        // iss 180
        // iss 210
        // fss 240 <-- handled 2nd
        // iss 270
        // iss 300 <-- handled 3rd
        // ahv 301
        // ahv 302
        // ahv 303 <-- handled 4th
        //
        // (slots not called out will all be AHV)
        // Also, incremental snapshots before slot 240 (the first full snapshot handled) will
        // actually be AHV since the last full snapshot slot will be `None`.  This is expected and
        // fine; but maybe unexpected for a reader/debugger without this additional context.
        let mut parent = Arc::clone(&bank);
        for _ in 0..303 {
            let bank = Arc::new(Bank::new_from_parent(
                &parent,
                &Pubkey::new_unique(),
                parent.slot() + 1,
            ));

            if bank.slot() == epoch_accounts_hash::calculation_start(&bank) {
                send_snapshot_request(Arc::clone(&bank), SnapshotRequestType::EpochAccountsHash);
            } else {
                send_snapshot_request(Arc::clone(&bank), SnapshotRequestType::Snapshot);
            }

            parent = bank;
        }

        // Ensure the EAH is handled 1st
        let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
            .get_next_snapshot_request(None)
            .unwrap();
        assert_eq!(
            accounts_package_type,
            AccountsPackageType::EpochAccountsHash
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 100);

        // Ensure the full snapshot from slot 240 is handled 2nd
        // (the older full snapshots are skipped and dropped)
        let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
            .get_next_snapshot_request(None)
            .unwrap();
        assert_eq!(
            accounts_package_type,
            AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);

        // Ensure the incremental snapshot from slot 300 is handled 3rd
        // (the older incremental snapshots are skipped and dropped)
        let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
            .get_next_snapshot_request(Some(240))
            .unwrap();
        assert_eq!(
            accounts_package_type,
            AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(240))
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);

        // Ensure the accounts hash verifier from slot 303 is handled 4th
        // (the older accounts hash verifiers are skipped and dropped)
        let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
            .get_next_snapshot_request(Some(240))
            .unwrap();
        assert_eq!(
            accounts_package_type,
            AccountsPackageType::AccountsHashVerifier
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 303);

        // And now ensure the snapshot request channel is empty!
        assert!(snapshot_request_handler
            .get_next_snapshot_request(Some(240))
            .is_none());
    }
}