Skip to main content

solana_runtime/
accounts_background_service.rs

1//! Service to clean up dead slots in accounts_db
2//!
3//! This can be expensive since we have to walk the append vecs being cleaned up.
4
5mod pending_snapshot_packages;
6mod stats;
7pub use pending_snapshot_packages::PendingSnapshotPackages;
8#[cfg(feature = "dev-context-only-utils")]
9use qualifier_attr::qualifiers;
10use {
11    crate::{
12        bank::{Bank, BankSlotDelta, DropCallback},
13        bank_forks::BankForks,
14        snapshot_controller::SnapshotController,
15        snapshot_package::SnapshotPackage,
16    },
17    agave_snapshots::{error::SnapshotError, SnapshotKind},
18    crossbeam_channel::{Receiver, SendError, Sender},
19    log::*,
20    rayon::iter::{IntoParallelIterator, ParallelIterator},
21    solana_clock::{BankId, Slot},
22    solana_measure::{measure::Measure, measure_us},
23    stats::StatsManager,
24    std::{
25        boxed::Box,
26        cmp,
27        fmt::{self, Debug, Formatter},
28        sync::{
29            atomic::{AtomicBool, AtomicU64, Ordering},
30            Arc, LazyLock, Mutex, RwLock,
31        },
32        thread::{self, sleep, Builder, JoinHandle},
33        time::{Duration, Instant},
34    },
35};
36
// How long the background thread sleeps at the end of each iteration of its main loop, in
// milliseconds.
const INTERVAL_MS: u64 = 100;
// Set the clean interval duration to be approximately how long before the next incremental
// snapshot request is received, plus some buffer.  The default incremental snapshot interval is
// 100 slots, which ends up being 40 seconds plus buffer.
const CLEAN_INTERVAL: Duration = Duration::from_secs(50);
// When no snapshot request is being handled, candidate slots are shrunk once more than this much
// time has passed since the previous shrink (or whenever a clean runs).
const SHRINK_INTERVAL: Duration = Duration::from_secs(1);
43
/// Sending half of the channel that carries [`SnapshotRequest`]s
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
/// Receiving half of the channel that carries [`SnapshotRequest`]s
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
/// Sending half of the channel that carries the `(slot, bank id)` of each dropped bank
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
/// Receiving half of the channel that carries the `(slot, bank id)` of each dropped bank
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
48
/// Minimum interval between reports of an excessively long bank drop queue: 60s
///
/// Compared against differences of `solana_time_utils::timestamp()` values, so this is
/// presumably in milliseconds (NOTE(review): confirm the unit of `timestamp()`).
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// Bank drop signal queue length above which a warning is reported
///
/// This is a reporting threshold only; nothing here limits the queue to this length.
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
53
54#[derive(Debug, Default)]
55struct PrunedBankQueueLenReporter {
56    last_report_time: AtomicU64,
57}
58
59impl PrunedBankQueueLenReporter {
60    fn report(&self, q_len: usize) {
61        let now = solana_time_utils::timestamp();
62        let last_report_time = self.last_report_time.load(Ordering::Acquire);
63        if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
64            && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
65        {
66            datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
67            self.last_report_time.store(now, Ordering::Release);
68        }
69    }
70}
71
/// Process-wide reporter shared by every [`SendDroppedBankCallback`], lazily created on first
/// use
static BANK_DROP_QUEUE_REPORTER: LazyLock<PrunedBankQueueLenReporter> =
    LazyLock::new(PrunedBankQueueLenReporter::default);
74
75#[derive(Clone)]
76pub struct SendDroppedBankCallback {
77    sender: DroppedSlotsSender,
78}
79
80impl DropCallback for SendDroppedBankCallback {
81    fn callback(&self, bank: &Bank) {
82        BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
83        if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
84            info!("bank DropCallback signal queue disconnected.");
85        }
86    }
87
88    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
89        Box::new(self.clone())
90    }
91}
92
93impl Debug for SendDroppedBankCallback {
94    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
95        write!(f, "SendDroppedBankCallback({self:p})")
96    }
97}
98
99impl SendDroppedBankCallback {
100    pub fn new(sender: DroppedSlotsSender) -> Self {
101        Self { sender }
102    }
103}
104
105pub struct SnapshotRequest {
106    pub snapshot_root_bank: Arc<Bank>,
107    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
108    pub request_kind: SnapshotRequestKind,
109
110    /// The instant this request was send to the queue.
111    /// Used to track how long requests wait before processing.
112    pub enqueued: Instant,
113}
114
115impl Debug for SnapshotRequest {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("SnapshotRequest")
118            .field("request kind", &self.request_kind)
119            .field("bank slot", &self.snapshot_root_bank.slot())
120            .field("block height", &self.snapshot_root_bank.block_height())
121            .finish_non_exhaustive()
122    }
123}
124
/// What kind of request is this?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestKind {
    /// Request for a full snapshot; takes priority over incremental snapshot requests
    FullSnapshot,
    /// Request for an incremental snapshot; ignored if there is no latest full snapshot slot
    IncrementalSnapshot,
}
131
/// Handles the snapshot requests received over the snapshot request channel
pub struct SnapshotRequestHandler {
    /// Provides the sender used to re-enqueue snapshot requests that are not handled yet
    pub snapshot_controller: Arc<SnapshotController>,
    /// Channel that pending snapshot requests are drained from
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    /// Where the snapshot packages created while handling requests are pushed
    pub pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
}
137
138impl SnapshotRequestHandler {
139    // Returns the latest requested snapshot slot and storages
140    #[allow(clippy::type_complexity)]
141    pub fn handle_snapshot_requests(
142        &self,
143        non_snapshot_time_us: u128,
144    ) -> Option<Result<Slot, SnapshotError>> {
145        let (snapshot_request, num_outstanding_requests, num_re_enqueued_requests) =
146            self.get_next_snapshot_request()?;
147
148        datapoint_info!(
149            "handle_snapshot_requests",
150            ("num_outstanding_requests", num_outstanding_requests, i64),
151            ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
152            (
153                "enqueued_time_us",
154                snapshot_request.enqueued.elapsed().as_micros(),
155                i64
156            ),
157        );
158
159        let snapshot_kind = new_snapshot_kind(&snapshot_request)?;
160        Some(self.handle_snapshot_request(non_snapshot_time_us, snapshot_request, snapshot_kind))
161    }
162
163    /// Get the next snapshot request to handle
164    ///
165    /// Look through the snapshot request channel to find the highest priority one to handle next.
166    /// If there are no snapshot requests in the channel, return None.  Otherwise return the
167    /// highest priority one.  Unhandled snapshot requests with slots GREATER-THAN the handled one
168    /// will be re-enqueued.  The remaining will be dropped.
169    ///
170    /// Also return the number of snapshot requests initially in the channel, and the number of
171    /// ones re-enqueued.
172    fn get_next_snapshot_request(
173        &self,
174    ) -> Option<(
175        SnapshotRequest,
176        /*num outstanding snapshot requests*/ usize,
177        /*num re-enqueued snapshot requests*/ usize,
178    )> {
179        let mut requests: Vec<_> = self.snapshot_request_receiver.try_iter().collect();
180        let requests_len = requests.len();
181        debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
182
183        match requests_len {
184            0 => None,
185            1 => {
186                // SAFETY: We know the len is 1, so `pop` will return `Some`
187                let snapshot_request = requests.pop().unwrap();
188                Some((snapshot_request, 1, 0))
189            }
190            _ => {
191                // Get the two highest priority requests, `y` and `z`.
192                // By asking for the second-to-last element to be in its final sorted position, we
193                // also ensure that the last element is also sorted.
194                // Note, we no longer need the second-to-last element; this code can be refactored.
195                let (_, _y, z) =
196                    requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
197                assert_eq!(z.len(), 1);
198
199                // SAFETY: We know the len is > 1, so `pop` will return `Some`
200                let snapshot_request = requests.pop().unwrap();
201
202                let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
203                // re-enqueue any remaining requests for slots GREATER-THAN the one that will be handled
204                let num_re_enqueued_requests = requests
205                    .into_iter()
206                    .filter(|snapshot_request| {
207                        snapshot_request.snapshot_root_bank.slot() > handled_request_slot
208                    })
209                    .map(|snapshot_request| {
210                        self.snapshot_controller
211                            .request_sender()
212                            .try_send(snapshot_request)
213                            .expect("re-enqueue snapshot request");
214                    })
215                    .count();
216
217                Some((snapshot_request, requests_len, num_re_enqueued_requests))
218            }
219        }
220    }
221
222    fn handle_snapshot_request(
223        &self,
224        non_snapshot_time_us: u128,
225        snapshot_request: SnapshotRequest,
226        snapshot_kind: SnapshotKind,
227    ) -> Result<Slot, SnapshotError> {
228        info!("handling snapshot request: {snapshot_request:?}, {snapshot_kind:?}");
229        let mut total_time = Measure::start("snapshot_request_receiver_total_time");
230        let SnapshotRequest {
231            snapshot_root_bank,
232            status_cache_slot_deltas,
233            request_kind: _,
234            enqueued: _,
235        } = snapshot_request;
236
237        if snapshot_kind.is_full_snapshot() {
238            // The latest full snapshot slot is what accounts-db uses to properly handle
239            // zero lamport accounts.  We are handling a full snapshot request here, and
240            // since taking a snapshot is not allowed to fail, we can update accounts-db now.
241            snapshot_root_bank
242                .rc
243                .accounts
244                .accounts_db
245                .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
246        }
247
248        let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
249        // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
250        // That's because `snapshot_root_bank.slot()` must be root at this point,
251        // and contains relevant updates because each bank has at least 1 account update due
252        // to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot
253        snapshot_root_bank.force_flush_accounts_cache();
254        // Ensure all roots <= `self.slot()` have been flushed.
255        // Note `max_flush_root` could be larger than self.slot() if there are
256        // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
257        assert!(
258            snapshot_root_bank.slot()
259                <= snapshot_root_bank
260                    .rc
261                    .accounts
262                    .accounts_db
263                    .accounts_cache
264                    .fetch_max_flush_root()
265        );
266        flush_accounts_cache_time.stop();
267
268        let mut clean_time = Measure::start("clean_time");
269        snapshot_root_bank.clean_accounts();
270        clean_time.stop();
271
272        let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());
273
274        let mut shrink_time = Measure::start("shrink_time");
275        snapshot_root_bank.shrink_candidate_slots();
276        shrink_time.stop();
277
278        // Snapshot the bank and send over a snapshot package
279        let mut snapshot_time = Measure::start("snapshot_time");
280        let snapshot_package = SnapshotPackage::new(
281            snapshot_kind,
282            &snapshot_root_bank,
283            snapshot_root_bank.get_snapshot_storages(None),
284            status_cache_slot_deltas,
285        );
286        self.pending_snapshot_packages
287            .lock()
288            .unwrap()
289            .push(snapshot_package);
290        snapshot_time.stop();
291        info!(
292            "Handled snapshot request. snapshot kind: {:?}, slot: {}, bank hash: {}",
293            snapshot_kind,
294            snapshot_root_bank.slot(),
295            snapshot_root_bank.hash(),
296        );
297
298        total_time.stop();
299
300        datapoint_info!(
301            "handle_snapshot_requests-timing",
302            (
303                "flush_accounts_cache_time",
304                flush_accounts_cache_time.as_us(),
305                i64
306            ),
307            ("shrink_time", shrink_time.as_us(), i64),
308            ("clean_time", clean_time.as_us(), i64),
309            ("snapshot_time", snapshot_time.as_us(), i64),
310            ("total_us", total_time.as_us(), i64),
311            ("non_snapshot_time_us", non_snapshot_time_us, i64),
312            ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
313        );
314        Ok(snapshot_root_bank.slot())
315    }
316
317    /// Returns the slot of the next snapshot request to be handled
318    fn peek_next_snapshot_request_slot(&self) -> Option<Slot> {
319        // We reuse `get_next_snapshot_request()` here, since it already implements all the logic
320        // for getting the highest priority request, *AND* we leverage its test coverage.
321        // Additionally, since `get_next_snapshot_request()` drops old requests, we might get to
322        // proactively clean up old banks earlier as well!
323        let (next_request, _, _) = self.get_next_snapshot_request()?;
324        let next_slot = next_request.snapshot_root_bank.slot();
325
326        // make sure to re-enqueue the request, otherwise we'd lose it!
327        self.snapshot_controller
328            .request_sender()
329            .try_send(next_request)
330            .expect("re-enqueue snapshot request");
331
332        Some(next_slot)
333    }
334}
335
/// Purges the accounts of banks that were announced as dropped
#[derive(Debug)]
pub struct PrunedBanksRequestHandler {
    /// Channel that the `(slot, bank id)` of each bank to purge is received on
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}
340
341impl PrunedBanksRequestHandler {
342    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
343    fn handle_request(&self, bank: &Bank) -> usize {
344        let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
345        // We need a stable sort to ensure we purge banks—with the same slot—in the same order
346        // they were sent into the channel.
347        banks_to_purge.sort_by_key(|(slot, _id)| *slot);
348        let num_banks_to_purge = banks_to_purge.len();
349
350        // Group the banks into slices with the same slot
351        let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
352
353        // Log whenever we need to handle banks with the same slot.  Purposely do this *before* we
354        // call `purge_slot()` to ensure we get the datapoint (in case there's an assert/panic).
355        let num_banks_with_same_slot =
356            num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
357        if num_banks_with_same_slot > 0 {
358            datapoint_info!(
359                "pruned_banks_request_handler",
360                ("num_pruned_banks", num_banks_to_purge, i64),
361                ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
362            );
363        }
364
365        // Purge all the slots in parallel
366        // Banks for the same slot are purged sequentially
367        let accounts_db = bank.rc.accounts.accounts_db.as_ref();
368        accounts_db.thread_pool_background.install(|| {
369            grouped_banks_to_purge.into_par_iter().for_each(|group| {
370                group.iter().for_each(|(slot, bank_id)| {
371                    accounts_db.purge_slot(*slot, *bank_id, true);
372                })
373            });
374        });
375
376        num_banks_to_purge
377    }
378
379    fn remove_dead_slots(
380        &self,
381        bank: &Bank,
382        removed_slots_count: &mut usize,
383        total_remove_slots_time: &mut u64,
384    ) {
385        let mut remove_slots_time = Measure::start("remove_slots_time");
386        *removed_slots_count += self.handle_request(bank);
387        remove_slots_time.stop();
388        *total_remove_slots_time += remove_slots_time.as_us();
389
390        if *removed_slots_count >= 100 {
391            datapoint_info!(
392                "remove_slots_timing",
393                ("remove_slots_time", *total_remove_slots_time, i64),
394                ("removed_slots_count", *removed_slots_count, i64),
395            );
396            *total_remove_slots_time = 0;
397            *removed_slots_count = 0;
398        }
399    }
400}
401
402pub struct AbsRequestHandlers {
403    pub snapshot_request_handler: SnapshotRequestHandler,
404    pub pruned_banks_request_handler: PrunedBanksRequestHandler,
405}
406
407impl AbsRequestHandlers {
408    // Returns the latest requested snapshot slot, if one exists
409    #[allow(clippy::type_complexity)]
410    pub fn handle_snapshot_requests(
411        &self,
412        non_snapshot_time_us: u128,
413    ) -> Option<Result<Slot, SnapshotError>> {
414        self.snapshot_request_handler
415            .handle_snapshot_requests(non_snapshot_time_us)
416    }
417}
418
/// Handle to the background thread that purges dropped banks, handles snapshot requests, and
/// periodically flushes, cleans, and shrinks accounts
pub struct AccountsBackgroundService {
    /// Join handle of the background thread
    t_background: JoinHandle<()>,
    /// Flags to query whether the background thread is running, and to ask it to stop
    status: AbsStatus,
}
423
424impl AccountsBackgroundService {
    /// Spawns the accounts background service thread and returns a handle to it
    ///
    /// Until either `exit` or the service's own stop flag is raised, each iteration of the
    /// thread's loop:
    /// 1. grabs the current root bank from `bank_forks`,
    /// 2. purges the accounts of all banks queued as dropped,
    /// 3. handles the highest priority snapshot request, if there is one; otherwise flushes the
    ///    accounts cache and, when their intervals have elapsed, cleans and shrinks accounts,
    /// 4. records stats and sleeps for `INTERVAL_MS` milliseconds.
    ///
    /// If handling a snapshot request fails, the thread raises `exit` itself and stops.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned.
    pub fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        exit: Arc<AtomicBool>,
        request_handlers: AbsRequestHandlers,
    ) -> Self {
        let is_running = Arc::new(AtomicBool::new(true));
        let stop = Arc::new(AtomicBool::new(false));
        // State carried across iterations of the loop; all of it is moved into the thread
        let mut last_cleaned_slot = 0;
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        let t_background = Builder::new()
            .name("solAcctsBgSvc".to_string())
            .spawn({
                // The thread gets its own handles to the flags; the originals are kept for the
                // `AbsStatus` returned to the caller.
                let is_running = is_running.clone();
                let stop = stop.clone();

                move || {
                    info!("AccountsBackgroundService has started");
                    let mut stats = StatsManager::new();
                    let mut last_snapshot_end_time = None;
                    let mut previous_clean_time = Instant::now();
                    let mut previous_shrink_time = Instant::now();

                    loop {
                        if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
                            break;
                        }
                        let start_time = Instant::now();

                        // Grab the current root bank
                        let bank = bank_forks.read().unwrap().root_bank();

                        // Purge accounts of any dead slots
                        request_handlers
                            .pruned_banks_request_handler
                            .remove_dead_slots(
                                &bank,
                                &mut removed_slots_count,
                                &mut total_remove_slots_time,
                            );

                        // Time since the previous snapshot request finished being handled, in
                        // microseconds; zero if no snapshot request has been handled yet.
                        let non_snapshot_time = last_snapshot_end_time
                            .map(|last_snapshot_end_time: Instant| {
                                last_snapshot_end_time.elapsed().as_micros()
                            })
                            .unwrap_or_default();

                        // Check to see if there were any requests for snapshotting banks
                        // < the current root bank `bank` above.
                        //
                        // Claim: Any snapshot request for slot `N` found here implies that the
                        // last cleanup slot `M` satisfies `M < N`
                        //
                        // Proof: Assume for contradiction that we find a snapshot request for slot
                        // `N` here, but cleanup has already happened on some slot `M >= N`.
                        // Because the call to `clean_accounts()` (in the code below) is capped at
                        // `max_clean_slot_inclusive`, we only clean slots `<= bank - 1`, so that
                        // means in some *previous* iteration of this loop, we must have gotten a
                        // root bank for some slot `R` where `R > N`, but did not see the snapshot
                        // for `N` in the snapshot request channel.
                        //
                        // However, this is impossible because BankForks.set_root() will always
                        // flush the snapshot request for `N` to the snapshot request channel
                        // before setting a root `R > N`, and
                        // snapshot_request_handler.handle_snapshot_requests() will always look for
                        // the latest available snapshot in the channel.
                        let snapshot_handle_result =
                            request_handlers.handle_snapshot_requests(non_snapshot_time);

                        if let Some(snapshot_handle_result) = snapshot_handle_result {
                            // Safe, see proof above

                            last_snapshot_end_time = Some(Instant::now());
                            match snapshot_handle_result {
                                Ok(snapshot_slot) => {
                                    // The message drains the request channel, but it is only
                                    // evaluated if the assertion fails.
                                    assert!(
                                        last_cleaned_slot <= snapshot_slot,
                                        "last cleaned slot: {last_cleaned_slot}, snapshot request \
                                         slot: {snapshot_slot}, enqueued snapshot requests: {:?}",
                                        request_handlers
                                            .snapshot_request_handler
                                            .snapshot_request_receiver
                                            .try_iter()
                                            .collect::<Vec<_>>(),
                                    );
                                    // Handling the snapshot request also cleaned and shrank, so
                                    // restart both timers.
                                    last_cleaned_slot = snapshot_slot;
                                    previous_clean_time = Instant::now();
                                    previous_shrink_time = Instant::now();
                                }
                                Err(err) => {
                                    error!(
                                        "Stopping AccountsBackgroundService! Fatal error while \
                                         handling snapshot requests: {err}",
                                    );
                                    exit.store(true, Ordering::Relaxed);
                                    break;
                                }
                            }
                        } else {
                            // we didn't handle a snapshot request, so do flush/clean/shrink

                            let next_snapshot_request_slot = request_handlers
                                .snapshot_request_handler
                                .peek_next_snapshot_request_slot();

                            // We cannot clean past the next snapshot request slot because it may
                            // have zero-lamport accounts.  See the comments in
                            // Bank::clean_accounts() for more information.
                            let max_clean_slot_inclusive = cmp::min(
                                next_snapshot_request_slot.unwrap_or(Slot::MAX),
                                bank.slot(),
                            )
                            .saturating_sub(1);

                            let duration_since_previous_clean = previous_clean_time.elapsed();
                            let should_clean = duration_since_previous_clean > CLEAN_INTERVAL;

                            // if we're cleaning, then force flush, otherwise be lazy
                            let force_flush = should_clean;
                            bank.rc
                                .accounts
                                .accounts_db
                                .flush_accounts_cache(force_flush, Some(max_clean_slot_inclusive));

                            if should_clean {
                                bank.rc.accounts.accounts_db.clean_accounts(
                                    Some(max_clean_slot_inclusive),
                                    false,
                                    bank.epoch_schedule(),
                                );
                                last_cleaned_slot = max_clean_slot_inclusive;
                                previous_clean_time = Instant::now();
                            }

                            let duration_since_previous_shrink = previous_shrink_time.elapsed();
                            let should_shrink = duration_since_previous_shrink > SHRINK_INTERVAL;
                            // To avoid pathological interactions between the clean and shrink
                            // timers, call shrink for either should_shrink or should_clean.
                            if should_shrink || should_clean {
                                if should_clean {
                                    // We used to only squash (aka shrink ancients) when we also
                                    // cleaned, so keep that same behavior here for now.
                                    bank.shrink_ancient_slots();
                                }
                                bank.shrink_candidate_slots();
                                previous_shrink_time = Instant::now();
                            }
                        }
                        stats.record_and_maybe_submit(start_time.elapsed());
                        sleep(Duration::from_millis(INTERVAL_MS));
                    }
                    info!("AccountsBackgroundService has stopped");
                    // Only reached when the loop exits normally (via `break`)
                    is_running.store(false, Ordering::Relaxed);
                }
            })
            .unwrap();

        Self {
            t_background,
            status: AbsStatus { is_running, stop },
        }
    }
587
588    /// Should be called immediately after bank_fork_utils::load_bank_forks(), and as such, there
589    /// should only be one bank, the root bank, in `bank_forks`
590    /// All banks added to `bank_forks` will be descended from the root bank, and thus will inherit
591    /// the bank drop callback.
592    pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
593        assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
594
595        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
596        {
597            let root_bank = bank_forks.read().unwrap().root_bank();
598
599            root_bank
600                .rc
601                .accounts
602                .accounts_db
603                .enable_bank_drop_callback();
604            root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
605                pruned_banks_sender,
606            ))));
607        }
608        pruned_banks_receiver
609    }
610
611    pub fn join(self) -> thread::Result<()> {
612        self.t_background.join()
613    }
614
615    /// Returns an object to query/manage the status of ABS
616    pub fn status(&self) -> &AbsStatus {
617        &self.status
618    }
619}
620
/// Query and manage the status of AccountsBackgroundService
///
/// Clones share the same underlying flags.
#[derive(Debug, Clone)]
pub struct AbsStatus {
    /// Flag to query if ABS is running
    is_running: Arc<AtomicBool>,
    /// Flag to set to stop ABS
    stop: Arc<AtomicBool>,
}

impl AbsStatus {
    /// Returns if ABS is running
    pub fn is_running(&self) -> bool {
        let Self { is_running, .. } = self;
        is_running.load(Ordering::Relaxed)
    }

    /// Raises the flag for ABS to stop
    pub fn stop(&self) {
        let Self { stop, .. } = self;
        stop.store(true, Ordering::Relaxed);
    }

    /// Creates a status, for tests, where ABS is neither running nor asked to stop
    #[cfg(feature = "dev-context-only-utils")]
    pub fn new_for_tests() -> Self {
        let lowered_flag = || Arc::new(AtomicBool::new(false));
        Self {
            is_running: lowered_flag(),
            stop: lowered_flag(),
        }
    }
}
649
650/// Get the SnapshotKind from a given SnapshotRequest
651#[must_use]
652fn new_snapshot_kind(snapshot_request: &SnapshotRequest) -> Option<SnapshotKind> {
653    match snapshot_request.request_kind {
654        SnapshotRequestKind::FullSnapshot => Some(SnapshotKind::FullSnapshot),
655        SnapshotRequestKind::IncrementalSnapshot => {
656            if let Some(latest_full_snapshot_slot) = snapshot_request
657                .snapshot_root_bank
658                .rc
659                .accounts
660                .accounts_db
661                .latest_full_snapshot_slot()
662            {
663                Some(SnapshotKind::IncrementalSnapshot(latest_full_snapshot_slot))
664            } else {
665                warn!(
666                    "Ignoring IncrementalSnapshot request for slot {} because there is no latest \
667                     full snapshot",
668                    snapshot_request.snapshot_root_bank.slot()
669                );
670                None
671            }
672        }
673    }
674}
675
676/// Compare snapshot requests; used to pick the highest priority request to handle.
677///
678/// Priority, from highest to lowest:
679/// - Epoch Accounts Hash
680/// - Full Snapshot
681/// - Incremental Snapshot
682///
683/// If two requests of the same kind are being compared, their bank slots are the tiebreaker.
684#[must_use]
685fn cmp_requests_by_priority(a: &SnapshotRequest, b: &SnapshotRequest) -> cmp::Ordering {
686    let slot_a = a.snapshot_root_bank.slot();
687    let slot_b = b.snapshot_root_bank.slot();
688    cmp_snapshot_request_kinds_by_priority(&a.request_kind, &b.request_kind)
689        .then(slot_a.cmp(&slot_b))
690}
691
692/// Compare snapshot request kinds by priority
693///
694/// Priority, from highest to lowest:
695/// - Full Snapshot
696/// - Incremental Snapshot
697#[must_use]
698fn cmp_snapshot_request_kinds_by_priority(
699    a: &SnapshotRequestKind,
700    b: &SnapshotRequestKind,
701) -> cmp::Ordering {
702    use {
703        cmp::Ordering::{Equal, Greater, Less},
704        SnapshotRequestKind as Kind,
705    };
706    match (a, b) {
707        (Kind::FullSnapshot, Kind::FullSnapshot) => Equal,
708        (Kind::FullSnapshot, Kind::IncrementalSnapshot) => Greater,
709        (Kind::IncrementalSnapshot, Kind::FullSnapshot) => Less,
710        (Kind::IncrementalSnapshot, Kind::IncrementalSnapshot) => Equal,
711    }
712}
713
714#[cfg(test)]
715mod test {
716    use {
717        super::*,
718        crate::genesis_utils::create_genesis_config,
719        agave_snapshots::{snapshot_config::SnapshotConfig, SnapshotInterval},
720        crossbeam_channel::unbounded,
721        solana_account::AccountSharedData,
722        solana_epoch_schedule::EpochSchedule,
723        solana_pubkey::Pubkey,
724        std::num::NonZeroU64,
725    };
726
727    #[test]
728    fn test_accounts_background_service_remove_dead_slots() {
729        let genesis = create_genesis_config(10);
730        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
731        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
732        let pruned_banks_request_handler = PrunedBanksRequestHandler {
733            pruned_banks_receiver,
734        };
735
736        // Store an account in slot 0
737        let account_key = Pubkey::new_unique();
738        bank0.store_account(
739            &account_key,
740            &AccountSharedData::new(264, 0, &Pubkey::default()),
741        );
742        assert!(bank0.get_account(&account_key).is_some());
743        pruned_banks_sender.send((0, 0)).unwrap();
744
745        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
746
747        pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);
748
749        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
750    }
751
752    /// Ensure that unhandled snapshot requests are properly re-enqueued or dropped
753    ///
754    /// The snapshot request handler should be flexible and handle re-queueing unhandled snapshot
755    /// requests, if those unhandled requests are for slots GREATER-THAN the last request handled.
756    #[test]
757    fn test_get_next_snapshot_request() {
758        // These constants were picked to ensure the desired snapshot requests were sent to the
759        // channel.  Ensure there are multiple requests of each kind.
760        const SLOTS_PER_EPOCH: Slot = 400;
761        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
762        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;
763
764        let snapshot_config = SnapshotConfig {
765            full_snapshot_archive_interval: SnapshotInterval::Slots(
766                NonZeroU64::new(FULL_SNAPSHOT_INTERVAL).unwrap(),
767            ),
768            incremental_snapshot_archive_interval: SnapshotInterval::Slots(
769                NonZeroU64::new(INCREMENTAL_SNAPSHOT_INTERVAL).unwrap(),
770            ),
771            ..SnapshotConfig::default()
772        };
773
774        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
775        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
776        let snapshot_controller = Arc::new(SnapshotController::new(
777            snapshot_request_sender.clone(),
778            snapshot_config,
779            0,
780        ));
781        let snapshot_request_handler = SnapshotRequestHandler {
782            snapshot_controller,
783            snapshot_request_receiver,
784            pending_snapshot_packages,
785        };
786
787        let send_snapshot_request = |snapshot_root_bank, request_kind| {
788            let snapshot_request = SnapshotRequest {
789                snapshot_root_bank,
790                status_cache_slot_deltas: Vec::default(),
791                request_kind,
792                enqueued: Instant::now(),
793            };
794            snapshot_request_sender.send(snapshot_request).unwrap();
795        };
796
797        let mut genesis_config_info = create_genesis_config(10);
798        genesis_config_info.genesis_config.epoch_schedule =
799            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
800        let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
801
802        // We need to get and set accounts-db's latest full snapshot slot to test
803        // get_next_snapshot_request().  To workaround potential borrowing issues
804        // caused by make_banks() below, Arc::clone bank0 and add helper functions.
805        let bank0 = bank.clone();
806        fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
807            bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
808        }
809        fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
810            bank.rc
811                .accounts
812                .accounts_db
813                .set_latest_full_snapshot_slot(slot);
814        }
815
816        // Create new banks and send snapshot requests so that the following requests will be in
817        // the channel before handling the requests:
818        //
819        // fss  80
820        // iss  90
821        // iss 120
822        // iss 150
823        // fss 160
824        // iss 180
825        // iss 210
826        // fss 240 <-- handled 1st
827        // iss 270
828        // iss 300 <-- handled 2nd
829        //
830        // Also, incremental snapshots before slot 240 (the first full snapshot handled), will
831        // actually be skipped since the latest full snapshot slot will be `None`.
832        let mut make_banks = |num_banks| {
833            for _ in 0..num_banks {
834                let slot = bank.slot() + 1;
835                bank = Arc::new(Bank::new_from_parent(
836                    bank.clone(),
837                    &Pubkey::new_unique(),
838                    slot,
839                ));
840
841                // Since we're not using `BankForks::set_root()`, we have to handle sending the
842                // correct snapshot requests ourself.
843                if bank.block_height() % FULL_SNAPSHOT_INTERVAL == 0 {
844                    send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::FullSnapshot);
845                } else if bank.block_height() % INCREMENTAL_SNAPSHOT_INTERVAL == 0 {
846                    send_snapshot_request(
847                        Arc::clone(&bank),
848                        SnapshotRequestKind::IncrementalSnapshot,
849                    );
850                }
851            }
852        };
853        make_banks(303);
854
855        // Ensure the full snapshot from slot 240 is handled 1st
856        // (the older full snapshots are skipped and dropped)
857        assert_eq!(latest_full_snapshot_slot(&bank0), None);
858        let (snapshot_request, ..) = snapshot_request_handler
859            .get_next_snapshot_request()
860            .unwrap();
861        assert_eq!(
862            snapshot_request.request_kind,
863            SnapshotRequestKind::FullSnapshot
864        );
865        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
866        set_latest_full_snapshot_slot(&bank0, 240);
867
868        // Ensure the incremental snapshot from slot 300 is handled 2nd
869        // (the older incremental snapshots are skipped and dropped)
870        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
871        let (snapshot_request, ..) = snapshot_request_handler
872            .get_next_snapshot_request()
873            .unwrap();
874        assert_eq!(
875            snapshot_request.request_kind,
876            SnapshotRequestKind::IncrementalSnapshot
877        );
878        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);
879
880        // And now ensure the snapshot request channel is empty!
881        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
882        assert!(snapshot_request_handler
883            .get_next_snapshot_request()
884            .is_none());
885    }
886
887    /// Ensure that we can prune banks with the same slot (if they were on different forks)
888    #[test]
889    fn test_pruned_banks_request_handler_handle_request() {
890        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
891        let pruned_banks_request_handler = PrunedBanksRequestHandler {
892            pruned_banks_receiver,
893        };
894        let genesis_config_info = create_genesis_config(10);
895        let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
896        bank.rc.accounts.accounts_db.enable_bank_drop_callback();
897        bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
898            pruned_banks_sender,
899        ))));
900
901        let fork0_bank0 = Arc::new(bank);
902        let fork0_bank1 = Arc::new(Bank::new_from_parent(
903            fork0_bank0.clone(),
904            &Pubkey::new_unique(),
905            fork0_bank0.slot() + 1,
906        ));
907        let fork1_bank1 = Arc::new(Bank::new_from_parent(
908            fork0_bank0.clone(),
909            &Pubkey::new_unique(),
910            fork0_bank0.slot() + 1,
911        ));
912        let fork2_bank1 = Arc::new(Bank::new_from_parent(
913            fork0_bank0.clone(),
914            &Pubkey::new_unique(),
915            fork0_bank0.slot() + 1,
916        ));
917        let fork0_bank2 = Arc::new(Bank::new_from_parent(
918            fork0_bank1.clone(),
919            &Pubkey::new_unique(),
920            fork0_bank1.slot() + 1,
921        ));
922        let fork1_bank2 = Arc::new(Bank::new_from_parent(
923            fork1_bank1.clone(),
924            &Pubkey::new_unique(),
925            fork1_bank1.slot() + 1,
926        ));
927        let fork0_bank3 = Arc::new(Bank::new_from_parent(
928            fork0_bank2.clone(),
929            &Pubkey::new_unique(),
930            fork0_bank2.slot() + 1,
931        ));
932        let fork3_bank3 = Arc::new(Bank::new_from_parent(
933            fork0_bank2.clone(),
934            &Pubkey::new_unique(),
935            fork0_bank2.slot() + 1,
936        ));
937        fork0_bank3.squash();
938
939        drop(fork3_bank3);
940        drop(fork1_bank2);
941        drop(fork0_bank2);
942        drop(fork1_bank1);
943        drop(fork2_bank1);
944        drop(fork0_bank1);
945        drop(fork0_bank0);
946        let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
947        assert_eq!(num_banks_purged, 7);
948    }
949
950    #[test]
951    fn test_cmp_snapshot_request_kinds_by_priority() {
952        use cmp::Ordering::{Equal, Greater, Less};
953        for (snapshot_request_kind_a, snapshot_request_kind_b, expected_result) in [
954            (
955                SnapshotRequestKind::FullSnapshot,
956                SnapshotRequestKind::FullSnapshot,
957                Equal,
958            ),
959            (
960                SnapshotRequestKind::FullSnapshot,
961                SnapshotRequestKind::IncrementalSnapshot,
962                Greater,
963            ),
964            (
965                SnapshotRequestKind::IncrementalSnapshot,
966                SnapshotRequestKind::FullSnapshot,
967                Less,
968            ),
969            (
970                SnapshotRequestKind::IncrementalSnapshot,
971                SnapshotRequestKind::IncrementalSnapshot,
972                Equal,
973            ),
974        ] {
975            let actual_result = cmp_snapshot_request_kinds_by_priority(
976                &snapshot_request_kind_a,
977                &snapshot_request_kind_b,
978            );
979            assert_eq!(expected_result, actual_result);
980        }
981    }
982}