1mod stats;
6use {
7 crate::{
8 accounts_db::{AccountStorageEntry, CalcAccountsHashDataSource},
9 accounts_hash::CalcAccountsHashConfig,
10 bank::{Bank, BankSlotDelta, DropCallback},
11 bank_forks::BankForks,
12 snapshot_config::SnapshotConfig,
13 snapshot_package::{self, AccountsPackage, AccountsPackageType, SnapshotType},
14 snapshot_utils::{self, SnapshotError},
15 },
16 crossbeam_channel::{Receiver, SendError, Sender},
17 log::*,
18 rand::{thread_rng, Rng},
19 snapshot_utils::MAX_BANK_SNAPSHOTS_TO_RETAIN,
20 solana_measure::measure::Measure,
21 solana_sdk::clock::{BankId, Slot},
22 stats::StatsManager,
23 std::{
24 boxed::Box,
25 fmt::{Debug, Formatter},
26 sync::{
27 atomic::{AtomicBool, AtomicU64, Ordering},
28 Arc, RwLock,
29 },
30 thread::{self, sleep, Builder, JoinHandle},
31 time::{Duration, Instant},
32 },
33};
34
35const INTERVAL_MS: u64 = 100;
36const CLEAN_INTERVAL_BLOCKS: u64 = 100;
37
38const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;
44
45pub type SnapshotRequestSender = Sender<SnapshotRequest>;
46pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
47pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
48pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
49
50const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
52const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
54
55#[derive(Debug, Default)]
56struct PrunedBankQueueLenReporter {
57 last_report_time: AtomicU64,
58}
59
60impl PrunedBankQueueLenReporter {
61 fn report(&self, q_len: usize) {
62 let now = solana_sdk::timing::timestamp();
63 let last_report_time = self.last_report_time.load(Ordering::Acquire);
64 if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
65 && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
66 {
67 datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
68 self.last_report_time.store(now, Ordering::Release);
69 }
70 }
71}
72
73lazy_static! {
74 static ref BANK_DROP_QUEUE_REPORTER: PrunedBankQueueLenReporter =
75 PrunedBankQueueLenReporter::default();
76}
77
78#[derive(Clone)]
79pub struct SendDroppedBankCallback {
80 sender: DroppedSlotsSender,
81}
82
83impl DropCallback for SendDroppedBankCallback {
84 fn callback(&self, bank: &Bank) {
85 BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
86 if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
87 info!("bank DropCallback signal queue disconnected.");
88 }
89 }
90
91 fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
92 Box::new(self.clone())
93 }
94}
95
96impl Debug for SendDroppedBankCallback {
97 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
98 write!(f, "SendDroppedBankCallback({self:p})")
99 }
100}
101
102impl SendDroppedBankCallback {
103 pub fn new(sender: DroppedSlotsSender) -> Self {
104 Self { sender }
105 }
106}
107
108pub struct SnapshotRequest {
109 pub snapshot_root_bank: Arc<Bank>,
110 pub status_cache_slot_deltas: Vec<BankSlotDelta>,
111 pub request_type: SnapshotRequestType,
112
113 pub enqueued: Instant,
116}
117
118impl Debug for SnapshotRequest {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 f.debug_struct("SnapshotRequest")
121 .field("request type", &self.request_type)
122 .field("bank slot", &self.snapshot_root_bank.slot())
123 .finish()
124 }
125}
126
/// The kind of work a `SnapshotRequest` asks for.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SnapshotRequestType {
    /// Take a bank snapshot and build an accounts package from it.
    Snapshot,
    /// Skip the bank snapshot and only build an epoch-accounts-hash accounts package.
    EpochAccountsHash,
}
137
138pub struct SnapshotRequestHandler {
139 pub snapshot_config: SnapshotConfig,
140 pub snapshot_request_sender: SnapshotRequestSender,
141 pub snapshot_request_receiver: SnapshotRequestReceiver,
142 pub accounts_package_sender: Sender<AccountsPackage>,
143}
144
145impl SnapshotRequestHandler {
146 #[allow(clippy::type_complexity)]
148 pub fn handle_snapshot_requests(
149 &self,
150 test_hash_calculation: bool,
151 non_snapshot_time_us: u128,
152 last_full_snapshot_slot: &mut Option<Slot>,
153 ) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
154 let (
155 snapshot_request,
156 accounts_package_type,
157 num_outstanding_requests,
158 num_re_enqueued_requests,
159 ) = self.get_next_snapshot_request(*last_full_snapshot_slot)?;
160
161 datapoint_info!(
162 "handle_snapshot_requests",
163 (
164 "num-outstanding-requests",
165 num_outstanding_requests as i64,
166 i64
167 ),
168 (
169 "num-re-enqueued-requests",
170 num_re_enqueued_requests as i64,
171 i64
172 ),
173 (
174 "enqueued-time-us",
175 snapshot_request.enqueued.elapsed().as_micros() as i64,
176 i64
177 ),
178 );
179
180 Some(self.handle_snapshot_request(
181 test_hash_calculation,
182 non_snapshot_time_us,
183 last_full_snapshot_slot,
184 snapshot_request,
185 accounts_package_type,
186 ))
187 }
188
189 fn get_next_snapshot_request(
199 &self,
200 last_full_snapshot_slot: Option<Slot>,
201 ) -> Option<(
202 SnapshotRequest,
203 AccountsPackageType,
204 usize,
205 usize,
206 )> {
207 let mut requests: Vec<_> = self
208 .snapshot_request_receiver
209 .try_iter()
210 .map(|request| {
211 let accounts_package_type = new_accounts_package_type(
212 &request,
213 &self.snapshot_config,
214 last_full_snapshot_slot,
215 );
216 (request, accounts_package_type)
217 })
218 .collect();
219 if requests.is_empty() {
221 return None;
222 }
223 let requests_len = requests.len();
224 debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
225 let num_eah_requests = requests
226 .iter()
227 .filter(|(_, account_package_type)| {
228 *account_package_type == AccountsPackageType::EpochAccountsHash
229 })
230 .count();
231 assert!(
232 num_eah_requests <= 1,
233 "Only a single EAH request is allowed at a time! count: {num_eah_requests}"
234 );
235
236 requests.select_nth_unstable_by(requests_len - 1, cmp_requests_by_priority);
238 let (snapshot_request, accounts_package_type) = requests.pop().unwrap();
241 let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
242 let num_re_enqueued_requests = requests
244 .into_iter()
245 .filter(|(snapshot_request, _)| {
246 snapshot_request.snapshot_root_bank.slot() > handled_request_slot
247 })
248 .map(|(snapshot_request, _)| {
249 self.snapshot_request_sender
250 .try_send(snapshot_request)
251 .expect("re-enqueue snapshot request")
252 })
253 .count();
254
255 Some((
256 snapshot_request,
257 accounts_package_type,
258 requests_len,
259 num_re_enqueued_requests,
260 ))
261 }
262
263 fn handle_snapshot_request(
264 &self,
265 test_hash_calculation: bool,
266 non_snapshot_time_us: u128,
267 last_full_snapshot_slot: &mut Option<Slot>,
268 snapshot_request: SnapshotRequest,
269 accounts_package_type: AccountsPackageType,
270 ) -> Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError> {
271 debug!(
272 "handling snapshot request: {:?}, {:?}",
273 snapshot_request, accounts_package_type
274 );
275 let mut total_time = Measure::start("snapshot_request_receiver_total_time");
276 let SnapshotRequest {
277 snapshot_root_bank,
278 status_cache_slot_deltas,
279 request_type,
280 enqueued: _,
281 } = snapshot_request;
282
283 assert!(snapshot_root_bank.is_startup_verification_complete());
285
286 if accounts_package_type == AccountsPackageType::Snapshot(SnapshotType::FullSnapshot) {
287 *last_full_snapshot_slot = Some(snapshot_root_bank.slot());
288 }
289
290 let previous_accounts_hash = test_hash_calculation.then(|| {
291 snapshot_root_bank.update_accounts_hash(
294 CalcAccountsHashDataSource::IndexForTests,
295 false,
296 false,
297 )
298 });
299
300 let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
301 snapshot_root_bank.force_flush_accounts_cache();
306 assert!(
310 snapshot_root_bank.slot()
311 <= snapshot_root_bank
312 .rc
313 .accounts
314 .accounts_db
315 .accounts_cache
316 .fetch_max_flush_root()
317 );
318 flush_accounts_cache_time.stop();
319
320 let accounts_hash_for_testing = previous_accounts_hash.map(|previous_accounts_hash| {
321 let check_hash = false;
322
323 let (this_accounts_hash, capitalization) = snapshot_root_bank
324 .accounts()
325 .accounts_db
326 .calculate_accounts_hash(
327 CalcAccountsHashDataSource::Storages,
328 snapshot_root_bank.slot(),
329 &CalcAccountsHashConfig {
330 use_bg_thread_pool: true,
331 check_hash,
332 ancestors: None,
333 epoch_schedule: snapshot_root_bank.epoch_schedule(),
334 rent_collector: snapshot_root_bank.rent_collector(),
335 store_detailed_debug_info_on_failure: false,
336 },
337 )
338 .unwrap();
339 assert_eq!(previous_accounts_hash, this_accounts_hash);
340 assert_eq!(capitalization, snapshot_root_bank.capitalization());
341 this_accounts_hash
342 });
343
344 let mut clean_time = Measure::start("clean_time");
345 snapshot_root_bank.clean_accounts(*last_full_snapshot_slot);
346 clean_time.stop();
347
348 let mut shrink_time = Measure::start("shrink_time");
349 snapshot_root_bank.shrink_candidate_slots();
350 shrink_time.stop();
351
352 let mut snapshot_time = Measure::start("snapshot_time");
354 let snapshot_storages = snapshot_utils::get_snapshot_storages(&snapshot_root_bank);
355 let accounts_package = match request_type {
356 SnapshotRequestType::Snapshot => {
357 let bank_snapshot_info = snapshot_utils::add_bank_snapshot(
358 &self.snapshot_config.bank_snapshots_dir,
359 &snapshot_root_bank,
360 &snapshot_storages,
361 self.snapshot_config.snapshot_version,
362 status_cache_slot_deltas,
363 )
364 .expect("snapshot bank");
365 AccountsPackage::new_for_snapshot(
366 accounts_package_type,
367 &snapshot_root_bank,
368 &bank_snapshot_info,
369 &self.snapshot_config.bank_snapshots_dir,
370 &self.snapshot_config.full_snapshot_archives_dir,
371 &self.snapshot_config.incremental_snapshot_archives_dir,
372 snapshot_storages.clone(),
373 self.snapshot_config.archive_format,
374 self.snapshot_config.snapshot_version,
375 accounts_hash_for_testing,
376 )
377 .expect("new accounts package for snapshot")
378 }
379 SnapshotRequestType::EpochAccountsHash => {
380 AccountsPackage::new_for_epoch_accounts_hash(
382 accounts_package_type,
383 &snapshot_root_bank,
384 snapshot_storages.clone(),
385 accounts_hash_for_testing,
386 )
387 }
388 };
389 self.accounts_package_sender
390 .send(accounts_package)
391 .expect("send accounts package");
392 snapshot_time.stop();
393 info!(
394 "Took bank snapshot. accounts package type: {:?}, slot: {}, bank hash: {}",
395 accounts_package_type,
396 snapshot_root_bank.slot(),
397 snapshot_root_bank.hash(),
398 );
399
400 let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
402 snapshot_utils::purge_old_bank_snapshots(
403 &self.snapshot_config.bank_snapshots_dir,
404 MAX_BANK_SNAPSHOTS_TO_RETAIN,
405 );
406 purge_old_snapshots_time.stop();
407 total_time.stop();
408
409 datapoint_info!(
410 "handle_snapshot_requests-timing",
411 (
412 "flush_accounts_cache_time",
413 flush_accounts_cache_time.as_us(),
414 i64
415 ),
416 ("shrink_time", shrink_time.as_us(), i64),
417 ("clean_time", clean_time.as_us(), i64),
418 ("snapshot_time", snapshot_time.as_us(), i64),
419 (
420 "purge_old_snapshots_time",
421 purge_old_snapshots_time.as_us(),
422 i64
423 ),
424 ("total_us", total_time.as_us(), i64),
425 ("non_snapshot_time_us", non_snapshot_time_us, i64),
426 );
427 Ok((snapshot_root_bank.block_height(), snapshot_storages))
428 }
429}
430
431#[derive(Default, Clone)]
432pub struct AbsRequestSender {
433 snapshot_request_sender: Option<SnapshotRequestSender>,
434}
435
436impl AbsRequestSender {
437 pub fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
438 Self {
439 snapshot_request_sender: Some(snapshot_request_sender),
440 }
441 }
442
443 pub fn is_snapshot_creation_enabled(&self) -> bool {
444 self.snapshot_request_sender.is_some()
445 }
446
447 pub fn send_snapshot_request(
448 &self,
449 snapshot_request: SnapshotRequest,
450 ) -> Result<(), SendError<SnapshotRequest>> {
451 if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
452 snapshot_request_sender.send(snapshot_request)
453 } else {
454 Ok(())
455 }
456 }
457}
458
459#[derive(Debug)]
460pub struct PrunedBanksRequestHandler {
461 pub pruned_banks_receiver: DroppedSlotsReceiver,
462}
463
464impl PrunedBanksRequestHandler {
465 pub fn handle_request(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize {
466 let mut count = 0;
467 for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
468 count += 1;
469 bank.rc.accounts.accounts_db.purge_slot(
470 pruned_slot,
471 pruned_bank_id,
472 is_serialized_with_abs,
473 );
474 }
475
476 count
477 }
478
479 fn remove_dead_slots(
480 &self,
481 bank: &Bank,
482 removed_slots_count: &mut usize,
483 total_remove_slots_time: &mut u64,
484 ) {
485 let mut remove_slots_time = Measure::start("remove_slots_time");
486 *removed_slots_count += self.handle_request(bank, true);
487 remove_slots_time.stop();
488 *total_remove_slots_time += remove_slots_time.as_us();
489
490 if *removed_slots_count >= 100 {
491 datapoint_info!(
492 "remove_slots_timing",
493 ("remove_slots_time", *total_remove_slots_time, i64),
494 ("removed_slots_count", *removed_slots_count, i64),
495 );
496 *total_remove_slots_time = 0;
497 *removed_slots_count = 0;
498 }
499 }
500}
501
502pub struct AbsRequestHandlers {
503 pub snapshot_request_handler: SnapshotRequestHandler,
504 pub pruned_banks_request_handler: PrunedBanksRequestHandler,
505}
506
507impl AbsRequestHandlers {
508 #[allow(clippy::type_complexity)]
510 pub fn handle_snapshot_requests(
511 &self,
512 test_hash_calculation: bool,
513 non_snapshot_time_us: u128,
514 last_full_snapshot_slot: &mut Option<Slot>,
515 ) -> Option<Result<(u64, Vec<Arc<AccountStorageEntry>>), SnapshotError>> {
516 self.snapshot_request_handler.handle_snapshot_requests(
517 test_hash_calculation,
518 non_snapshot_time_us,
519 last_full_snapshot_slot,
520 )
521 }
522}
523
524pub struct AccountsBackgroundService {
525 t_background: JoinHandle<()>,
526}
527
528impl AccountsBackgroundService {
529 pub fn new(
530 bank_forks: Arc<RwLock<BankForks>>,
531 exit: &Arc<AtomicBool>,
532 request_handlers: AbsRequestHandlers,
533 test_hash_calculation: bool,
534 mut last_full_snapshot_slot: Option<Slot>,
535 ) -> Self {
536 info!("AccountsBackgroundService active");
537 let exit = exit.clone();
538 let mut last_cleaned_block_height = 0;
539 let mut removed_slots_count = 0;
540 let mut total_remove_slots_time = 0;
541 let mut last_expiration_check_time = Instant::now();
542 let t_background = Builder::new()
543 .name("solBgAccounts".to_string())
544 .spawn(move || {
545 let mut stats = StatsManager::new();
546 let mut last_snapshot_end_time = None;
547
548 let mut last_snapshot_storages: Option<Vec<Arc<AccountStorageEntry>>> = None;
552 loop {
553 if exit.load(Ordering::Relaxed) {
554 break;
555 }
556 let start_time = Instant::now();
557
558 let bank = bank_forks.read().unwrap().root_bank().clone();
560
561 request_handlers
563 .pruned_banks_request_handler
564 .remove_dead_slots(
565 &bank,
566 &mut removed_slots_count,
567 &mut total_remove_slots_time,
568 );
569
570 Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
571
572 let non_snapshot_time = last_snapshot_end_time
573 .map(|last_snapshot_end_time: Instant| {
574 last_snapshot_end_time.elapsed().as_micros()
575 })
576 .unwrap_or_default();
577
578 let snapshot_handle_result = bank
601 .is_startup_verification_complete()
602 .then(|| {
603 request_handlers.handle_snapshot_requests(
604 test_hash_calculation,
605 non_snapshot_time,
606 &mut last_full_snapshot_slot,
607 )
608 })
609 .flatten();
610 if snapshot_handle_result.is_some() {
611 last_snapshot_end_time = Some(Instant::now());
612 }
613
614 bank.flush_accounts_cache_if_needed();
619
620 if let Some(snapshot_handle_result) = snapshot_handle_result {
621 if let Ok((snapshot_block_height, snapshot_storages)) =
624 snapshot_handle_result
625 {
626 assert!(last_cleaned_block_height <= snapshot_block_height);
627 last_cleaned_block_height = snapshot_block_height;
628 last_snapshot_storages = Some(snapshot_storages);
631 debug!(
632 "Number of snapshot storages kept alive for fastboot: {}",
633 last_snapshot_storages
634 .as_ref()
635 .map(|storages| storages.len())
636 .unwrap_or(0)
637 );
638 } else {
639 exit.store(true, Ordering::Relaxed);
640 return;
641 }
642 } else {
643 bank.shrink_candidate_slots();
644 if bank.block_height() - last_cleaned_block_height
645 > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
646 {
647 bank.force_flush_accounts_cache();
652 bank.clean_accounts(last_full_snapshot_slot);
653 last_cleaned_block_height = bank.block_height();
654 }
655 }
656 stats.record_and_maybe_submit(start_time.elapsed());
657 sleep(Duration::from_millis(INTERVAL_MS));
658 }
659 info!(
660 "ABS loop done. Number of snapshot storages kept alive for fastboot: {}",
661 last_snapshot_storages
662 .map(|storages| storages.len())
663 .unwrap_or(0)
664 );
665 })
666 .unwrap();
667
668 Self { t_background }
669 }
670
671 pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
676 assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
677
678 let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
679 {
680 let root_bank = bank_forks.read().unwrap().root_bank();
681 root_bank.set_callback(Some(Box::new(
682 root_bank
683 .rc
684 .accounts
685 .accounts_db
686 .create_drop_bank_callback(pruned_banks_sender),
687 )));
688 }
689 pruned_banks_receiver
690 }
691
692 pub fn join(self) -> thread::Result<()> {
693 self.t_background.join()
694 }
695
696 fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
697 let now = Instant::now();
698 if now.duration_since(*last_expiration_check_time).as_secs()
699 > RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
700 {
701 bank.expire_old_recycle_stores();
702 *last_expiration_check_time = now;
703 }
704 }
705}
706
707#[must_use]
709fn new_accounts_package_type(
710 snapshot_request: &SnapshotRequest,
711 snapshot_config: &SnapshotConfig,
712 last_full_snapshot_slot: Option<Slot>,
713) -> AccountsPackageType {
714 let block_height = snapshot_request.snapshot_root_bank.block_height();
715 match snapshot_request.request_type {
716 SnapshotRequestType::EpochAccountsHash => AccountsPackageType::EpochAccountsHash,
717 _ => {
718 if snapshot_utils::should_take_full_snapshot(
719 block_height,
720 snapshot_config.full_snapshot_archive_interval_slots,
721 ) {
722 AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)
723 } else if snapshot_utils::should_take_incremental_snapshot(
724 block_height,
725 snapshot_config.incremental_snapshot_archive_interval_slots,
726 last_full_snapshot_slot,
727 ) {
728 AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(
729 last_full_snapshot_slot.unwrap(),
730 ))
731 } else {
732 AccountsPackageType::AccountsHashVerifier
733 }
734 }
735 }
736}
737
738#[must_use]
748fn cmp_requests_by_priority(
749 a: &(SnapshotRequest, AccountsPackageType),
750 b: &(SnapshotRequest, AccountsPackageType),
751) -> std::cmp::Ordering {
752 let (snapshot_request_a, accounts_package_type_a) = a;
753 let (snapshot_request_b, accounts_package_type_b) = b;
754 let slot_a = snapshot_request_a.snapshot_root_bank.slot();
755 let slot_b = snapshot_request_b.snapshot_root_bank.slot();
756 snapshot_package::cmp_accounts_package_types_by_priority(
757 accounts_package_type_a,
758 accounts_package_type_b,
759 )
760 .then(slot_a.cmp(&slot_b))
761}
762
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            epoch_accounts_hash::{self, EpochAccountsHash},
            genesis_utils::create_genesis_config,
        },
        crossbeam_channel::unbounded,
        solana_sdk::{
            account::AccountSharedData, epoch_schedule::EpochSchedule, hash::Hash, pubkey::Pubkey,
        },
    };

    /// A slot announced on the pruned-banks channel must have its accounts purged by
    /// `remove_dead_slots()`.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account in slot 0 so the slot has something to purge.
        let account_key = Pubkey::new_unique();
        let account = AccountSharedData::new(264, 0, &Pubkey::default());
        bank0.store_account(&account_key, &account);
        assert!(bank0.get_account(&account_key).is_some());

        // Announce slot 0 (bank id 0) as dropped.
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        handler.remove_dead_slots(
            &bank0,
            &mut removed_slots_count,
            &mut total_remove_slots_time,
        );

        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }

    /// Requests for slots 1 through 303 are queued up front and must be handed out in
    /// priority order: the epoch accounts hash request (slot 100), then the full snapshot at
    /// slot 240, then the incremental snapshot at slot 300, then the accounts hash verifier
    /// request at slot 303, after which nothing is left.
    #[test]
    fn test_get_next_snapshot_request() {
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval_slots: FULL_SNAPSHOT_INTERVAL,
            incremental_snapshot_archive_interval_slots: INCREMENTAL_SNAPSHOT_INTERVAL,
            ..SnapshotConfig::default()
        };

        let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
        let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_config,
            snapshot_request_sender: snapshot_request_sender.clone(),
            snapshot_request_receiver,
            accounts_package_sender,
        };

        let send_snapshot_request = |snapshot_root_bank, request_type| {
            let snapshot_request = SnapshotRequest {
                snapshot_root_bank,
                status_cache_slot_deltas: Vec::default(),
                request_type,
                enqueued: Instant::now(),
            };
            snapshot_request_sender.send(snapshot_request).unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
        bank.set_startup_verification_complete();
        bank.rc
            .accounts
            .accounts_db
            .epoch_accounts_hash_manager
            .set_valid(EpochAccountsHash::new(Hash::new_unique()), 0);

        // Create a chain of 303 banks and enqueue one request per bank before handling any.
        let mut parent = Arc::clone(&bank);
        for _ in 0..303 {
            let bank = Arc::new(Bank::new_from_parent(
                &parent,
                &Pubkey::new_unique(),
                parent.slot() + 1,
            ));

            let request_type = if bank.slot() == epoch_accounts_hash::calculation_start(&bank) {
                SnapshotRequestType::EpochAccountsHash
            } else {
                SnapshotRequestType::Snapshot
            };
            send_snapshot_request(Arc::clone(&bank), request_type);

            parent = bank;
        }

        // Fetches the next request and checks its package type and slot.
        let assert_next_request = |last_full_snapshot_slot: Option<Slot>,
                                   expected_package_type: AccountsPackageType,
                                   expected_slot: Slot| {
            let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
                .get_next_snapshot_request(last_full_snapshot_slot)
                .unwrap();
            assert_eq!(accounts_package_type, expected_package_type);
            assert_eq!(snapshot_request.snapshot_root_bank.slot(), expected_slot);
        };

        assert_next_request(None, AccountsPackageType::EpochAccountsHash, 100);
        assert_next_request(
            None,
            AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
            240,
        );
        assert_next_request(
            Some(240),
            AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(240)),
            300,
        );
        assert_next_request(Some(240), AccountsPackageType::AccountsHashVerifier, 303);

        // Everything else was either handled or dropped.
        assert!(snapshot_request_handler
            .get_next_snapshot_request(Some(240))
            .is_none());
    }
}