atlas_runtime/
accounts_background_service.rs

//! Service to clean up dead slots in accounts_db
//!
//! This can be expensive since we have to walk the append vecs being cleaned up.

mod pending_snapshot_packages;
mod stats;
pub use pending_snapshot_packages::PendingSnapshotPackages;
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
    crate::{
        bank::{Bank, BankSlotDelta, DropCallback},
        bank_forks::BankForks,
        snapshot_controller::SnapshotController,
        snapshot_package::{SnapshotKind, SnapshotPackage},
        snapshot_utils::SnapshotError,
    },
    crossbeam_channel::{Receiver, SendError, Sender},
    log::*,
    rayon::iter::{IntoParallelIterator, ParallelIterator},
    solana_clock::{BankId, Slot},
    solana_measure::{measure::Measure, measure_us},
    stats::StatsManager,
    std::{
        boxed::Box,
        cmp,
        fmt::{self, Debug, Formatter},
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc, LazyLock, Mutex, RwLock,
        },
        thread::{self, sleep, Builder, JoinHandle},
        time::{Duration, Instant},
    },
};

const INTERVAL_MS: u64 = 100;
// Set the clean interval duration to be approximately how long before the next incremental
// snapshot request is received, plus some buffer.  The default incremental snapshot interval is
// 100 slots, which ends up being 40 seconds plus buffer.
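// For example, at the nominal 400 ms slot time, 100 slots is 100 * 400 ms = 40 seconds, so the
// 50 second interval below leaves roughly 10 seconds of buffer.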
const CLEAN_INTERVAL: Duration = Duration::from_secs(50);
const SHRINK_INTERVAL: Duration = Duration::from_secs(1);

pub type SnapshotRequestSender = Sender<SnapshotRequest>;
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// interval to report bank_drop queue events: 60s
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// maximum drop bank signal queue length
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;

#[derive(Debug, Default)]
struct PrunedBankQueueLenReporter {
    last_report_time: AtomicU64,
}

impl PrunedBankQueueLenReporter {
    fn report(&self, q_len: usize) {
        let now = solana_time_utils::timestamp();
        let last_report_time = self.last_report_time.load(Ordering::Acquire);
        if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
            && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
        {
            datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
            self.last_report_time.store(now, Ordering::Release);
        }
    }
}

static BANK_DROP_QUEUE_REPORTER: LazyLock<PrunedBankQueueLenReporter> =
    LazyLock::new(PrunedBankQueueLenReporter::default);

#[derive(Clone)]
pub struct SendDroppedBankCallback {
    sender: DroppedSlotsSender,
}

impl DropCallback for SendDroppedBankCallback {
    fn callback(&self, bank: &Bank) {
        BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
        if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
            info!("bank DropCallback signal queue disconnected.");
        }
    }

    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
        Box::new(self.clone())
    }
}

impl Debug for SendDroppedBankCallback {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(f, "SendDroppedBankCallback({self:p})")
    }
}

impl SendDroppedBankCallback {
    pub fn new(sender: DroppedSlotsSender) -> Self {
        Self { sender }
    }
}

pub struct SnapshotRequest {
    pub snapshot_root_bank: Arc<Bank>,
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
    pub request_kind: SnapshotRequestKind,

    /// The instant this request was sent to the queue.
    /// Used to track how long requests wait before processing.
    pub enqueued: Instant,
}

impl Debug for SnapshotRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SnapshotRequest")
            .field("request kind", &self.request_kind)
            .field("bank slot", &self.snapshot_root_bank.slot())
            .field("block height", &self.snapshot_root_bank.block_height())
            .finish_non_exhaustive()
    }
}

/// What kind of request is this?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestKind {
    FullSnapshot,
    IncrementalSnapshot,
}

pub struct SnapshotRequestHandler {
    pub snapshot_controller: Arc<SnapshotController>,
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    pub pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
}

impl SnapshotRequestHandler {
    // Returns the latest requested snapshot slot, if a request was handled
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        non_snapshot_time_us: u128,
    ) -> Option<Result<Slot, SnapshotError>> {
        let (snapshot_request, num_outstanding_requests, num_re_enqueued_requests) =
            self.get_next_snapshot_request()?;

        datapoint_info!(
            "handle_snapshot_requests",
            ("num_outstanding_requests", num_outstanding_requests, i64),
            ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
            (
                "enqueued_time_us",
                snapshot_request.enqueued.elapsed().as_micros(),
                i64
            ),
        );

        let snapshot_kind = new_snapshot_kind(&snapshot_request)?;
        Some(self.handle_snapshot_request(non_snapshot_time_us, snapshot_request, snapshot_kind))
    }

    /// Get the next snapshot request to handle
    ///
    /// Look through the snapshot request channel to find the highest priority one to handle next.
    /// If there are no snapshot requests in the channel, return None.  Otherwise return the
    /// highest priority one.  Unhandled snapshot requests with slots GREATER-THAN the handled one
    /// will be re-enqueued.  The remaining will be dropped.
    ///
    /// Also return the number of snapshot requests initially in the channel, and the number of
    /// ones re-enqueued.
    fn get_next_snapshot_request(
        &self,
    ) -> Option<(
        SnapshotRequest,
        /*num outstanding snapshot requests*/ usize,
        /*num re-enqueued snapshot requests*/ usize,
    )> {
        let mut requests: Vec<_> = self.snapshot_request_receiver.try_iter().collect();
        let requests_len = requests.len();
        debug!("outstanding snapshot requests ({requests_len}): {requests:?}");

        match requests_len {
            0 => None,
            1 => {
                // SAFETY: We know the len is 1, so `pop` will return `Some`
                let snapshot_request = requests.pop().unwrap();
                Some((snapshot_request, 1, 0))
            }
            _ => {
                // Get the two highest priority requests, `y` and `z`.
                // By asking for the second-to-last element to be in its final sorted position, we
                // also ensure that the last element is also sorted.
                // Note, we no longer need the second-to-last element; this code can be refactored.
                let (_, _y, z) =
                    requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
                assert_eq!(z.len(), 1);

                // SAFETY: We know the len is > 1, so `pop` will return `Some`
                let snapshot_request = requests.pop().unwrap();

                let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
                // re-enqueue any remaining requests for slots GREATER-THAN the one that will be handled
                let num_re_enqueued_requests = requests
                    .into_iter()
                    .filter(|snapshot_request| {
                        snapshot_request.snapshot_root_bank.slot() > handled_request_slot
                    })
                    .map(|snapshot_request| {
                        self.snapshot_controller
                            .request_sender()
                            .try_send(snapshot_request)
                            .expect("re-enqueue snapshot request");
                    })
                    .count();

                Some((snapshot_request, requests_len, num_re_enqueued_requests))
            }
        }
    }

    fn handle_snapshot_request(
        &self,
        non_snapshot_time_us: u128,
        snapshot_request: SnapshotRequest,
        snapshot_kind: SnapshotKind,
    ) -> Result<Slot, SnapshotError> {
        info!("handling snapshot request: {snapshot_request:?}, {snapshot_kind:?}");
        let mut total_time = Measure::start("snapshot_request_receiver_total_time");
        let SnapshotRequest {
            snapshot_root_bank,
            status_cache_slot_deltas,
            request_kind: _,
            enqueued: _,
        } = snapshot_request;

        // we should not rely on the state of this validator until startup verification is complete
        assert!(snapshot_root_bank.has_initial_accounts_hash_verification_completed());

        if snapshot_kind.is_full_snapshot() {
            // The latest full snapshot slot is what accounts-db uses to properly handle
            // zero lamport accounts.  We are handling a full snapshot request here, and
            // since taking a snapshot is not allowed to fail, we can update accounts-db now.
            snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
        }

        let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
        // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
        // That's because `snapshot_root_bank.slot()` must be a root at this point,
        // and contains relevant updates because each bank has at least 1 account update due
        // to sysvar maintenance.  Otherwise, this would cause missing storages in the snapshot.
        snapshot_root_bank.force_flush_accounts_cache();
        // Ensure all roots <= `snapshot_root_bank.slot()` have been flushed.
        // Note `max_flush_root` could be larger than `snapshot_root_bank.slot()` if there are
        // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
        assert!(
            snapshot_root_bank.slot()
                <= snapshot_root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .accounts_cache
                    .fetch_max_flush_root()
        );
        flush_accounts_cache_time.stop();

        let mut clean_time = Measure::start("clean_time");
        snapshot_root_bank.clean_accounts();
        clean_time.stop();

        let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());

        let mut shrink_time = Measure::start("shrink_time");
        snapshot_root_bank.shrink_candidate_slots();
        shrink_time.stop();

        // Snapshot the bank and send over a snapshot package
        let mut snapshot_time = Measure::start("snapshot_time");
        let snapshot_package = SnapshotPackage::new(
            snapshot_kind,
            &snapshot_root_bank,
            snapshot_root_bank.get_snapshot_storages(None),
            status_cache_slot_deltas,
        );
        self.pending_snapshot_packages
            .lock()
            .unwrap()
            .push(snapshot_package);
        snapshot_time.stop();
        info!(
            "Handled snapshot request. snapshot kind: {:?}, slot: {}, bank hash: {}",
            snapshot_kind,
            snapshot_root_bank.slot(),
            snapshot_root_bank.hash(),
        );

        total_time.stop();

        datapoint_info!(
            "handle_snapshot_requests-timing",
            (
                "flush_accounts_cache_time",
                flush_accounts_cache_time.as_us(),
                i64
            ),
            ("shrink_time", shrink_time.as_us(), i64),
            ("clean_time", clean_time.as_us(), i64),
            ("snapshot_time", snapshot_time.as_us(), i64),
            ("total_us", total_time.as_us(), i64),
            ("non_snapshot_time_us", non_snapshot_time_us, i64),
            ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
        );
        Ok(snapshot_root_bank.slot())
    }

    /// Returns the slot of the next snapshot request to be handled
    fn peek_next_snapshot_request_slot(&self) -> Option<Slot> {
        // We reuse `get_next_snapshot_request()` here, since it already implements all the logic
        // for getting the highest priority request, *AND* we leverage its test coverage.
        // Additionally, since `get_next_snapshot_request()` drops old requests, we might get to
        // proactively clean up old banks earlier as well!
        let (next_request, _, _) = self.get_next_snapshot_request()?;
        let next_slot = next_request.snapshot_root_bank.slot();

        // make sure to re-enqueue the request, otherwise we'd lose it!
        self.snapshot_controller
            .request_sender()
            .try_send(next_request)
            .expect("re-enqueue snapshot request");

        Some(next_slot)
    }
}

#[derive(Debug)]
pub struct PrunedBanksRequestHandler {
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}

impl PrunedBanksRequestHandler {
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    fn handle_request(&self, bank: &Bank) -> usize {
        let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
        // We need a stable sort to ensure that banks with the same slot are purged in the same
        // order they were sent into the channel.
        banks_to_purge.sort_by_key(|(slot, _id)| *slot);
        let num_banks_to_purge = banks_to_purge.len();

        // Group the banks into slices with the same slot
        let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();

        // Log whenever we need to handle banks with the same slot.  Purposely do this *before* we
        // call `purge_slot()` to ensure we get the datapoint (in case there's an assert/panic).
        let num_banks_with_same_slot =
            num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
        if num_banks_with_same_slot > 0 {
            datapoint_info!(
                "pruned_banks_request_handler",
                ("num_pruned_banks", num_banks_to_purge, i64),
                ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
            );
        }

        // Purge all the slots in parallel
        // Banks for the same slot are purged sequentially
        let accounts_db = bank.rc.accounts.accounts_db.as_ref();
        accounts_db.thread_pool_background.install(|| {
            grouped_banks_to_purge.into_par_iter().for_each(|group| {
                group.iter().for_each(|(slot, bank_id)| {
                    accounts_db.purge_slot(*slot, *bank_id, true);
                })
            });
        });

        num_banks_to_purge
    }

    fn remove_dead_slots(
        &self,
        bank: &Bank,
        removed_slots_count: &mut usize,
        total_remove_slots_time: &mut u64,
    ) {
        let mut remove_slots_time = Measure::start("remove_slots_time");
        *removed_slots_count += self.handle_request(bank);
        remove_slots_time.stop();
        *total_remove_slots_time += remove_slots_time.as_us();

        if *removed_slots_count >= 100 {
            datapoint_info!(
                "remove_slots_timing",
                ("remove_slots_time", *total_remove_slots_time, i64),
                ("removed_slots_count", *removed_slots_count, i64),
            );
            *total_remove_slots_time = 0;
            *removed_slots_count = 0;
        }
    }
}

pub struct AbsRequestHandlers {
    pub snapshot_request_handler: SnapshotRequestHandler,
    pub pruned_banks_request_handler: PrunedBanksRequestHandler,
}

impl AbsRequestHandlers {
    // Returns the latest requested snapshot slot, if one exists
    #[allow(clippy::type_complexity)]
    pub fn handle_snapshot_requests(
        &self,
        non_snapshot_time_us: u128,
    ) -> Option<Result<Slot, SnapshotError>> {
        self.snapshot_request_handler
            .handle_snapshot_requests(non_snapshot_time_us)
    }
}

pub struct AccountsBackgroundService {
    t_background: JoinHandle<()>,
    status: AbsStatus,
}

impl AccountsBackgroundService {
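    /// Spawns the background thread that services pruned-bank and snapshot requests.
    ///
    /// A minimal wiring sketch (illustrative only, not compiled as a doctest; the bank forks,
    /// snapshot request handler, and pruned-banks channel are assumed to be set up elsewhere):
    ///
    /// ```ignore
    /// let exit = Arc::new(AtomicBool::new(false));
    /// let pruned_banks_receiver =
    ///     AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());
    /// let request_handlers = AbsRequestHandlers {
    ///     snapshot_request_handler,
    ///     pruned_banks_request_handler: PrunedBanksRequestHandler { pruned_banks_receiver },
    /// };
    /// let abs = AccountsBackgroundService::new(bank_forks, exit.clone(), request_handlers);
    /// // ...on shutdown:
    /// abs.status().stop();
    /// abs.join().unwrap();
    /// ```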
    pub fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        exit: Arc<AtomicBool>,
        request_handlers: AbsRequestHandlers,
    ) -> Self {
        let is_running = Arc::new(AtomicBool::new(true));
        let stop = Arc::new(AtomicBool::new(false));
        let mut last_cleaned_slot = 0;
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        let t_background = Builder::new()
            .name("solBgAccounts".to_string())
            .spawn({
                let is_running = is_running.clone();
                let stop = stop.clone();

                move || {
                    info!("AccountsBackgroundService has started");
                    let mut stats = StatsManager::new();
                    let mut last_snapshot_end_time = None;
                    let mut previous_clean_time = Instant::now();
                    let mut previous_shrink_time = Instant::now();

                    loop {
                        if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
                            break;
                        }
                        let start_time = Instant::now();

                        // Grab the current root bank
                        let bank = bank_forks.read().unwrap().root_bank();

                        // Purge accounts of any dead slots
                        request_handlers
                            .pruned_banks_request_handler
                            .remove_dead_slots(
                                &bank,
                                &mut removed_slots_count,
                                &mut total_remove_slots_time,
                            );

                        let non_snapshot_time = last_snapshot_end_time
                            .map(|last_snapshot_end_time: Instant| {
                                last_snapshot_end_time.elapsed().as_micros()
                            })
                            .unwrap_or_default();

                        // Check to see if there were any requests for snapshotting banks
                        // < the current root bank `bank` above.
                        //
                        // Claim: Any snapshot request for slot `N` found here implies that the
                        // last cleanup slot `M` satisfies `M < N`.
                        //
                        // Proof: Assume for contradiction that we find a snapshot request for slot
                        // `N` here, but cleanup has already happened on some slot `M >= N`.
                        // Because the clean call below only cleans slots `<= bank.slot() - 1`,
                        // that means in some *previous* iteration of this loop, we must have
                        // gotten a root bank for some slot `R` where `R > N`, but did not see
                        // the snapshot request for `N` in the snapshot request channel.
                        //
                        // However, this is impossible because BankForks::set_root() will always
                        // flush the snapshot request for `N` to the snapshot request channel
                        // before setting a root `R > N`, and
                        // `SnapshotRequestHandler::handle_snapshot_requests()` will always look
                        // for the latest available snapshot request in the channel.
                        //
                        // NOTE: We must wait for startup verification to complete before handling
                        // snapshot requests.  This is because startup verification and snapshot
                        // request handling can both kick off accounts hash calculations in
                        // background threads, and these must not happen concurrently.
                        let snapshot_handle_result = bank
                            .has_initial_accounts_hash_verification_completed()
                            .then(|| request_handlers.handle_snapshot_requests(non_snapshot_time))
                            .flatten();

                        if let Some(snapshot_handle_result) = snapshot_handle_result {
                            // Safe, see proof above

                            last_snapshot_end_time = Some(Instant::now());
                            match snapshot_handle_result {
                                Ok(snapshot_slot) => {
                                    assert!(
                                        last_cleaned_slot <= snapshot_slot,
                                        "last cleaned slot: {last_cleaned_slot}, snapshot request \
                                         slot: {snapshot_slot}, is startup verification complete: \
                                         {}, enqueued snapshot requests: {:?}",
                                        bank.has_initial_accounts_hash_verification_completed(),
                                        request_handlers
                                            .snapshot_request_handler
                                            .snapshot_request_receiver
                                            .try_iter()
                                            .collect::<Vec<_>>(),
                                    );
                                    last_cleaned_slot = snapshot_slot;
                                    previous_clean_time = Instant::now();
                                    previous_shrink_time = Instant::now();
                                }
                                Err(err) => {
                                    error!(
                                        "Stopping AccountsBackgroundService! Fatal error while \
                                         handling snapshot requests: {err}",
                                    );
                                    exit.store(true, Ordering::Relaxed);
                                    break;
                                }
                            }
                        } else {
                            // we didn't handle a snapshot request, so do flush/clean/shrink

                            let next_snapshot_request_slot = request_handlers
                                .snapshot_request_handler
                                .peek_next_snapshot_request_slot();

                            // We cannot clean past the next snapshot request slot because it may
                            // have zero-lamport accounts.  See the comments in
                            // Bank::clean_accounts() for more information.
                            let max_clean_slot_inclusive = cmp::min(
                                next_snapshot_request_slot.unwrap_or(Slot::MAX),
                                bank.slot(),
                            )
                            .saturating_sub(1);

                            let duration_since_previous_clean = previous_clean_time.elapsed();
                            let should_clean = duration_since_previous_clean > CLEAN_INTERVAL;

                            // if we're cleaning, then force flush, otherwise be lazy
                            let force_flush = should_clean;
                            bank.rc
                                .accounts
                                .accounts_db
                                .flush_accounts_cache(force_flush, Some(max_clean_slot_inclusive));

                            if should_clean {
                                bank.rc.accounts.accounts_db.clean_accounts(
                                    Some(max_clean_slot_inclusive),
                                    false,
                                    bank.epoch_schedule(),
                                );
                                last_cleaned_slot = max_clean_slot_inclusive;
                                previous_clean_time = Instant::now();
                            }

                            let duration_since_previous_shrink = previous_shrink_time.elapsed();
                            let should_shrink = duration_since_previous_shrink > SHRINK_INTERVAL;
                            // To avoid pathological interactions between the clean and shrink
                            // timers, call shrink for either should_shrink or should_clean.
                            if should_shrink || should_clean {
                                if should_clean {
                                    // We used to only squash (aka shrink ancients) when we also
                                    // cleaned, so keep that same behavior here for now.
                                    bank.shrink_ancient_slots();
                                }
                                bank.shrink_candidate_slots();
                                previous_shrink_time = Instant::now();
                            }
                        }
                        stats.record_and_maybe_submit(start_time.elapsed());
                        sleep(Duration::from_millis(INTERVAL_MS));
                    }
                    info!("AccountsBackgroundService has stopped");
                    is_running.store(false, Ordering::Relaxed);
                }
            })
            .unwrap();

        Self {
            t_background,
            status: AbsStatus { is_running, stop },
        }
    }

    /// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
    /// should only be one bank, the root bank, in `bank_forks`.
    /// All banks added to `bank_forks` will be descendants of the root bank, and thus will inherit
    /// the bank drop callback.
    pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
        assert_eq!(bank_forks.read().unwrap().banks().len(), 1);

        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        {
            let root_bank = bank_forks.read().unwrap().root_bank();

            root_bank
                .rc
                .accounts
                .accounts_db
                .enable_bank_drop_callback();
            root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
                pruned_banks_sender,
            ))));
        }
        pruned_banks_receiver
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_background.join()
    }

    /// Returns an object to query/manage the status of ABS
    pub fn status(&self) -> &AbsStatus {
        &self.status
    }
}

/// Query and manage the status of AccountsBackgroundService
#[derive(Debug, Clone)]
pub struct AbsStatus {
    /// Flag to query if ABS is running
    is_running: Arc<AtomicBool>,
    /// Flag to set to stop ABS
    stop: Arc<AtomicBool>,
}

impl AbsStatus {
    /// Returns if ABS is running
    pub fn is_running(&self) -> bool {
        self.is_running.load(Ordering::Relaxed)
    }

    /// Raises the flag for ABS to stop
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Relaxed)
    }

    #[cfg(feature = "dev-context-only-utils")]
    pub fn new_for_tests() -> Self {
        Self {
            is_running: Arc::new(AtomicBool::new(false)),
            stop: Arc::new(AtomicBool::new(false)),
        }
    }
}

/// Get the SnapshotKind from a given SnapshotRequest
#[must_use]
fn new_snapshot_kind(snapshot_request: &SnapshotRequest) -> Option<SnapshotKind> {
    match snapshot_request.request_kind {
        SnapshotRequestKind::FullSnapshot => Some(SnapshotKind::FullSnapshot),
        SnapshotRequestKind::IncrementalSnapshot => {
            if let Some(latest_full_snapshot_slot) = snapshot_request
                .snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .latest_full_snapshot_slot()
            {
                Some(SnapshotKind::IncrementalSnapshot(latest_full_snapshot_slot))
            } else {
                warn!(
                    "Ignoring IncrementalSnapshot request for slot {} because there is no latest \
                     full snapshot",
                    snapshot_request.snapshot_root_bank.slot()
                );
                None
            }
        }
    }
}

/// Compare snapshot requests; used to pick the highest priority request to handle.
///
/// Priority, from highest to lowest:
/// - Full Snapshot
/// - Incremental Snapshot
///
/// If two requests of the same kind are being compared, their bank slots are the tiebreaker.
#[must_use]
fn cmp_requests_by_priority(a: &SnapshotRequest, b: &SnapshotRequest) -> cmp::Ordering {
    let slot_a = a.snapshot_root_bank.slot();
    let slot_b = b.snapshot_root_bank.slot();
    cmp_snapshot_request_kinds_by_priority(&a.request_kind, &b.request_kind)
        .then(slot_a.cmp(&slot_b))
}

/// Compare snapshot request kinds by priority
///
/// Priority, from highest to lowest:
/// - Full Snapshot
/// - Incremental Snapshot
#[must_use]
fn cmp_snapshot_request_kinds_by_priority(
    a: &SnapshotRequestKind,
    b: &SnapshotRequestKind,
) -> cmp::Ordering {
    use {
        cmp::Ordering::{Equal, Greater, Less},
        SnapshotRequestKind as Kind,
    };
    match (a, b) {
        (Kind::FullSnapshot, Kind::FullSnapshot) => Equal,
        (Kind::FullSnapshot, Kind::IncrementalSnapshot) => Greater,
        (Kind::IncrementalSnapshot, Kind::FullSnapshot) => Less,
        (Kind::IncrementalSnapshot, Kind::IncrementalSnapshot) => Equal,
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            genesis_utils::create_genesis_config, snapshot_config::SnapshotConfig,
            snapshot_utils::SnapshotInterval,
        },
        crossbeam_channel::unbounded,
        solana_account::AccountSharedData,
        solana_epoch_schedule::EpochSchedule,
        solana_pubkey::Pubkey,
        std::num::NonZeroU64,
    };

    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account in slot 0
        let account_key = Pubkey::new_unique();
        bank0.store_account(
            &account_key,
            &AccountSharedData::new(264, 0, &Pubkey::default()),
        );
        assert!(bank0.get_account(&account_key).is_some());
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);

        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }
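
    /// Illustrative sketch (not exercised by the service itself): restates the expression the
    /// background loop uses to bound cleaning, with made-up slot numbers.  The highest slot
    /// eligible for cleaning is capped by both the root bank's slot and the next pending
    /// snapshot request slot, minus one.
    #[test]
    fn test_max_clean_slot_inclusive_sketch() {
        // Local restatement of the expression used in `AccountsBackgroundService::new()`.
        fn max_clean_slot_inclusive(
            next_snapshot_request_slot: Option<Slot>,
            root_slot: Slot,
        ) -> Slot {
            cmp::min(next_snapshot_request_slot.unwrap_or(Slot::MAX), root_slot).saturating_sub(1)
        }

        // No pending snapshot request: clean up to (but not including) the root slot.
        assert_eq!(max_clean_slot_inclusive(None, 100), 99);
        // A pending snapshot request at slot 80 caps cleaning below that request.
        assert_eq!(max_clean_slot_inclusive(Some(80), 100), 79);
    }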

    /// Ensure that unhandled snapshot requests are properly re-enqueued or dropped
    ///
    /// The snapshot request handler should be flexible and handle re-queueing unhandled snapshot
    /// requests, if those unhandled requests are for slots GREATER-THAN the last request handled.
    #[test]
    fn test_get_next_snapshot_request() {
        // These constants were picked to ensure the desired snapshot requests were sent to the
        // channel.  Ensure there are multiple requests of each kind.
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Slots(
                NonZeroU64::new(FULL_SNAPSHOT_INTERVAL).unwrap(),
            ),
            incremental_snapshot_archive_interval: SnapshotInterval::Slots(
                NonZeroU64::new(INCREMENTAL_SNAPSHOT_INTERVAL).unwrap(),
            ),
            ..SnapshotConfig::default()
        };

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            snapshot_config,
            0,
        ));
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller,
            snapshot_request_receiver,
            pending_snapshot_packages,
        };

        let send_snapshot_request = |snapshot_root_bank, request_kind| {
            let snapshot_request = SnapshotRequest {
                snapshot_root_bank,
                status_cache_slot_deltas: Vec::default(),
                request_kind,
                enqueued: Instant::now(),
            };
            snapshot_request_sender.send(snapshot_request).unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
        bank.set_initial_accounts_hash_verification_completed();

        // We need to get and set accounts-db's latest full snapshot slot to test
        // get_next_snapshot_request().  To work around potential borrowing issues caused by
        // make_banks() below, Arc::clone the bank into `bank0` and add helper functions.
        let bank0 = bank.clone();
        fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
            bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
        }
        fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
            bank.rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(slot);
        }

        // Create new banks and send snapshot requests so that the following requests will be in
        // the channel before handling the requests:
        //
        // iss  30
        // iss  60
        // fss  80
        // iss  90
        // iss 120
        // iss 150
        // fss 160
        // iss 180
        // iss 210
        // fss 240 <-- handled 1st
        // iss 270
        // iss 300 <-- handled 2nd
        //
        // Also, incremental snapshots before slot 240 (the first full snapshot handled) will
        // actually be skipped since the latest full snapshot slot will be `None`.
        let mut make_banks = |num_banks| {
            for _ in 0..num_banks {
                let slot = bank.slot() + 1;
                bank = Arc::new(Bank::new_from_parent(
                    bank.clone(),
                    &Pubkey::new_unique(),
                    slot,
                ));

                // Since we're not using `BankForks::set_root()`, we have to handle sending the
                // correct snapshot requests ourselves.
                if bank.block_height() % FULL_SNAPSHOT_INTERVAL == 0 {
                    send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::FullSnapshot);
                } else if bank.block_height() % INCREMENTAL_SNAPSHOT_INTERVAL == 0 {
                    send_snapshot_request(
                        Arc::clone(&bank),
                        SnapshotRequestKind::IncrementalSnapshot,
                    );
                }
            }
        };
        make_banks(303);

        // Ensure the full snapshot from slot 240 is handled 1st
        // (the older full snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), None);
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::FullSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
        set_latest_full_snapshot_slot(&bank0, 240);

        // Ensure the incremental snapshot from slot 300 is handled 2nd
        // (the older incremental snapshots are skipped and dropped)
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::IncrementalSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);

        // And now ensure the snapshot request channel is empty!
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());
    }
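
    /// Illustrative sketch (plain slot numbers stand in for `SnapshotRequest`s): shows how the
    /// `select_nth_unstable_by` call in `get_next_snapshot_request()` leaves the highest-priority
    /// element in the last position, ready to be popped.
    #[test]
    fn test_select_nth_unstable_by_picks_highest_sketch() {
        let mut slots: Vec<Slot> = vec![90, 240, 30, 300, 150];
        let len = slots.len();
        // Placing the second-to-last element in its final sorted position guarantees the last
        // element is greater than or equal to it, i.e. the last element is the maximum.
        let (_, second_highest, rest) = slots.select_nth_unstable_by(len - 2, |a, b| a.cmp(b));
        assert_eq!(*second_highest, 240);
        assert_eq!(*rest, [300]);
        assert_eq!(slots.pop(), Some(300));
    }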

    /// Ensure that we can prune banks with the same slot (if they were on different forks)
    #[test]
    fn test_pruned_banks_request_handler_handle_request() {
        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let genesis_config_info = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
        bank.set_initial_accounts_hash_verification_completed();
        bank.rc.accounts.accounts_db.enable_bank_drop_callback();
        bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
            pruned_banks_sender,
        ))));

        let fork0_bank0 = Arc::new(bank);
        let fork0_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork1_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork2_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork0_bank2 = Arc::new(Bank::new_from_parent(
            fork0_bank1.clone(),
            &Pubkey::new_unique(),
            fork0_bank1.slot() + 1,
        ));
        let fork1_bank2 = Arc::new(Bank::new_from_parent(
            fork1_bank1.clone(),
            &Pubkey::new_unique(),
            fork1_bank1.slot() + 1,
        ));
        let fork0_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        let fork3_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        fork0_bank3.squash();

        drop(fork3_bank3);
        drop(fork1_bank2);
        drop(fork0_bank2);
        drop(fork1_bank1);
        drop(fork2_bank1);
        drop(fork0_bank1);
        drop(fork0_bank0);
        let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
        assert_eq!(num_banks_purged, 7);
    }
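
    /// Illustrative sketch (made-up slots and bank ids): shows how the stable sort plus
    /// `chunk_by` in `PrunedBanksRequestHandler::handle_request()` groups pruned banks by slot
    /// while preserving the channel order within each slot.
    #[test]
    fn test_group_pruned_banks_by_slot_sketch() {
        let mut banks_to_purge: Vec<(Slot, BankId)> = vec![(20, 5), (10, 1), (20, 4), (10, 2)];
        // Stable sort: banks with the same slot keep the order they were sent into the channel.
        banks_to_purge.sort_by_key(|(slot, _id)| *slot);
        assert_eq!(banks_to_purge, vec![(10, 1), (10, 2), (20, 5), (20, 4)]);

        let grouped: Vec<&[(Slot, BankId)]> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
        assert_eq!(grouped.len(), 2);
        assert_eq!(grouped[0], [(10, 1), (10, 2)]);
        assert_eq!(grouped[1], [(20, 5), (20, 4)]);

        // `num_banks_with_same_slot` in `handle_request()` is the number of purged banks minus
        // the number of distinct slots, i.e. the banks that share a slot with an earlier one.
        assert_eq!(banks_to_purge.len() - grouped.len(), 2);
    }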

    #[test]
    fn test_cmp_snapshot_request_kinds_by_priority() {
        use cmp::Ordering::{Equal, Greater, Less};
        for (snapshot_request_kind_a, snapshot_request_kind_b, expected_result) in [
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Equal,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Less,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Equal,
            ),
        ] {
            let actual_result = cmp_snapshot_request_kinds_by_priority(
                &snapshot_request_kind_a,
                &snapshot_request_kind_b,
            );
            assert_eq!(expected_result, actual_result);
        }
    }
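
    /// Illustrative sketch (made-up slots): shows how request-kind priority combines with the
    /// bank-slot tiebreaker, mirroring `cmp_requests_by_priority()` without constructing real
    /// `SnapshotRequest`s.
    #[test]
    fn test_request_priority_slot_tiebreaker_sketch() {
        use cmp::Ordering::{Greater, Less};
        let cmp_by_priority = |(kind_a, slot_a): (SnapshotRequestKind, Slot),
                               (kind_b, slot_b): (SnapshotRequestKind, Slot)| {
            cmp_snapshot_request_kinds_by_priority(&kind_a, &kind_b).then(slot_a.cmp(&slot_b))
        };

        // Different kinds: the kind decides, regardless of slot.
        assert_eq!(
            cmp_by_priority(
                (SnapshotRequestKind::FullSnapshot, 100),
                (SnapshotRequestKind::IncrementalSnapshot, 200),
            ),
            Greater
        );
        // Same kind: the higher slot wins the tiebreak.
        assert_eq!(
            cmp_by_priority(
                (SnapshotRequestKind::IncrementalSnapshot, 100),
                (SnapshotRequestKind::IncrementalSnapshot, 200),
            ),
            Less
        );
    }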
998}