1mod stats;
6#[cfg(feature = "dev-context-only-utils")]
7use qualifier_attr::qualifiers;
8use {
9 crate::{
10 bank::{Bank, BankSlotDelta, DropCallback},
11 bank_forks::BankForks,
12 snapshot_bank_utils,
13 snapshot_controller::SnapshotController,
14 snapshot_package::{AccountsPackage, AccountsPackageKind, SnapshotKind},
15 snapshot_utils::SnapshotError,
16 },
17 crossbeam_channel::{Receiver, SendError, Sender},
18 log::*,
19 rayon::iter::{IntoParallelIterator, ParallelIterator},
20 solana_accounts_db::{
21 accounts_db::CalcAccountsHashDataSource, accounts_hash::CalcAccountsHashConfig,
22 },
23 solana_clock::{BankId, Slot},
24 solana_measure::{measure::Measure, measure_us},
25 stats::StatsManager,
26 std::{
27 boxed::Box,
28 cmp,
29 fmt::{self, Debug, Formatter},
30 sync::{
31 atomic::{AtomicBool, AtomicU64, Ordering},
32 Arc, LazyLock, RwLock,
33 },
34 thread::{self, sleep, Builder, JoinHandle},
35 time::{Duration, Instant},
36 },
37};
38
/// How long the background thread sleeps at the end of every loop iteration, in milliseconds.
const INTERVAL_MS: u64 = 100;
/// Time that must have elapsed since the previous clean before the background loop cleans
/// accounts on its own (i.e. in an iteration that did not handle a snapshot request).
const CLEAN_INTERVAL: Duration = Duration::from_secs(50);
/// Time that must have elapsed since the previous shrink before the background loop shrinks
/// candidate slots on its own.
const SHRINK_INTERVAL: Duration = Duration::from_secs(1);

/// Sending half of the channel that carries [`SnapshotRequest`]s.
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
/// Receiving half of the channel that carries [`SnapshotRequest`]s.
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
/// Sending half of the channel that carries the `(slot, bank id)` of dropped banks.
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
/// Receiving half of the channel that carries the `(slot, bank id)` of dropped banks.
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;

/// Minimum spacing between two "excessive queue length" reports for the dropped-bank channel.
/// NOTE(review): compared against differences of `solana_time_utils::timestamp()`, presumably
/// milliseconds (i.e. 60s) — confirm.
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// Dropped-bank channel length above which a warning datapoint may be reported.
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
55
56#[derive(Debug, Default)]
57struct PrunedBankQueueLenReporter {
58 last_report_time: AtomicU64,
59}
60
61impl PrunedBankQueueLenReporter {
62 fn report(&self, q_len: usize) {
63 let now = solana_time_utils::timestamp();
64 let last_report_time = self.last_report_time.load(Ordering::Acquire);
65 if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
66 && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
67 {
68 datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
69 self.last_report_time.store(now, Ordering::Release);
70 }
71 }
72}
73
/// Process-wide, lazily-initialized reporter shared by every [`SendDroppedBankCallback`];
/// it is consulted each time a bank drop is signalled.
static BANK_DROP_QUEUE_REPORTER: LazyLock<PrunedBankQueueLenReporter> =
    LazyLock::new(PrunedBankQueueLenReporter::default);
76
77#[derive(Clone)]
78pub struct SendDroppedBankCallback {
79 sender: DroppedSlotsSender,
80}
81
82impl DropCallback for SendDroppedBankCallback {
83 fn callback(&self, bank: &Bank) {
84 BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
85 if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
86 info!("bank DropCallback signal queue disconnected.");
87 }
88 }
89
90 fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
91 Box::new(self.clone())
92 }
93}
94
95impl Debug for SendDroppedBankCallback {
96 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
97 write!(f, "SendDroppedBankCallback({self:p})")
98 }
99}
100
101impl SendDroppedBankCallback {
102 pub fn new(sender: DroppedSlotsSender) -> Self {
103 Self { sender }
104 }
105}
106
107pub struct SnapshotRequest {
108 pub snapshot_root_bank: Arc<Bank>,
109 pub status_cache_slot_deltas: Vec<BankSlotDelta>,
110 pub request_kind: SnapshotRequestKind,
111
112 pub enqueued: Instant,
115}
116
117impl Debug for SnapshotRequest {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.debug_struct("SnapshotRequest")
120 .field("request kind", &self.request_kind)
121 .field("bank slot", &self.snapshot_root_bank.slot())
122 .field("block height", &self.snapshot_root_bank.block_height())
123 .finish_non_exhaustive()
124 }
125}
126
/// The kind of work a [`SnapshotRequest`] asks for.
///
/// Despite the name, a request is not necessarily for a snapshot: it may also ask for an
/// epoch accounts hash package.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestKind {
    /// Request a full snapshot.
    FullSnapshot,
    /// Request an incremental snapshot; ignored when no latest full snapshot slot is known.
    IncrementalSnapshot,
    /// Request an epoch accounts hash package; at most one may be outstanding at a time.
    EpochAccountsHash,
}
138
/// Receives [`SnapshotRequest`]s, picks the next one to handle, and turns it into an
/// [`AccountsPackage`] that is sent downstream.
pub struct SnapshotRequestHandler {
    /// Provides the request sender used to re-enqueue requests that are not handled yet.
    pub snapshot_controller: Arc<SnapshotController>,
    /// Channel the snapshot requests arrive on.
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    /// Channel the resulting accounts packages are sent into.
    pub accounts_package_sender: Sender<AccountsPackage>,
}
144
145impl SnapshotRequestHandler {
146 #[allow(clippy::type_complexity)]
148 pub fn handle_snapshot_requests(
149 &self,
150 test_hash_calculation: bool,
151 non_snapshot_time_us: u128,
152 exit: &AtomicBool,
153 ) -> Option<Result<Slot, SnapshotError>> {
154 let (snapshot_request, num_outstanding_requests, num_re_enqueued_requests) =
155 self.get_next_snapshot_request()?;
156
157 datapoint_info!(
158 "handle_snapshot_requests",
159 ("num_outstanding_requests", num_outstanding_requests, i64),
160 ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
161 (
162 "enqueued_time_us",
163 snapshot_request.enqueued.elapsed().as_micros(),
164 i64
165 ),
166 );
167
168 let accounts_package_kind = new_accounts_package_kind(&snapshot_request)?;
169 Some(self.handle_snapshot_request(
170 test_hash_calculation,
171 non_snapshot_time_us,
172 snapshot_request,
173 accounts_package_kind,
174 exit,
175 ))
176 }
177
178 fn get_next_snapshot_request(
188 &self,
189 ) -> Option<(
190 SnapshotRequest,
191 usize,
192 usize,
193 )> {
194 let mut requests: Vec<_> = self.snapshot_request_receiver.try_iter().collect();
195 let requests_len = requests.len();
196 debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
197
198 match requests_len {
201 0 => None,
202 1 => {
203 let snapshot_request = requests.pop().unwrap();
205 Some((snapshot_request, 1, 0))
206 }
207 _ => {
208 let num_eah_requests = requests
209 .iter()
210 .filter(|request| {
211 request.request_kind == SnapshotRequestKind::EpochAccountsHash
212 })
213 .count();
214 assert!(
215 num_eah_requests <= 1,
216 "Only a single EAH request is allowed at a time! count: {num_eah_requests}"
217 );
218
219 let (_, y, z) =
223 requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
224 assert_eq!(z.len(), 1);
225 let z = z.first().unwrap();
226 let y: &_ = y; let snapshot_request = if z.request_kind == SnapshotRequestKind::EpochAccountsHash
238 && y.request_kind == SnapshotRequestKind::FullSnapshot
239 && y.snapshot_root_bank.slot() < z.snapshot_root_bank.slot()
240 {
241 let z = requests.pop().unwrap();
243 let y = requests.pop().unwrap();
244 requests.push(z);
245 y
246 } else {
247 requests.pop().unwrap()
249 };
250
251 let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
252 let num_re_enqueued_requests = requests
254 .into_iter()
255 .filter(|snapshot_request| {
256 snapshot_request.snapshot_root_bank.slot() > handled_request_slot
257 })
258 .map(|snapshot_request| {
259 self.snapshot_controller
260 .request_sender()
261 .try_send(snapshot_request)
262 .expect("re-enqueue snapshot request");
263 })
264 .count();
265
266 Some((snapshot_request, requests_len, num_re_enqueued_requests))
267 }
268 }
269 }
270
    /// Handles a single snapshot request: flushes, cleans and shrinks the accounts of the
    /// request's root bank, builds the accounts package, and sends it downstream.
    ///
    /// * `test_hash_calculation` - when true, the accounts hash is computed before the flush
    ///   (from the index) and again after it (from the storages), and the two are asserted
    ///   to be equal.
    /// * `non_snapshot_time_us` - only reported in the timing datapoint.
    /// * `exit` - consulted only if sending the accounts package fails.
    ///
    /// Returns the slot of the request's root bank. No `Err` is produced by the code in this
    /// function.
    ///
    /// # Panics
    ///
    /// Panics if startup verification is not complete, if the cache flush did not reach the
    /// bank's slot, if the test hash/capitalization checks fail, if a snapshot request is
    /// paired with an EpochAccountsHash package kind, or if sending the accounts package
    /// fails while `exit` is not set.
    fn handle_snapshot_request(
        &self,
        test_hash_calculation: bool,
        non_snapshot_time_us: u128,
        snapshot_request: SnapshotRequest,
        accounts_package_kind: AccountsPackageKind,
        exit: &AtomicBool,
    ) -> Result<Slot, SnapshotError> {
        info!("handling snapshot request: {snapshot_request:?}, {accounts_package_kind:?}");
        let mut total_time = Measure::start("snapshot_request_receiver_total_time");
        let SnapshotRequest {
            snapshot_root_bank,
            status_cache_slot_deltas,
            request_kind,
            enqueued: _,
        } = snapshot_request;

        // Requests must not be handled before startup verification has completed.
        assert!(snapshot_root_bank.is_startup_verification_complete());

        if accounts_package_kind == AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot) {
            // Record this bank's slot as the latest full snapshot slot in accounts-db before
            // any of the work below runs.
            snapshot_root_bank
                .rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
        }

        // Testing only: compute the accounts hash from the index *before* the cache flush.
        let previous_accounts_hash = test_hash_calculation.then(|| {
            snapshot_root_bank
                .update_accounts_hash(CalcAccountsHashDataSource::IndexForTests, false)
        });

        let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
        snapshot_root_bank.force_flush_accounts_cache();
        // The forced flush must have covered every root up to and including this bank's slot.
        assert!(
            snapshot_root_bank.slot()
                <= snapshot_root_bank
                    .rc
                    .accounts
                    .accounts_db
                    .accounts_cache
                    .fetch_max_flush_root()
        );
        flush_accounts_cache_time.stop();

        // Testing only: recompute the hash from the storages *after* the flush and check that
        // it matches the pre-flush value and that the capitalization matches the bank's.
        let accounts_hash_for_testing = previous_accounts_hash.map(|previous_accounts_hash| {
            let (this_accounts_hash, capitalization) = snapshot_root_bank
                .accounts()
                .accounts_db
                .calculate_accounts_hash_from(
                    CalcAccountsHashDataSource::Storages,
                    snapshot_root_bank.slot(),
                    &CalcAccountsHashConfig {
                        use_bg_thread_pool: true,
                        ancestors: None,
                        epoch_schedule: snapshot_root_bank.epoch_schedule(),
                        rent_collector: snapshot_root_bank.rent_collector(),
                        store_detailed_debug_info_on_failure: false,
                    },
                );
            assert_eq!(previous_accounts_hash, this_accounts_hash);
            assert_eq!(capitalization, snapshot_root_bank.capitalization());
            this_accounts_hash
        });

        let mut clean_time = Measure::start("clean_time");
        snapshot_root_bank.clean_accounts();
        clean_time.stop();

        let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());

        let mut shrink_time = Measure::start("shrink_time");
        snapshot_root_bank.shrink_candidate_slots();
        shrink_time.stop();

        // Build the accounts package from the storages as they are after clean and shrink.
        let mut snapshot_time = Measure::start("snapshot_time");
        let snapshot_storages = snapshot_bank_utils::get_snapshot_storages(&snapshot_root_bank);
        let accounts_package = match request_kind {
            SnapshotRequestKind::FullSnapshot | SnapshotRequestKind::IncrementalSnapshot => {
                match &accounts_package_kind {
                    AccountsPackageKind::Snapshot(_) => AccountsPackage::new_for_snapshot(
                        accounts_package_kind,
                        &snapshot_root_bank,
                        snapshot_storages,
                        status_cache_slot_deltas,
                        accounts_hash_for_testing,
                    ),
                    // A snapshot request must never be paired with an EAH package kind.
                    AccountsPackageKind::EpochAccountsHash => panic!(
                        "Illegal account package type: EpochAccountsHash packages must \
                         be from an EpochAccountsHash request!"
                    ),
                }
            }
            SnapshotRequestKind::EpochAccountsHash => AccountsPackage::new_for_epoch_accounts_hash(
                accounts_package_kind,
                &snapshot_root_bank,
                snapshot_storages,
                accounts_hash_for_testing,
            ),
        };
        let send_result = self.accounts_package_sender.send(accounts_package);
        if let Err(err) = send_result {
            // A failed send is tolerated only when `exit` is already set.
            let accounts_package = &err.0;
            assert!(
                exit.load(Ordering::Relaxed),
                "Failed to send accounts package: {err}, {accounts_package:?}"
            );
        }
        snapshot_time.stop();
        info!(
            "Handled snapshot request. accounts package kind: {:?}, slot: {}, bank hash: {}",
            accounts_package_kind,
            snapshot_root_bank.slot(),
            snapshot_root_bank.hash(),
        );

        total_time.stop();

        datapoint_info!(
            "handle_snapshot_requests-timing",
            (
                "flush_accounts_cache_time",
                flush_accounts_cache_time.as_us(),
                i64
            ),
            ("shrink_time", shrink_time.as_us(), i64),
            ("clean_time", clean_time.as_us(), i64),
            ("snapshot_time", snapshot_time.as_us(), i64),
            ("total_us", total_time.as_us(), i64),
            ("non_snapshot_time_us", non_snapshot_time_us, i64),
            ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
        );
        Ok(snapshot_root_bank.slot())
    }
421
422 fn peek_next_snapshot_request_slot(&self) -> Option<Slot> {
424 let (next_request, _, _) = self.get_next_snapshot_request()?;
429 let next_slot = next_request.snapshot_root_bank.slot();
430
431 self.snapshot_controller
433 .request_sender()
434 .try_send(next_request)
435 .expect("re-enqueue snapshot request");
436
437 Some(next_slot)
438 }
439}
440
/// Purges the slots of banks that were dropped, as signalled through the dropped-slots
/// channel.
#[derive(Debug)]
pub struct PrunedBanksRequestHandler {
    /// Channel the `(slot, bank id)` pairs of dropped banks arrive on.
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}
445
446impl PrunedBanksRequestHandler {
447 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
448 fn handle_request(&self, bank: &Bank) -> usize {
449 let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
450 banks_to_purge.sort_by_key(|(slot, _id)| *slot);
453 let num_banks_to_purge = banks_to_purge.len();
454
455 let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
457
458 let num_banks_with_same_slot =
461 num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
462 if num_banks_with_same_slot > 0 {
463 datapoint_info!(
464 "pruned_banks_request_handler",
465 ("num_pruned_banks", num_banks_to_purge, i64),
466 ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
467 );
468 }
469
470 let accounts_db = bank.rc.accounts.accounts_db.as_ref();
473 accounts_db.thread_pool_clean.install(|| {
474 grouped_banks_to_purge.into_par_iter().for_each(|group| {
475 group.iter().for_each(|(slot, bank_id)| {
476 accounts_db.purge_slot(*slot, *bank_id, true);
477 })
478 });
479 });
480
481 num_banks_to_purge
482 }
483
484 fn remove_dead_slots(
485 &self,
486 bank: &Bank,
487 removed_slots_count: &mut usize,
488 total_remove_slots_time: &mut u64,
489 ) {
490 let mut remove_slots_time = Measure::start("remove_slots_time");
491 *removed_slots_count += self.handle_request(bank);
492 remove_slots_time.stop();
493 *total_remove_slots_time += remove_slots_time.as_us();
494
495 if *removed_slots_count >= 100 {
496 datapoint_info!(
497 "remove_slots_timing",
498 ("remove_slots_time", *total_remove_slots_time, i64),
499 ("removed_slots_count", *removed_slots_count, i64),
500 );
501 *total_remove_slots_time = 0;
502 *removed_slots_count = 0;
503 }
504 }
505}
506
507pub struct AbsRequestHandlers {
508 pub snapshot_request_handler: SnapshotRequestHandler,
509 pub pruned_banks_request_handler: PrunedBanksRequestHandler,
510}
511
512impl AbsRequestHandlers {
513 #[allow(clippy::type_complexity)]
515 pub fn handle_snapshot_requests(
516 &self,
517 test_hash_calculation: bool,
518 non_snapshot_time_us: u128,
519 exit: &AtomicBool,
520 ) -> Option<Result<Slot, SnapshotError>> {
521 self.snapshot_request_handler.handle_snapshot_requests(
522 test_hash_calculation,
523 non_snapshot_time_us,
524 exit,
525 )
526 }
527}
528
/// Owns the background thread that purges dropped banks, handles snapshot requests, and
/// periodically flushes, cleans and shrinks accounts.
pub struct AccountsBackgroundService {
    /// Handle of the background thread.
    t_background: JoinHandle<()>,
    /// Shared flags for observing and stopping the background thread.
    status: AbsStatus,
}
533
534impl AccountsBackgroundService {
    /// Spawns the background thread (named "solBgAccounts") and returns the service handle.
    ///
    /// Each loop iteration:
    /// 1. stops if either `exit` or the service's own stop flag is set,
    /// 2. purges the slots of dropped banks,
    /// 3. handles a snapshot request, but only once the root bank's startup verification is
    ///    complete,
    /// 4. if no request was handled, flushes the accounts cache and, depending on the clean
    ///    and shrink timers, cleans and shrinks on its own,
    /// 5. sleeps for `INTERVAL_MS`.
    ///
    /// An error from snapshot handling sets `exit` and stops the loop.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned. The background thread itself panics if a
    /// handled snapshot slot is lower than the last cleaned slot.
    pub fn new(
        bank_forks: Arc<RwLock<BankForks>>,
        exit: Arc<AtomicBool>,
        request_handlers: AbsRequestHandlers,
        test_hash_calculation: bool,
    ) -> Self {
        let is_running = Arc::new(AtomicBool::new(true));
        let stop = Arc::new(AtomicBool::new(false));
        // State carried across loop iterations; moved into the thread closure below.
        let mut last_cleaned_slot = 0;
        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        let t_background = Builder::new()
            .name("solBgAccounts".to_string())
            .spawn({
                // The thread gets its own handles to the flags; the originals go into
                // `AbsStatus`.
                let is_running = is_running.clone();
                let stop = stop.clone();

                move || {
                    info!("AccountsBackgroundService has started");
                    let mut stats = StatsManager::new();
                    let mut last_snapshot_end_time = None;
                    let mut previous_clean_time = Instant::now();
                    let mut previous_shrink_time = Instant::now();

                    loop {
                        if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
                            break;
                        }
                        let start_time = Instant::now();

                        // Grab the current root bank; the read lock is released right away.
                        let bank = bank_forks.read().unwrap().root_bank();

                        // Purge the slots of banks that were dropped.
                        request_handlers
                            .pruned_banks_request_handler
                            .remove_dead_slots(
                                &bank,
                                &mut removed_slots_count,
                                &mut total_remove_slots_time,
                            );

                        // Time since the previous snapshot request finished (0 if none yet).
                        let non_snapshot_time = last_snapshot_end_time
                            .map(|last_snapshot_end_time: Instant| {
                                last_snapshot_end_time.elapsed().as_micros()
                            })
                            .unwrap_or_default();

                        // Snapshot requests are only handled after startup verification has
                        // completed; until then they stay in the channel.
                        let snapshot_handle_result = bank
                            .is_startup_verification_complete()
                            .then(|| {
                                request_handlers.handle_snapshot_requests(
                                    test_hash_calculation,
                                    non_snapshot_time,
                                    &exit,
                                )
                            })
                            .flatten();

                        if let Some(snapshot_handle_result) = snapshot_handle_result {
                            last_snapshot_end_time = Some(Instant::now());
                            match snapshot_handle_result {
                                Ok(snapshot_slot) => {
                                    // Cleaning must never have advanced past the slot of a
                                    // snapshot request that is handled afterwards.
                                    assert!(
                                        last_cleaned_slot <= snapshot_slot,
                                        "last cleaned slot: {last_cleaned_slot}, \
                                         snapshot request slot: {snapshot_slot}, \
                                         is startup verification complete: {}, \
                                         enqueued snapshot requests: {:?}",
                                        bank.is_startup_verification_complete(),
                                        request_handlers
                                            .snapshot_request_handler
                                            .snapshot_request_receiver
                                            .try_iter()
                                            .collect::<Vec<_>>(),
                                    );
                                    // Handling a request cleaned and shrank already, so both
                                    // timers are restarted.
                                    last_cleaned_slot = snapshot_slot;
                                    previous_clean_time = Instant::now();
                                    previous_shrink_time = Instant::now();
                                }
                                Err(err) => {
                                    error!(
                                        "Stopping AccountsBackgroundService! \
                                         Fatal error while handling snapshot requests: {err}",
                                    );
                                    exit.store(true, Ordering::Relaxed);
                                    break;
                                }
                            }
                        } else {
                            // No request was handled: do the periodic maintenance instead.
                            let next_snapshot_request_slot = request_handlers
                                .snapshot_request_handler
                                .peek_next_snapshot_request_slot();

                            // Stay strictly below both the root bank's slot and the slot of
                            // the next pending snapshot request.
                            let max_clean_slot_inclusive = cmp::min(
                                next_snapshot_request_slot.unwrap_or(Slot::MAX),
                                bank.slot(),
                            )
                            .saturating_sub(1);

                            let duration_since_previous_clean = previous_clean_time.elapsed();
                            let should_clean = duration_since_previous_clean > CLEAN_INTERVAL;

                            // Force the flush only when a clean follows.
                            let force_flush = should_clean;
                            bank.rc
                                .accounts
                                .accounts_db
                                .flush_accounts_cache(force_flush, Some(max_clean_slot_inclusive));

                            if should_clean {
                                bank.rc.accounts.accounts_db.clean_accounts(
                                    Some(max_clean_slot_inclusive),
                                    false,
                                    bank.epoch_schedule(),
                                    bank.clean_accounts_old_storages_policy(),
                                );
                                last_cleaned_slot = max_clean_slot_inclusive;
                                previous_clean_time = Instant::now();
                            }

                            let duration_since_previous_shrink = previous_shrink_time.elapsed();
                            let should_shrink = duration_since_previous_shrink > SHRINK_INTERVAL;
                            // Shrink whenever either timer fired.
                            if should_shrink || should_clean {
                                // Ancient slots are only shrunk together with a clean.
                                if should_clean {
                                    bank.shrink_ancient_slots();
                                }
                                bank.shrink_candidate_slots();
                                previous_shrink_time = Instant::now();
                            }
                        }
                        stats.record_and_maybe_submit(start_time.elapsed());
                        sleep(Duration::from_millis(INTERVAL_MS));
                    }
                    info!("AccountsBackgroundService has stopped");
                    is_running.store(false, Ordering::Relaxed);
                }
            })
            .unwrap();

        Self {
            t_background,
            status: AbsStatus { is_running, stop },
        }
    }
715
716 pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
721 assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
722
723 let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
724 {
725 let root_bank = bank_forks.read().unwrap().root_bank();
726
727 root_bank
728 .rc
729 .accounts
730 .accounts_db
731 .enable_bank_drop_callback();
732 root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
733 pruned_banks_sender,
734 ))));
735 }
736 pruned_banks_receiver
737 }
738
739 pub fn join(self) -> thread::Result<()> {
740 self.t_background.join()
741 }
742
743 pub fn status(&self) -> &AbsStatus {
745 &self.status
746 }
747}
748
/// Cloneable handle for observing and stopping the accounts background service.
///
/// Clones share the same underlying flags.
#[derive(Debug, Clone)]
pub struct AbsStatus {
    /// Set while the background thread runs; cleared by the thread right before it exits.
    is_running: Arc<AtomicBool>,
    /// Set to ask the background thread to stop.
    stop: Arc<AtomicBool>,
}

impl AbsStatus {
    /// Returns whether the background thread is still flagged as running.
    pub fn is_running(&self) -> bool {
        let Self { is_running, .. } = self;
        is_running.load(Ordering::Relaxed)
    }

    /// Raises the stop flag; the flag is only ever set here, never cleared.
    pub fn stop(&self) {
        let Self { stop, .. } = self;
        stop.store(true, Ordering::Relaxed)
    }

    /// Creates a status that is neither running nor asked to stop.
    #[cfg(feature = "dev-context-only-utils")]
    pub fn new_for_tests() -> Self {
        let is_running = Arc::new(AtomicBool::new(false));
        let stop = Arc::new(AtomicBool::new(false));
        Self { is_running, stop }
    }
}
777
778#[must_use]
780fn new_accounts_package_kind(snapshot_request: &SnapshotRequest) -> Option<AccountsPackageKind> {
781 match snapshot_request.request_kind {
782 SnapshotRequestKind::EpochAccountsHash => Some(AccountsPackageKind::EpochAccountsHash),
783 SnapshotRequestKind::FullSnapshot => {
784 Some(AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot))
785 }
786 SnapshotRequestKind::IncrementalSnapshot => {
787 if let Some(latest_full_snapshot_slot) = snapshot_request
788 .snapshot_root_bank
789 .rc
790 .accounts
791 .accounts_db
792 .latest_full_snapshot_slot()
793 {
794 Some(AccountsPackageKind::Snapshot(
795 SnapshotKind::IncrementalSnapshot(latest_full_snapshot_slot),
796 ))
797 } else {
798 warn!(
799 "Ignoring IncrementalSnapshot request for slot {} because there is no latest \
800 full snapshot",
801 snapshot_request.snapshot_root_bank.slot()
802 );
803 None
804 }
805 }
806 }
807}
808
809#[must_use]
818fn cmp_requests_by_priority(a: &SnapshotRequest, b: &SnapshotRequest) -> cmp::Ordering {
819 let slot_a = a.snapshot_root_bank.slot();
820 let slot_b = b.snapshot_root_bank.slot();
821 cmp_snapshot_request_kinds_by_priority(&a.request_kind, &b.request_kind)
822 .then(slot_a.cmp(&slot_b))
823}
824
825#[must_use]
832fn cmp_snapshot_request_kinds_by_priority(
833 a: &SnapshotRequestKind,
834 b: &SnapshotRequestKind,
835) -> cmp::Ordering {
836 use {
837 cmp::Ordering::{Equal, Greater, Less},
838 SnapshotRequestKind as Kind,
839 };
840 match (a, b) {
841 (Kind::EpochAccountsHash, Kind::EpochAccountsHash) => Equal,
843 (Kind::EpochAccountsHash, _) => Greater,
844 (_, Kind::EpochAccountsHash) => Less,
845
846 (Kind::FullSnapshot, Kind::FullSnapshot) => Equal,
848 (Kind::FullSnapshot, Kind::IncrementalSnapshot) => Greater,
849 (Kind::IncrementalSnapshot, Kind::FullSnapshot) => Less,
850 (Kind::IncrementalSnapshot, Kind::IncrementalSnapshot) => Equal,
851 }
852}
853
854#[cfg(test)]
855mod test {
856 use {
857 super::*,
858 crate::{
859 bank::epoch_accounts_hash_utils, genesis_utils::create_genesis_config,
860 snapshot_config::SnapshotConfig,
861 },
862 crossbeam_channel::unbounded,
863 solana_account::AccountSharedData,
864 solana_accounts_db::epoch_accounts_hash::EpochAccountsHash,
865 solana_epoch_schedule::EpochSchedule,
866 solana_hash::Hash,
867 solana_pubkey::Pubkey,
868 };
869
    /// Signalling slot 0 as dropped and running `remove_dead_slots()` must leave no accounts
    /// in slot 0.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account so that slot 0 is known to contain something.
        let account_key = Pubkey::new_unique();
        bank0.store_account(
            &account_key,
            &AccountSharedData::new(264, 0, &Pubkey::default()),
        );
        assert!(bank0.get_account(&account_key).is_some());
        // Signal (slot 0, bank id 0) as dropped.
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);

        // After the purge, scanning slot 0 finds nothing.
        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }
894
    /// Exercises the request selection in `get_next_snapshot_request()`: the priority order
    /// between request kinds, the dropping of requests for older slots, and the special case
    /// where a FullSnapshot request for a lower slot is handled before the EpochAccountsHash
    /// request.
    #[test]
    fn test_get_next_snapshot_request() {
        // The assertions below expect the EAH requests at slots 100 and 500.
        // NOTE(review): presumably `calculation_start` is a quarter into each 400-slot
        // epoch — confirm.
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval_slots: FULL_SNAPSHOT_INTERVAL,
            incremental_snapshot_archive_interval_slots: INCREMENTAL_SNAPSHOT_INTERVAL,
            ..SnapshotConfig::default()
        };

        let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            snapshot_config,
            0,
        ));
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller,
            snapshot_request_receiver,
            accounts_package_sender,
        };

        // Helper that enqueues a request of the given kind for the given bank.
        let send_snapshot_request = |snapshot_root_bank, request_kind| {
            let snapshot_request = SnapshotRequest {
                snapshot_root_bank,
                status_cache_slot_deltas: Vec::default(),
                request_kind,
                enqueued: Instant::now(),
            };
            snapshot_request_sender.send(snapshot_request).unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
        bank.set_startup_verification_complete();
        // Mark an epoch accounts hash as valid for slot 0 before creating child banks.
        bank.rc
            .accounts
            .accounts_db
            .epoch_accounts_hash_manager
            .set_valid(EpochAccountsHash::new(Hash::new_unique()), 0);

        // Keep bank 0 around to read and write the latest full snapshot slot through it.
        let bank0 = bank.clone();
        fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
            bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
        }
        fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
            bank.rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(slot);
        }

        // Creates `num_banks` consecutive child banks. For each one, at most one request is
        // sent: EAH at the calculation start slot, otherwise FullSnapshot on the full
        // interval, otherwise IncrementalSnapshot on the incremental interval.
        let mut make_banks = |num_banks| {
            for _ in 0..num_banks {
                let slot = bank.slot() + 1;
                bank = Arc::new(Bank::new_from_parent(
                    bank.clone(),
                    &Pubkey::new_unique(),
                    slot,
                ));

                if bank.slot() == epoch_accounts_hash_utils::calculation_start(&bank) {
                    send_snapshot_request(
                        Arc::clone(&bank),
                        SnapshotRequestKind::EpochAccountsHash,
                    );
                } else if bank.block_height() % FULL_SNAPSHOT_INTERVAL == 0 {
                    send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::FullSnapshot);
                } else if bank.block_height() % INCREMENTAL_SNAPSHOT_INTERVAL == 0 {
                    send_snapshot_request(
                        Arc::clone(&bank),
                        SnapshotRequestKind::IncrementalSnapshot,
                    );
                }
            }
        };
        // Banks for slots 1 through 303.
        make_banks(303);

        // The EAH request has the highest priority, so it comes out first.
        assert_eq!(latest_full_snapshot_slot(&bank0), None);
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::EpochAccountsHash
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 100);

        // Next is the newest FullSnapshot request; older requests get dropped.
        assert_eq!(latest_full_snapshot_slot(&bank0), None);
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::FullSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
        set_latest_full_snapshot_slot(&bank0, 240);

        // Only IncrementalSnapshot requests remain; the newest one comes out.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::IncrementalSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);

        // Nothing is left.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());

        // Banks for slots 304 through 543. This produces an EAH request at slot 500 plus a
        // FullSnapshot request at the lower slot 480.
        make_banks(240);

        // The FullSnapshot request for the lower slot is handled before the EAH request.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::FullSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 480);
        set_latest_full_snapshot_slot(&bank0, 480);

        // The EAH request was re-enqueued and now comes out.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::EpochAccountsHash
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 500);

        // Then the newest IncrementalSnapshot request.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480));
        let (snapshot_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            snapshot_request.request_kind,
            SnapshotRequestKind::IncrementalSnapshot
        );
        assert_eq!(snapshot_request.snapshot_root_bank.slot(), 540);

        // And again nothing is left.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(480));
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());
    }
1108
    /// Builds several forks, including multiple banks that share a slot, drops all banks
    /// except the squashed tip, and checks that `handle_request()` purges all seven of them.
    #[test]
    fn test_pruned_banks_request_handler_handle_request() {
        let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let genesis_config_info = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
        bank.set_startup_verification_complete();
        // Install the drop callback on the first bank, before any child bank is created.
        bank.rc.accounts.accounts_db.enable_bank_drop_callback();
        bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
            pruned_banks_sender,
        ))));

        // Three banks at slot 1 (forks 0, 1, 2), two at slot 2 (forks 0, 1) and two at
        // slot 3 (forks 0, 3).
        let fork0_bank0 = Arc::new(bank);
        let fork0_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork1_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork2_bank1 = Arc::new(Bank::new_from_parent(
            fork0_bank0.clone(),
            &Pubkey::new_unique(),
            fork0_bank0.slot() + 1,
        ));
        let fork0_bank2 = Arc::new(Bank::new_from_parent(
            fork0_bank1.clone(),
            &Pubkey::new_unique(),
            fork0_bank1.slot() + 1,
        ));
        let fork1_bank2 = Arc::new(Bank::new_from_parent(
            fork1_bank1.clone(),
            &Pubkey::new_unique(),
            fork1_bank1.slot() + 1,
        ));
        let fork0_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        let fork3_bank3 = Arc::new(Bank::new_from_parent(
            fork0_bank2.clone(),
            &Pubkey::new_unique(),
            fork0_bank2.slot() + 1,
        ));
        fork0_bank3.squash();

        // Drop every bank except `fork0_bank3`.
        drop(fork3_bank3);
        drop(fork1_bank2);
        drop(fork0_bank2);
        drop(fork1_bank1);
        drop(fork2_bank1);
        drop(fork0_bank1);
        drop(fork0_bank0);
        // All seven dropped banks are expected to be handled.
        let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
        assert_eq!(num_banks_purged, 7);
    }
1172
    /// Checks `cmp_snapshot_request_kinds_by_priority()` against the expected ordering for
    /// all nine `(a, b)` pairs of request kinds.
    #[test]
    fn test_cmp_snapshot_request_kinds_by_priority() {
        use cmp::Ordering::{Equal, Greater, Less};
        // Each entry is (kind a, kind b, expected ordering of a relative to b).
        for (snapshot_request_kind_a, snapshot_request_kind_b, expected_result) in [
            (
                SnapshotRequestKind::EpochAccountsHash,
                SnapshotRequestKind::EpochAccountsHash,
                Equal,
            ),
            (
                SnapshotRequestKind::EpochAccountsHash,
                SnapshotRequestKind::FullSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::EpochAccountsHash,
                SnapshotRequestKind::IncrementalSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::EpochAccountsHash,
                Less,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Equal,
            ),
            (
                SnapshotRequestKind::FullSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Greater,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::EpochAccountsHash,
                Less,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::FullSnapshot,
                Less,
            ),
            (
                SnapshotRequestKind::IncrementalSnapshot,
                SnapshotRequestKind::IncrementalSnapshot,
                Equal,
            ),
        ] {
            let actual_result = cmp_snapshot_request_kinds_by_priority(
                &snapshot_request_kind_a,
                &snapshot_request_kind_b,
            );
            assert_eq!(expected_result, actual_result);
        }
    }
1230}