solana_runtime/accounts_background_service.rs

//! Service to clean up dead slots in accounts_db
//!
//! This can be expensive since we have to walk the append vecs being cleaned up.

mod stats;
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
    crate::{
        bank::{Bank, BankSlotDelta, DropCallback},
        bank_forks::BankForks,
        snapshot_bank_utils,
        snapshot_controller::SnapshotController,
        snapshot_package::{AccountsPackage, AccountsPackageKind, SnapshotKind},
        snapshot_utils::SnapshotError,
    },
    crossbeam_channel::{Receiver, SendError, Sender},
    log::*,
    rayon::iter::{IntoParallelIterator, ParallelIterator},
    solana_accounts_db::{
        accounts_db::CalcAccountsHashDataSource, accounts_hash::CalcAccountsHashConfig,
    },
    solana_clock::{BankId, Slot},
    solana_measure::{measure::Measure, measure_us},
    stats::StatsManager,
    std::{
        boxed::Box,
        cmp,
        fmt::{self, Debug, Formatter},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, LazyLock, RwLock,
        },
        thread::{self, sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

const INTERVAL_MS: u64 = 100;
// Set the clean interval duration to be approximately how long before the next incremental
// snapshot request is received, plus some buffer.  The default incremental snapshot interval is
// 100 slots which, at the default 400 ms per slot, works out to 40 seconds; the remaining
// 10 seconds is buffer.
const CLEAN_INTERVAL: Duration = Duration::from_secs(50);
const SHRINK_INTERVAL: Duration = Duration::from_secs(1);

pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// interval to report bank_drop queue events: 60s
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;

#[derive(Debug, Default)]
struct PrunedBankQueueLenReporter {
    last_report_time: AtomicU64,
}

impl PrunedBankQueueLenReporter {
    fn report(&self, q_len: usize) {
        let now = solana_time_utils::timestamp();
        let last_report_time = self.last_report_time.load(Ordering::Acquire);
        if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
            && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
        {
            datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
            self.last_report_time.store(now, Ordering::Release);
        }
    }
}

static BANK_DROP_QUEUE_REPORTER: LazyLock<PrunedBankQueueLenReporter> =
    LazyLock::new(PrunedBankQueueLenReporter::default);

#[derive(Clone)]
pub struct SendDroppedBankCallback {
    sender: DroppedSlotsSender,
}

impl DropCallback for SendDroppedBankCallback {
    fn callback(&self, bank: &Bank) {
        BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
        if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
            info!("bank DropCallback signal queue disconnected.");
        }
    }

    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
        Box::new(self.clone())
    }
}

impl Debug for SendDroppedBankCallback {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(f, "SendDroppedBankCallback({self:p})")
    }
}

impl SendDroppedBankCallback {
    pub fn new(sender: DroppedSlotsSender) -> Self {
        Self { sender }
    }
}

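// A minimal wiring sketch (not part of the service itself), assuming the caller already
// holds a root `Bank`: create the drop-signal channel, install the callback, and drain
// `(slot, bank_id)` pairs on the receiver as pruned banks are dropped.
#[cfg(test)]
#[allow(dead_code)]
fn example_wire_drop_callback(root_bank: &Bank) -> DroppedSlotsReceiver {
    let (sender, receiver) = crossbeam_channel::unbounded();
    // Banks only fire the callback once the accounts-db side has it enabled.
    root_bank.rc.accounts.accounts_db.enable_bank_drop_callback();
    // Every bank descended from `root_bank` inherits this callback and signals on drop.
    root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(sender))));
    receiver
}
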
pub struct SnapshotRequest {
    pub snapshot_root_bank: Arc<Bank>,
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
    pub request_kind: SnapshotRequestKind,

    /// The instant this request was sent to the queue.
    /// Used to track how long requests wait before processing.
    pub enqueued: Instant,
}

impl Debug for SnapshotRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SnapshotRequest")
            .field("request kind", &self.request_kind)
            .field("bank slot", &self.snapshot_root_bank.slot())
            .field("block height", &self.snapshot_root_bank.block_height())
            .finish_non_exhaustive()
    }
}

/// What kind of request is this?
///
/// The snapshot request has been expanded to support more than just snapshots.  This is
/// confusing, but can be resolved by renaming this type; or better, by creating an enum with
/// variants that wrap the fields-of-interest for each request.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestKind {
    FullSnapshot,
    IncrementalSnapshot,
    EpochAccountsHash,
}

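// A hedged sketch of the producer side, assuming the caller (e.g. `BankForks::set_root()`)
// already holds the rooted bank and a `SnapshotRequestSender`: build the request and push
// it into the channel for this service to handle.
#[cfg(test)]
#[allow(dead_code)]
fn example_enqueue_request(snapshot_root_bank: Arc<Bank>, sender: &SnapshotRequestSender) {
    let snapshot_request = SnapshotRequest {
        snapshot_root_bank,
        status_cache_slot_deltas: Vec::default(),
        request_kind: SnapshotRequestKind::FullSnapshot,
        enqueued: Instant::now(),
    };
    // Sending fails only if the receiver side has been dropped.
    sender.send(snapshot_request).expect("send snapshot request");
}
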
pub struct SnapshotRequestHandler {
    pub snapshot_controller: Arc<SnapshotController>,
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    pub accounts_package_sender: Sender<AccountsPackage>,
}

impl SnapshotRequestHandler {
    // Returns the result of handling the latest requested snapshot slot, if one exists
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        exit: &AtomicBool,
    ) -> Option<Result<Slot, SnapshotError>> {
        let (snapshot_request, num_outstanding_requests, num_re_enqueued_requests) =
            self.get_next_snapshot_request()?;

        datapoint_info!(
            "handle_snapshot_requests",
            ("num_outstanding_requests", num_outstanding_requests, i64),
            ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
            (
                "enqueued_time_us",
                snapshot_request.enqueued.elapsed().as_micros(),
                i64
            ),
        );

        let accounts_package_kind = new_accounts_package_kind(&snapshot_request)?;
        Some(self.handle_snapshot_request(
            test_hash_calculation,
            non_snapshot_time_us,
            snapshot_request,
            accounts_package_kind,
            exit,
        ))
    }

    /// Get the next snapshot request to handle
    ///
    /// Look through the snapshot request channel to find the highest priority one to handle next.
    /// If there are no snapshot requests in the channel, return None.  Otherwise return the
    /// highest priority one.  Unhandled snapshot requests with slots GREATER-THAN the handled one
    /// will be re-enqueued.  The remainder will be dropped.
    ///
    /// Also return the number of snapshot requests initially in the channel, and the number of
    /// ones re-enqueued.
    fn get_next_snapshot_request(
        &self,
    ) -> Option<(
        SnapshotRequest,
        /*num outstanding snapshot requests*/ usize,
        /*num re-enqueued snapshot requests*/ usize,
    )> {
        let mut requests: Vec<_> = self.snapshot_request_receiver.try_iter().collect();
        let requests_len = requests.len();
        debug!("outstanding snapshot requests ({requests_len}): {requests:?}");

        // NOTE: This code to select the next request is mirrored in AccountsHashVerifier.
        // Please ensure they stay in sync.
        match requests_len {
            0 => None,
            1 => {
                // SAFETY: We know the len is 1, so `pop` will return `Some`
                let snapshot_request = requests.pop().unwrap();
                Some((snapshot_request, 1, 0))
            }
            _ => {
                let num_eah_requests = requests
                    .iter()
                    .filter(|request| {
                        request.request_kind == SnapshotRequestKind::EpochAccountsHash
                    })
                    .count();
                assert!(
                    num_eah_requests <= 1,
                    "Only a single EAH request is allowed at a time! count: {num_eah_requests}"
                );

                // Get the two highest priority requests, `y` and `z`.
                // By asking for the second-to-last element to be in its final sorted position,
                // we ensure that the last element ends up sorted as well.
                let (_, y, z) =
                    requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
                assert_eq!(z.len(), 1);
                let z = z.first().unwrap();
                let y: &_ = y; // reborrow to remove `mut`

                // If the highest priority request (`z`) is EpochAccountsHash, we need to check if
                // there's a FullSnapshot request with a lower slot in `y` that is about to be
                // dropped.  We do not want to drop a FullSnapshot request in this case because it
                // will cause subsequent IncrementalSnapshot requests to fail.
                //
                // So, if `z` is an EpochAccountsHash request, check `y`.  We know there can only
                // be at most one EpochAccountsHash request, so `y` is the only other request we
                // need to check.  If `y` is a FullSnapshot request *with a lower slot* than `z`,
                // then handle `y` first.
                let snapshot_request = if z.request_kind == SnapshotRequestKind::EpochAccountsHash
                    && y.request_kind == SnapshotRequestKind::FullSnapshot
                    && y.snapshot_root_bank.slot() < z.snapshot_root_bank.slot()
                {
                    // SAFETY: We know the len is > 1, so both `pop`s will return `Some`
                    let z = requests.pop().unwrap();
                    let y = requests.pop().unwrap();
                    requests.push(z);
                    y
                } else {
                    // SAFETY: We know the len is > 1, so `pop` will return `Some`
                    requests.pop().unwrap()
                };

                let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
                // re-enqueue any remaining requests for slots GREATER-THAN the one that will be handled
                let num_re_enqueued_requests = requests
                    .into_iter()
                    .filter(|snapshot_request| {
                        snapshot_request.snapshot_root_bank.slot() > handled_request_slot
                    })
                    .map(|snapshot_request| {
                        self.snapshot_controller
                            .request_sender()
                            .try_send(snapshot_request)
                            .expect("re-enqueue snapshot request");
                    })
                    .count();

                Some((snapshot_request, requests_len, num_re_enqueued_requests))
            }
        }
    }

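    // A standalone illustration (on plain integers) of the `select_nth_unstable_by` trick
    // above: placing the second-to-last element in its final sorted position forces the
    // last element into place too, so the tail slice holds exactly the single
    // highest-priority item.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_select_two_highest() {
        let mut priorities = vec![3, 1, 4, 1, 5, 9, 2, 6];
        let index = priorities.len() - 2;
        let (_lower, second_highest, highest) = priorities.select_nth_unstable(index);
        assert_eq!(*second_highest, 6);
        assert_eq!(highest.len(), 1);
        assert_eq!(highest[0], 9);
    }
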
    fn handle_snapshot_request(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        snapshot_request: SnapshotRequest,
        accounts_package_kind: AccountsPackageKind,
        exit: &AtomicBool,
    ) -> Result<Slot, SnapshotError> {
        info!("handling snapshot request: {snapshot_request:?}, {accounts_package_kind:?}");
        let mut total_time = Measure::start("snapshot_request_receiver_total_time");
        let SnapshotRequest {
            snapshot_root_bank,
            status_cache_slot_deltas,
            request_kind,
            enqueued: _,
        } = snapshot_request;

        // we should not rely on the state of this validator until startup verification is complete
        assert!(snapshot_root_bank.is_startup_verification_complete());

        if accounts_package_kind == AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot) {
            // The latest full snapshot slot is what accounts-db uses to properly handle
            // zero lamport accounts.  We are handling a full snapshot request here, and
            // since taking a snapshot is not allowed to fail, we can update accounts-db now.
            snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
        }

        let previous_accounts_hash = test_hash_calculation.then(|| {
            // We have to use the index version here.
            // We cannot calculate the non-index way because the cache
            // has not been flushed and the stores don't match reality.
            snapshot_root_bank
                .update_accounts_hash(CalcAccountsHashDataSource::IndexForTests, false)
        });

        let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
        // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
        // That's because `snapshot_root_bank.slot()` must be root at this point,
        // and contains relevant updates because each bank has at least 1 account update due
        // to sysvar maintenance.  Otherwise, this would cause missing storages in the snapshot.
        snapshot_root_bank.force_flush_accounts_cache();
        // Ensure all roots <= `self.slot()` have been flushed.
        // Note `max_flush_root` could be larger than self.slot() if there are
        // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
        assert!(
            snapshot_root_bank.slot()
                <= snapshot_root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .accounts_cache
                    .fetch_max_flush_root()
        );
        flush_accounts_cache_time.stop();

        let accounts_hash_for_testing = previous_accounts_hash.map(|previous_accounts_hash| {
            let (this_accounts_hash, capitalization) = snapshot_root_bank
                .accounts()
                .accounts_db
                .calculate_accounts_hash_from(
                    CalcAccountsHashDataSource::Storages,
                    snapshot_root_bank.slot(),
                    &CalcAccountsHashConfig {
                        use_bg_thread_pool: true,
                        ancestors: None,
                        epoch_schedule: snapshot_root_bank.epoch_schedule(),
                        rent_collector: snapshot_root_bank.rent_collector(),
                        store_detailed_debug_info_on_failure: false,
                    },
                );
            assert_eq!(previous_accounts_hash, this_accounts_hash);
            assert_eq!(capitalization, snapshot_root_bank.capitalization());
            this_accounts_hash
        });

        let mut clean_time = Measure::start("clean_time");
        snapshot_root_bank.clean_accounts();
        clean_time.stop();

        let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());

        let mut shrink_time = Measure::start("shrink_time");
        snapshot_root_bank.shrink_candidate_slots();
        shrink_time.stop();

        // Snapshot the bank and send over an accounts package
        let mut snapshot_time = Measure::start("snapshot_time");
        let snapshot_storages = snapshot_bank_utils::get_snapshot_storages(&snapshot_root_bank);
        let accounts_package = match request_kind {
            SnapshotRequestKind::FullSnapshot | SnapshotRequestKind::IncrementalSnapshot => {
                match &accounts_package_kind {
                    AccountsPackageKind::Snapshot(_) => AccountsPackage::new_for_snapshot(
                        accounts_package_kind,
                        &snapshot_root_bank,
                        snapshot_storages,
                        status_cache_slot_deltas,
                        accounts_hash_for_testing,
                    ),
                    AccountsPackageKind::EpochAccountsHash => panic!(
                        "Illegal account package type: EpochAccountsHash packages must \
                         be from an EpochAccountsHash request!"
                    ),
                }
            }
            SnapshotRequestKind::EpochAccountsHash => AccountsPackage::new_for_epoch_accounts_hash(
                accounts_package_kind,
                &snapshot_root_bank,
                snapshot_storages,
                accounts_hash_for_testing,
            ),
        };
        let send_result = self.accounts_package_sender.send(accounts_package);
        if let Err(err) = send_result {
            // Sending the accounts package should never fail *unless* we're shutting down.
            let accounts_package = &err.0;
            assert!(
                exit.load(Ordering::Relaxed),
                "Failed to send accounts package: {err}, {accounts_package:?}"
            );
        }
        snapshot_time.stop();
        info!(
            "Handled snapshot request. accounts package kind: {:?}, slot: {}, bank hash: {}",
            accounts_package_kind,
            snapshot_root_bank.slot(),
            snapshot_root_bank.hash(),
        );

        total_time.stop();

        datapoint_info!(
            "handle_snapshot_requests-timing",
            (
                "flush_accounts_cache_time",
                flush_accounts_cache_time.as_us(),
                i64
            ),
            ("shrink_time", shrink_time.as_us(), i64),
            ("clean_time", clean_time.as_us(), i64),
            ("snapshot_time", snapshot_time.as_us(), i64),
            ("total_us", total_time.as_us(), i64),
            ("non_snapshot_time_us", non_snapshot_time_us, i64),
            ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
        );
        Ok(snapshot_root_bank.slot())
    }

    /// Returns the slot of the next snapshot request to be handled
    fn peek_next_snapshot_request_slot(&self) -> Option<Slot> {
        // We reuse `get_next_snapshot_request()` here, since it already implements all the logic
        // for getting the highest priority request, *AND* we leverage its test coverage.
        // Additionally, since `get_next_snapshot_request()` drops old requests, we might get to
        // proactively clean up old banks earlier as well!
        let (next_request, _, _) = self.get_next_snapshot_request()?;
        let next_slot = next_request.snapshot_root_bank.slot();

        // make sure to re-enqueue the request, otherwise we'd lose it!
        self.snapshot_controller
            .request_sender()
            .try_send(next_request)
            .expect("re-enqueue snapshot request");

        Some(next_slot)
    }
}

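// A hedged sketch of the consumer side, assuming `handler` and `exit` come from the caller:
// the background loop repeatedly offers the handler a chance to do work, and treats any
// returned error as fatal.
#[cfg(test)]
#[allow(dead_code)]
fn example_drive_handler(handler: &SnapshotRequestHandler, exit: &AtomicBool) {
    // `false` disables the extra test-only hash calculation; `0` reports no
    // non-snapshot time for metrics.
    if let Some(result) = handler.handle_snapshot_requests(false, 0, exit) {
        match result {
            Ok(slot) => info!("handled snapshot request for slot {slot}"),
            Err(err) => error!("fatal error while handling snapshot request: {err}"),
        }
    }
}
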
#[derive(Debug)]
pub struct PrunedBanksRequestHandler {
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}

impl PrunedBanksRequestHandler {
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn handle_request(&self, bank: &Bank) -> usize {
        let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
        // We need a stable sort to ensure we purge banks with the same slot in the same order
        // they were sent into the channel.
        banks_to_purge.sort_by_key(|(slot, _id)| *slot);
        let num_banks_to_purge = banks_to_purge.len();

        // Group the banks into slices with the same slot
        let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();

        // Log whenever we need to handle banks with the same slot.  Purposely do this *before* we
        // call `purge_slot()` to ensure we get the datapoint (in case there's an assert/panic).
        let num_banks_with_same_slot =
            num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
        if num_banks_with_same_slot > 0 {
            datapoint_info!(
                "pruned_banks_request_handler",
                ("num_pruned_banks", num_banks_to_purge, i64),
                ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
            );
        }

        // Purge all the slots in parallel
        // Banks for the same slot are purged sequentially
        let accounts_db = bank.rc.accounts.accounts_db.as_ref();
        accounts_db.thread_pool_clean.install(|| {
            grouped_banks_to_purge.into_par_iter().for_each(|group| {
                group.iter().for_each(|(slot, bank_id)| {
                    accounts_db.purge_slot(*slot, *bank_id, true);
                })
            });
        });

        num_banks_to_purge
    }

    fn remove_dead_slots(
        &self,
        bank: &Bank,
        removed_slots_count: &mut usize,
        total_remove_slots_time: &mut u64,
    ) {
        let mut remove_slots_time = Measure::start("remove_slots_time");
        *removed_slots_count += self.handle_request(bank);
        remove_slots_time.stop();
        *total_remove_slots_time += remove_slots_time.as_us();

        if *removed_slots_count >= 100 {
            datapoint_info!(
                "remove_slots_timing",
                ("remove_slots_time", *total_remove_slots_time, i64),
                ("removed_slots_count", *removed_slots_count, i64),
            );
            *total_remove_slots_time = 0;
            *removed_slots_count = 0;
        }
    }
}

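// A standalone illustration of the grouping step in `handle_request()` above: a stable sort
// by slot followed by `chunk_by` on equal slots yields per-slot groups that preserve the
// order the signals were sent in.  The data here is made up.
#[cfg(test)]
#[allow(dead_code)]
fn example_group_by_slot() {
    let mut banks_to_purge: Vec<(Slot, BankId)> = vec![(7, 0), (5, 1), (7, 2), (5, 3)];
    banks_to_purge.sort_by_key(|(slot, _id)| *slot);
    let groups: Vec<&[(Slot, BankId)]> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
    assert_eq!(groups, vec![&[(5, 1), (5, 3)][..], &[(7, 0), (7, 2)][..]]);
}
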
pub struct AbsRequestHandlers {
    pub snapshot_request_handler: SnapshotRequestHandler,
    pub pruned_banks_request_handler: PrunedBanksRequestHandler,
}

impl AbsRequestHandlers {
    // Returns the latest requested snapshot slot, if one exists
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        exit: &AtomicBool,
    ) -> Option<Result<Slot, SnapshotError>> {
        self.snapshot_request_handler.handle_snapshot_requests(
            test_hash_calculation,
            non_snapshot_time_us,
            exit,
        )
    }
}

pub struct AccountsBackgroundService {
    t_background: JoinHandle<()>,
    status: AbsStatus,
}

impl AccountsBackgroundService {
    pub fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        exit: Arc<AtomicBool>,
        request_handlers: AbsRequestHandlers,
        test_hash_calculation: bool,
    ) -> Self {
        let is_running = Arc::new(AtomicBool::new(true));
        let stop = Arc::new(AtomicBool::new(false));
        let mut last_cleaned_slot = 0;
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        let t_background = Builder::new()
            .name("solBgAccounts".to_string())
            .spawn({
                let is_running = is_running.clone();
                let stop = stop.clone();

                move || {
                    info!("AccountsBackgroundService has started");
                    let mut stats = StatsManager::new();
                    let mut last_snapshot_end_time = None;
                    let mut previous_clean_time = Instant::now();
                    let mut previous_shrink_time = Instant::now();

                    loop {
                        if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
                            break;
                        }
                        let start_time = Instant::now();

                        // Grab the current root bank
                        let bank = bank_forks.read().unwrap().root_bank();

                        // Purge accounts of any dead slots
                        request_handlers
                            .pruned_banks_request_handler
                            .remove_dead_slots(
                                &bank,
                                &mut removed_slots_count,
                                &mut total_remove_slots_time,
                            );

                        let non_snapshot_time = last_snapshot_end_time
                            .map(|last_snapshot_end_time: Instant| {
                                last_snapshot_end_time.elapsed().as_micros()
                            })
                            .unwrap_or_default();

                        // Check to see if there were any requests for snapshotting banks
                        // < the current root bank `bank` above.
                        //
                        // Claim: Any snapshot request for slot `N` found here implies that the
                        // last cleanup slot `M` satisfies `M < N`
                        //
                        // Proof: Assume for contradiction that we find a snapshot request for slot
                        // `N` here, but cleanup has already happened on some slot `M >= N`.
                        // Because the call to `bank.clean_accounts(true)` (in the code below)
                        // implies we only clean slots `<= bank - 1`, then that means in some
                        // *previous* iteration of this loop, we must have gotten a root bank for
                        // some slot `R` where `R > N`, but did not see the snapshot for `N`
                        // in the snapshot request channel.
                        //
                        // However, this is impossible because BankForks.set_root() will always
                        // flush the snapshot request for `N` to the snapshot request channel
                        // before setting a root `R > N`, and
                        // snapshot_request_handler.handle_snapshot_requests() will always look
                        // for the latest available snapshot in the channel.
                        //
                        // NOTE: We must wait for startup verification to complete before handling
                        // snapshot requests.  This is because startup verification and snapshot
                        // request handling can both kick off accounts hash calculations in
                        // background threads, and these must not happen concurrently.
                        let snapshot_handle_result = bank
                            .is_startup_verification_complete()
                            .then(|| {
                                request_handlers.handle_snapshot_requests(
                                    test_hash_calculation,
                                    non_snapshot_time,
                                    &exit,
                                )
                            })
                            .flatten();

                        if let Some(snapshot_handle_result) = snapshot_handle_result {
                            // Safe, see proof above

                            last_snapshot_end_time = Some(Instant::now());
                            match snapshot_handle_result {
                                Ok(snapshot_slot) => {
                                    assert!(
                                        last_cleaned_slot <= snapshot_slot,
                                        "last cleaned slot: {last_cleaned_slot}, \
                                         snapshot request slot: {snapshot_slot}, \
                                         is startup verification complete: {}, \
                                         enqueued snapshot requests: {:?}",
                                        bank.is_startup_verification_complete(),
                                        request_handlers
                                            .snapshot_request_handler
                                            .snapshot_request_receiver
                                            .try_iter()
                                            .collect::<Vec<_>>(),
                                    );
                                    last_cleaned_slot = snapshot_slot;
                                    previous_clean_time = Instant::now();
                                    previous_shrink_time = Instant::now();
                                }
                                Err(err) => {
                                    error!(
                                        "Stopping AccountsBackgroundService! \
                                         Fatal error while handling snapshot requests: {err}",
                                    );
                                    exit.store(true, Ordering::Relaxed);
                                    break;
                                }
                            }
                        } else {
                            // we didn't handle a snapshot request, so do flush/clean/shrink

                            let next_snapshot_request_slot = request_handlers
                                .snapshot_request_handler
                                .peek_next_snapshot_request_slot();

                            // We cannot clean past the next snapshot request slot because it may
                            // have zero-lamport accounts.  See the comments in
                            // Bank::clean_accounts() for more information.
                            let max_clean_slot_inclusive = cmp::min(
                                next_snapshot_request_slot.unwrap_or(Slot::MAX),
                                bank.slot(),
                            )
                            .saturating_sub(1);

                            let duration_since_previous_clean = previous_clean_time.elapsed();
                            let should_clean = duration_since_previous_clean > CLEAN_INTERVAL;

                            // if we're cleaning, then force flush, otherwise be lazy
                            let force_flush = should_clean;
                            bank.rc
                                .accounts
                                .accounts_db
                                .flush_accounts_cache(force_flush, Some(max_clean_slot_inclusive));

                            if should_clean {
                                bank.rc.accounts.accounts_db.clean_accounts(
                                    Some(max_clean_slot_inclusive),
                                    false,
                                    bank.epoch_schedule(),
                                    bank.clean_accounts_old_storages_policy(),
                                );
                                last_cleaned_slot = max_clean_slot_inclusive;
                                previous_clean_time = Instant::now();
                            }

                            let duration_since_previous_shrink = previous_shrink_time.elapsed();
                            let should_shrink = duration_since_previous_shrink > SHRINK_INTERVAL;
                            // To avoid pathological interactions between the clean and shrink
                            // timers, call shrink for either should_shrink or should_clean.
                            if should_shrink || should_clean {
                                if should_clean {
                                    // We used to only squash (aka shrink ancients) when we also
                                    // cleaned, so keep that same behavior here for now.
                                    bank.shrink_ancient_slots();
                                }
                                bank.shrink_candidate_slots();
                                previous_shrink_time = Instant::now();
                            }
                        }
                        stats.record_and_maybe_submit(start_time.elapsed());
                        sleep(Duration::from_millis(INTERVAL_MS));
                    }
                    info!("AccountsBackgroundService has stopped");
                    is_running.store(false, Ordering::Relaxed);
                }
            })
            .unwrap();

        Self {
            t_background,
            status: AbsStatus { is_running, stop },
        }
    }

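    // A standalone sketch of the clamp computed in the loop above: cleaning is capped by
    // both the next snapshot request slot (if any) and the current root, and stays strictly
    // below the capping slot itself.
    #[cfg(test)]
    #[allow(dead_code)]
    fn example_max_clean_slot(next_snapshot_request_slot: Option<Slot>, root_slot: Slot) -> Slot {
        cmp::min(next_snapshot_request_slot.unwrap_or(Slot::MAX), root_slot).saturating_sub(1)
    }
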
    /// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
    /// should only be one bank, the root bank, in `bank_forks`.
    /// All banks added to `bank_forks` will descend from the root bank, and thus will inherit
    /// the bank drop callback.
    pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
        assert_eq!(bank_forks.read().unwrap().banks().len(), 1);

        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        {
            let root_bank = bank_forks.read().unwrap().root_bank();

            root_bank
                .rc
                .accounts
                .accounts_db
                .enable_bank_drop_callback();
            root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
                pruned_banks_sender,
            ))));
        }
        pruned_banks_receiver
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_background.join()
    }

    /// Returns an object to query/manage the status of ABS
    pub fn status(&self) -> &AbsStatus {
        &self.status
    }
}

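// A hedged end-to-end sketch, assuming the caller already built a `SnapshotRequestHandler`
// and holds `bank_forks` and an exit flag: install the drop callback first (while only the
// root bank exists), feed its receiver into ABS, then stop and join during shutdown.
#[cfg(test)]
#[allow(dead_code)]
fn example_run_abs(
    bank_forks: Arc<RwLock<BankForks>>,
    snapshot_request_handler: SnapshotRequestHandler,
    exit: Arc<AtomicBool>,
) -> thread::Result<()> {
    let pruned_banks_receiver =
        AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
    let request_handlers = AbsRequestHandlers {
        snapshot_request_handler,
        pruned_banks_request_handler: PrunedBanksRequestHandler {
            pruned_banks_receiver,
        },
    };
    let abs = AccountsBackgroundService::new(bank_forks, exit, request_handlers, false);
    // ... validator runs; later, during shutdown:
    abs.status().stop();
    abs.join()
}
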
/// Query and manage the status of AccountsBackgroundService
#[derive(Debug, Clone)]
pub struct AbsStatus {
    /// Flag to query if ABS is running
    is_running: Arc<AtomicBool>,
    /// Flag to set to stop ABS
    stop: Arc<AtomicBool>,
}

impl AbsStatus {
    /// Returns whether ABS is running
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Relaxed)
    }

    /// Raises the flag for ABS to stop
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Relaxed)
    }

    #[cfg(feature = "dev-context-only-utils")]
    pub fn new_for_tests() -> Self {
        Self {
            is_running: Arc::new(AtomicBool::new(false)),
            stop: Arc::new(AtomicBool::new(false)),
        }
    }
}

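// A small sketch of a shutdown path, assuming `status` was obtained from
// `AccountsBackgroundService::status()`: raise the stop flag, then poll until the
// background thread reports it is no longer running.
#[cfg(test)]
#[allow(dead_code)]
fn example_stop_and_wait(status: &AbsStatus) {
    status.stop();
    while status.is_running() {
        sleep(Duration::from_millis(INTERVAL_MS));
    }
}
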
/// Get the AccountsPackageKind from a given SnapshotRequest
#[must_use]
fn new_accounts_package_kind(snapshot_request: &SnapshotRequest) -> Option<AccountsPackageKind> {
    match snapshot_request.request_kind {
        SnapshotRequestKind::EpochAccountsHash => Some(AccountsPackageKind::EpochAccountsHash),
        SnapshotRequestKind::FullSnapshot => {
            Some(AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot))
        }
        SnapshotRequestKind::IncrementalSnapshot => {
            if let Some(latest_full_snapshot_slot) = snapshot_request
                .snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .latest_full_snapshot_slot()
            {
                Some(AccountsPackageKind::Snapshot(
                    SnapshotKind::IncrementalSnapshot(latest_full_snapshot_slot),
                ))
            } else {
                warn!(
                    "Ignoring IncrementalSnapshot request for slot {} because there is no latest \
                     full snapshot",
                    snapshot_request.snapshot_root_bank.slot()
                );
                None
            }
        }
    }
}

/// Compare snapshot requests; used to pick the highest priority request to handle.
///
/// Priority, from highest to lowest:
/// - Epoch Accounts Hash
/// - Full Snapshot
/// - Incremental Snapshot
///
/// If two requests of the same kind are being compared, their bank slots are the tiebreaker.
#[must_use]
fn cmp_requests_by_priority(a: &SnapshotRequest, b: &SnapshotRequest) -> cmp::Ordering {
    let slot_a = a.snapshot_root_bank.slot();
    let slot_b = b.snapshot_root_bank.slot();
    cmp_snapshot_request_kinds_by_priority(&a.request_kind, &b.request_kind)
        .then(slot_a.cmp(&slot_b))
}

/// Compare snapshot request kinds by priority
///
/// Priority, from highest to lowest:
/// - Epoch Accounts Hash
/// - Full Snapshot
/// - Incremental Snapshot
#[must_use]
fn cmp_snapshot_request_kinds_by_priority(
    a: &SnapshotRequestKind,
    b: &SnapshotRequestKind,
) -> cmp::Ordering {
    use {
        cmp::Ordering::{Equal, Greater, Less},
        SnapshotRequestKind as Kind,
    };
    match (a, b) {
        // Epoch Accounts Hash packages
        (Kind::EpochAccountsHash, Kind::EpochAccountsHash) => Equal,
        (Kind::EpochAccountsHash, _) => Greater,
        (_, Kind::EpochAccountsHash) => Less,

        // Snapshot packages
        (Kind::FullSnapshot, Kind::FullSnapshot) => Equal,
        (Kind::FullSnapshot, Kind::IncrementalSnapshot) => Greater,
        (Kind::IncrementalSnapshot, Kind::FullSnapshot) => Less,
        (Kind::IncrementalSnapshot, Kind::IncrementalSnapshot) => Equal,
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            bank::epoch_accounts_hash_utils, genesis_utils::create_genesis_config,
            snapshot_config::SnapshotConfig,
        },
        crossbeam_channel::unbounded,
        solana_account::AccountSharedData,
        solana_accounts_db::epoch_accounts_hash::EpochAccountsHash,
        solana_epoch_schedule::EpochSchedule,
        solana_hash::Hash,
        solana_pubkey::Pubkey,
    };

    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account in slot 0
        let account_key = Pubkey::new_unique();
        bank0.store_account(
            &account_key,
            &AccountSharedData::new(264, 0, &Pubkey::default()),
        );
        assert!(bank0.get_account(&account_key).is_some());
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);

        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }

    /// Ensure that unhandled snapshot requests are properly re-enqueued or dropped
    ///
    /// The snapshot request handler should be flexible and handle re-queueing unhandled snapshot
    /// requests, if those unhandled requests are for slots GREATER-THAN the last request handled.
    /// This is needed if, for example, an Epoch Accounts Hash for slot X and a Full Snapshot for
    /// slot X+1 are both in the request channel.  The EAH needs to be handled first, but the full
    /// snapshot should also be handled afterwards, since future incremental snapshots will depend
    /// on it.
    #[test]
    fn test_get_next_snapshot_request() {
        // These constants were picked to ensure the desired snapshot requests were sent to the
        // channel.  With 400 slots per Epoch, the EAH start will be at slot 100.  Ensure there are
        // other requests before this slot, and then 2+ requests of each type afterwards (to
        // further test the prioritization logic).
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval_slots: FULL_SNAPSHOT_INTERVAL,
            incremental_snapshot_archive_interval_slots: INCREMENTAL_SNAPSHOT_INTERVAL,
            ..SnapshotConfig::default()
        };

        let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            snapshot_config,
            0,
        ));
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller,
            snapshot_request_receiver,
            accounts_package_sender,
        };

        let send_snapshot_request = |snapshot_root_bank, request_kind| {
            let snapshot_request = SnapshotRequest {
                snapshot_root_bank,
                status_cache_slot_deltas: Vec::default(),
                request_kind,
                enqueued: Instant::now(),
            };
            snapshot_request_sender.send(snapshot_request).unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
        bank.set_startup_verification_complete();
        // Need to set the EAH to Valid so that `Bank::new_from_parent()` doesn't panic during
        // freeze when parent is in the EAH calculation window.
        bank.rc
            .accounts
            .accounts_db
            .epoch_accounts_hash_manager
            .set_valid(EpochAccountsHash::new(Hash::new_unique()), 0);

        // We need to get and set accounts-db's latest full snapshot slot to test
        // get_next_snapshot_request().  To work around potential borrowing issues
        // caused by make_banks() below, Arc::clone bank0 and add helper functions.
        let bank0 = bank.clone();
        fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
            bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
        }
        fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
            bank.rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(slot);
        }

        // Create new banks and send snapshot requests so that the following requests will be in
        // the channel before handling the requests:
        //
        // fss  80
        // iss  90
        // eah 100 <-- handled 1st
        // iss 120
        // iss 150
        // fss 160
        // iss 180
        // iss 210
        // fss 240 <-- handled 2nd
        // iss 270
        // iss 300 <-- handled 3rd
        //
        // Also, incremental snapshots before slot 240 (the first full snapshot handled), will
        // actually be skipped since the latest full snapshot slot will be `None`.
        let mut make_banks = |num_banks| {
            for _ in 0..num_banks {
                let slot = bank.slot() + 1;
                bank = Arc::new(Bank::new_from_parent(
                    bank.clone(),
                    &Pubkey::new_unique(),
                    slot,
                ));

                // Since we're not using `BankForks::set_root()`, we have to handle sending the
                // correct snapshot requests ourselves.
                if bank.slot() == epoch_accounts_hash_utils::calculation_start(&bank) {
                    send_snapshot_request(
                        Arc::clone(&bank),
                        SnapshotRequestKind::EpochAccountsHash,
                    );
                } else if bank.block_height() % FULL_SNAPSHOT_INTERVAL == 0 {
                    send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::FullSnapshot);
                } else if bank.block_height() % INCREMENTAL_SNAPSHOT_INTERVAL == 0 {
                    send_snapshot_request(
                        Arc::clone(&bank),
                        SnapshotRequestKind::IncrementalSnapshot,
                    );
                }
            }
        };
        make_banks(303);

        // Ensure the EAH is handled 1st
        assert_eq!(latest_full_snapshot_slot(&bank0), None);
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::EpochAccountsHash
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 100);

        // Ensure the full snapshot from slot 240 is handled 2nd
        // (the older full snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), None);
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::FullSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
        set_latest_full_snapshot_slot(&bank0, 240);

        // Ensure the incremental snapshot from slot 300 is handled 3rd
        // (the older incremental snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::IncrementalSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);

        // And now ensure the snapshot request channel is empty!
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());

        // Create more banks and send snapshot requests so that the following requests will be in
        // the channel before handling the requests:
        //
        // fss 480 <-- handled 1st
        // eah 500 <-- handled 2nd
        // iss 510
        // iss 540 <-- handled 3rd
        //
        // This test differs from the one above by having an older full snapshot request that must
        // be handled before the new epoch accounts hash request.
        make_banks(240);

        // Ensure the full snapshot is handled 1st
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::FullSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 480);
        set_latest_full_snapshot_slot(&bank0, 480);

        // Ensure the EAH is handled 2nd
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::EpochAccountsHash
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 500);

        // Ensure the incremental snapshot is handled 3rd
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::IncrementalSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 540);

        // And now ensure the snapshot request channel is empty!
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480));
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());
    }

    /// Ensure that we can prune banks with the same slot (if they were on different forks)
    #[test]
    fn test_pruned_banks_request_handler_handle_request() {
        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let genesis_config_info = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
        bank.set_startup_verification_complete();
        bank.rc.accounts.accounts_db.enable_bank_drop_callback();
        bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
            pruned_banks_sender,
        ))));

        let fork0_bank0 = Arc::new(bank);
        let fork0_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork1_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork2_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork0_bank2 = Arc::new(Bank::new_from_parent(
            fork0_bank1.clone(),
            &Pubkey::new_unique(),
            fork0_bank1.slot() + 1,
        ));
        let fork1_bank2 = Arc::new(Bank::new_from_parent(
            fork1_bank1.clone(),
            &Pubkey::new_unique(),
            fork1_bank1.slot() + 1,
        ));
        let fork0_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        let fork3_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        fork0_bank3.squash();

        drop(fork3_bank3);
        drop(fork1_bank2);
        drop(fork0_bank2);
        drop(fork1_bank1);
        drop(fork2_bank1);
        drop(fork0_bank1);
        drop(fork0_bank0);
        let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
        assert_eq!(num_banks_purged, 7);
    }

    #[test]
    fn test_cmp_snapshot_request_kinds_by_priority() {
        use cmp::Ordering::{Equal, Greater, Less};
        for (snapshot_request_kind_a, snapshot_request_kind_b, expected_result) in [
            (
                SnapshotRequestKind::EpochAccountsHash,
                SnapshotRequestKind::EpochAccountsHash,
                Equal,
            ),
            (
                SnapshotRequestKind::EpochAccountsHash,
                SnapshotRequestKind::FullSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::EpochAccountsHash,
                SnapshotRequestKind::IncrementalSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::EpochAccountsHash,
                Less,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Equal,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::EpochAccountsHash,
                Less,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Less,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Equal,
            ),
        ] {
            let actual_result = cmp_snapshot_request_kinds_by_priority(
                &snapshot_request_kind_a,
                &snapshot_request_kind_b,
            );
            assert_eq!(expected_result, actual_result);
        }
    }
}