1mod pending_snapshot_packages;
6mod stats;
7pub use pending_snapshot_packages::PendingSnapshotPackages;
8#[cfg(feature = "dev-context-only-utils")]
9use qualifier_attr::qualifiers;
10use {
11 crate::{
12 bank::{Bank, BankSlotDelta, DropCallback},
13 bank_forks::BankForks,
14 snapshot_controller::SnapshotController,
15 snapshot_package::{SnapshotKind, SnapshotPackage},
16 snapshot_utils::SnapshotError,
17 },
18 crossbeam_channel::{Receiver, SendError, Sender},
19 log::*,
20 rayon::iter::{IntoParallelIterator, ParallelIterator},
21 solana_clock::{BankId, Slot},
22 solana_measure::{measure::Measure, measure_us},
23 stats::StatsManager,
24 std::{
25 boxed::Box,
26 cmp,
27 fmt::{self, Debug, Formatter},
28 sync::{
29 atomic::{AtomicBool, AtomicU64, Ordering},
30 Arc, LazyLock, Mutex, RwLock,
31 },
32 thread::{self, sleep, Builder, JoinHandle},
33 time::{Duration, Instant},
34 },
35};
36
/// Time, in milliseconds, that the `AccountsBackgroundService` loop sleeps at
/// the end of every iteration.
const INTERVAL_MS: u64 = 100;

/// Time that must have elapsed since the previous clean before the background
/// loop cleans accounts again in an iteration that handled no snapshot request.
const CLEAN_INTERVAL: Duration = Duration::from_secs(50);

/// Time that must have elapsed since the previous shrink before the background
/// loop shrinks candidate slots again in an iteration that handled no snapshot
/// request.
const SHRINK_INTERVAL: Duration = Duration::from_secs(1);
43
44pub type SnapshotRequestSender = Sender<SnapshotRequest>;
45pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
46pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
47pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
48
/// Minimum difference between two `solana_time_utils::timestamp()` readings
/// before an excessive queue length is reported again.
// NOTE(review): the unit is whatever `timestamp()` returns, presumably
// milliseconds (i.e. one minute) -- confirm.
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;

/// Length of the dropped-bank signal queue above which it is reported as
/// excessive.
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
53
54#[derive(Debug, Default)]
55struct PrunedBankQueueLenReporter {
56 last_report_time: AtomicU64,
57}
58
59impl PrunedBankQueueLenReporter {
60 fn report(&self, q_len: usize) {
61 let now = solana_time_utils::timestamp();
62 let last_report_time = self.last_report_time.load(Ordering::Acquire);
63 if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
64 && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
65 {
66 datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
67 self.last_report_time.store(now, Ordering::Release);
68 }
69 }
70}
71
72static BANK_DROP_QUEUE_REPORTER: LazyLock<PrunedBankQueueLenReporter> =
73 LazyLock::new(PrunedBankQueueLenReporter::default);
74
75#[derive(Clone)]
76pub struct SendDroppedBankCallback {
77 sender: DroppedSlotsSender,
78}
79
80impl DropCallback for SendDroppedBankCallback {
81 fn callback(&self, bank: &Bank) {
82 BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
83 if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
84 info!("bank DropCallback signal queue disconnected.");
85 }
86 }
87
88 fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
89 Box::new(self.clone())
90 }
91}
92
93impl Debug for SendDroppedBankCallback {
94 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
95 write!(f, "SendDroppedBankCallback({self:p})")
96 }
97}
98
99impl SendDroppedBankCallback {
100 pub fn new(sender: DroppedSlotsSender) -> Self {
101 Self { sender }
102 }
103}
104
105pub struct SnapshotRequest {
106 pub snapshot_root_bank: Arc<Bank>,
107 pub status_cache_slot_deltas: Vec<BankSlotDelta>,
108 pub request_kind: SnapshotRequestKind,
109
110 pub enqueued: Instant,
113}
114
115impl Debug for SnapshotRequest {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("SnapshotRequest")
118 .field("request kind", &self.request_kind)
119 .field("bank slot", &self.snapshot_root_bank.slot())
120 .field("block height", &self.snapshot_root_bank.block_height())
121 .finish_non_exhaustive()
122 }
123}
124
/// The kind of snapshot a [`SnapshotRequest`] asks for.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SnapshotRequestKind {
    /// Request a full snapshot.
    FullSnapshot,
    /// Request an incremental snapshot.
    IncrementalSnapshot,
}
131
132pub struct SnapshotRequestHandler {
133 pub snapshot_controller: Arc<SnapshotController>,
134 pub snapshot_request_receiver: SnapshotRequestReceiver,
135 pub pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
136}
137
138impl SnapshotRequestHandler {
139 #[allow(clippy::type_complexity)]
141 pub fn handle_snapshot_requests(
142 &self,
143 non_snapshot_time_us: u128,
144 ) -> Option<Result<Slot, SnapshotError>> {
145 let (snapshot_request, num_outstanding_requests, num_re_enqueued_requests) =
146 self.get_next_snapshot_request()?;
147
148 datapoint_info!(
149 "handle_snapshot_requests",
150 ("num_outstanding_requests", num_outstanding_requests, i64),
151 ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
152 (
153 "enqueued_time_us",
154 snapshot_request.enqueued.elapsed().as_micros(),
155 i64
156 ),
157 );
158
159 let snapshot_kind = new_snapshot_kind(&snapshot_request)?;
160 Some(self.handle_snapshot_request(non_snapshot_time_us, snapshot_request, snapshot_kind))
161 }
162
163 fn get_next_snapshot_request(
173 &self,
174 ) -> Option<(
175 SnapshotRequest,
176 usize,
177 usize,
178 )> {
179 let mut requests: Vec<_> = self.snapshot_request_receiver.try_iter().collect();
180 let requests_len = requests.len();
181 debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
182
183 match requests_len {
184 0 => None,
185 1 => {
186 let snapshot_request = requests.pop().unwrap();
188 Some((snapshot_request, 1, 0))
189 }
190 _ => {
191 let (_, _y, z) =
196 requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
197 assert_eq!(z.len(), 1);
198
199 let snapshot_request = requests.pop().unwrap();
201
202 let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
203 let num_re_enqueued_requests = requests
205 .into_iter()
206 .filter(|snapshot_request| {
207 snapshot_request.snapshot_root_bank.slot() > handled_request_slot
208 })
209 .map(|snapshot_request| {
210 self.snapshot_controller
211 .request_sender()
212 .try_send(snapshot_request)
213 .expect("re-enqueue snapshot request");
214 })
215 .count();
216
217 Some((snapshot_request, requests_len, num_re_enqueued_requests))
218 }
219 }
220 }
221
222 fn handle_snapshot_request(
223 &self,
224 non_snapshot_time_us: u128,
225 snapshot_request: SnapshotRequest,
226 snapshot_kind: SnapshotKind,
227 ) -> Result<Slot, SnapshotError> {
228 info!("handling snapshot request: {snapshot_request:?}, {snapshot_kind:?}");
229 let mut total_time = Measure::start("snapshot_request_receiver_total_time");
230 let SnapshotRequest {
231 snapshot_root_bank,
232 status_cache_slot_deltas,
233 request_kind: _,
234 enqueued: _,
235 } = snapshot_request;
236
237 assert!(snapshot_root_bank.has_initial_accounts_hash_verification_completed());
239
240 if snapshot_kind.is_full_snapshot() {
241 snapshot_root_bank
245 .rc
246 .accounts
247 .accounts_db
248 .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
249 }
250
251 let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
252 snapshot_root_bank.force_flush_accounts_cache();
257 assert!(
261 snapshot_root_bank.slot()
262 <= snapshot_root_bank
263 .rc
264 .accounts
265 .accounts_db
266 .accounts_cache
267 .fetch_max_flush_root()
268 );
269 flush_accounts_cache_time.stop();
270
271 let mut clean_time = Measure::start("clean_time");
272 snapshot_root_bank.clean_accounts();
273 clean_time.stop();
274
275 let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());
276
277 let mut shrink_time = Measure::start("shrink_time");
278 snapshot_root_bank.shrink_candidate_slots();
279 shrink_time.stop();
280
281 let mut snapshot_time = Measure::start("snapshot_time");
283 let snapshot_package = SnapshotPackage::new(
284 snapshot_kind,
285 &snapshot_root_bank,
286 snapshot_root_bank.get_snapshot_storages(None),
287 status_cache_slot_deltas,
288 );
289 self.pending_snapshot_packages
290 .lock()
291 .unwrap()
292 .push(snapshot_package);
293 snapshot_time.stop();
294 info!(
295 "Handled snapshot request. snapshot kind: {:?}, slot: {}, bank hash: {}",
296 snapshot_kind,
297 snapshot_root_bank.slot(),
298 snapshot_root_bank.hash(),
299 );
300
301 total_time.stop();
302
303 datapoint_info!(
304 "handle_snapshot_requests-timing",
305 (
306 "flush_accounts_cache_time",
307 flush_accounts_cache_time.as_us(),
308 i64
309 ),
310 ("shrink_time", shrink_time.as_us(), i64),
311 ("clean_time", clean_time.as_us(), i64),
312 ("snapshot_time", snapshot_time.as_us(), i64),
313 ("total_us", total_time.as_us(), i64),
314 ("non_snapshot_time_us", non_snapshot_time_us, i64),
315 ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
316 );
317 Ok(snapshot_root_bank.slot())
318 }
319
320 fn peek_next_snapshot_request_slot(&self) -> Option<Slot> {
322 let (next_request, _, _) = self.get_next_snapshot_request()?;
327 let next_slot = next_request.snapshot_root_bank.slot();
328
329 self.snapshot_controller
331 .request_sender()
332 .try_send(next_request)
333 .expect("re-enqueue snapshot request");
334
335 Some(next_slot)
336 }
337}
338
339#[derive(Debug)]
340pub struct PrunedBanksRequestHandler {
341 pub pruned_banks_receiver: DroppedSlotsReceiver,
342}
343
344impl PrunedBanksRequestHandler {
345 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
346 fn handle_request(&self, bank: &Bank) -> usize {
347 let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
348 banks_to_purge.sort_by_key(|(slot, _id)| *slot);
351 let num_banks_to_purge = banks_to_purge.len();
352
353 let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
355
356 let num_banks_with_same_slot =
359 num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
360 if num_banks_with_same_slot > 0 {
361 datapoint_info!(
362 "pruned_banks_request_handler",
363 ("num_pruned_banks", num_banks_to_purge, i64),
364 ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
365 );
366 }
367
368 let accounts_db = bank.rc.accounts.accounts_db.as_ref();
371 accounts_db.thread_pool_background.install(|| {
372 grouped_banks_to_purge.into_par_iter().for_each(|group| {
373 group.iter().for_each(|(slot, bank_id)| {
374 accounts_db.purge_slot(*slot, *bank_id, true);
375 })
376 });
377 });
378
379 num_banks_to_purge
380 }
381
382 fn remove_dead_slots(
383 &self,
384 bank: &Bank,
385 removed_slots_count: &mut usize,
386 total_remove_slots_time: &mut u64,
387 ) {
388 let mut remove_slots_time = Measure::start("remove_slots_time");
389 *removed_slots_count += self.handle_request(bank);
390 remove_slots_time.stop();
391 *total_remove_slots_time += remove_slots_time.as_us();
392
393 if *removed_slots_count >= 100 {
394 datapoint_info!(
395 "remove_slots_timing",
396 ("remove_slots_time", *total_remove_slots_time, i64),
397 ("removed_slots_count", *removed_slots_count, i64),
398 );
399 *total_remove_slots_time = 0;
400 *removed_slots_count = 0;
401 }
402 }
403}
404
405pub struct AbsRequestHandlers {
406 pub snapshot_request_handler: SnapshotRequestHandler,
407 pub pruned_banks_request_handler: PrunedBanksRequestHandler,
408}
409
410impl AbsRequestHandlers {
411 #[allow(clippy::type_complexity)]
413 pub fn handle_snapshot_requests(
414 &self,
415 non_snapshot_time_us: u128,
416 ) -> Option<Result<Slot, SnapshotError>> {
417 self.snapshot_request_handler
418 .handle_snapshot_requests(non_snapshot_time_us)
419 }
420}
421
422pub struct AccountsBackgroundService {
423 t_background: JoinHandle<()>,
424 status: AbsStatus,
425}
426
427impl AccountsBackgroundService {
428 pub fn new(
429 bank_forks: Arc<RwLock<BankForks>>,
430 exit: Arc<AtomicBool>,
431 request_handlers: AbsRequestHandlers,
432 ) -> Self {
433 let is_running = Arc::new(AtomicBool::new(true));
434 let stop = Arc::new(AtomicBool::new(false));
435 let mut last_cleaned_slot = 0;
436 let mut removed_slots_count = 0;
437 let mut total_remove_slots_time = 0;
438 let t_background = Builder::new()
439 .name("solBgAccounts".to_string())
440 .spawn({
441 let is_running = is_running.clone();
442 let stop = stop.clone();
443
444 move || {
445 info!("AccountsBackgroundService has started");
446 let mut stats = StatsManager::new();
447 let mut last_snapshot_end_time = None;
448 let mut previous_clean_time = Instant::now();
449 let mut previous_shrink_time = Instant::now();
450
451 loop {
452 if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
453 break;
454 }
455 let start_time = Instant::now();
456
457 let bank = bank_forks.read().unwrap().root_bank();
459
460 request_handlers
462 .pruned_banks_request_handler
463 .remove_dead_slots(
464 &bank,
465 &mut removed_slots_count,
466 &mut total_remove_slots_time,
467 );
468
469 let non_snapshot_time = last_snapshot_end_time
470 .map(|last_snapshot_end_time: Instant| {
471 last_snapshot_end_time.elapsed().as_micros()
472 })
473 .unwrap_or_default();
474
475 let snapshot_handle_result = bank
500 .has_initial_accounts_hash_verification_completed()
501 .then(|| request_handlers.handle_snapshot_requests(non_snapshot_time))
502 .flatten();
503
504 if let Some(snapshot_handle_result) = snapshot_handle_result {
505 last_snapshot_end_time = Some(Instant::now());
508 match snapshot_handle_result {
509 Ok(snapshot_slot) => {
510 assert!(
511 last_cleaned_slot <= snapshot_slot,
512 "last cleaned slot: {last_cleaned_slot}, snapshot request \
513 slot: {snapshot_slot}, is startup verification complete: \
514 {}, enqueued snapshot requests: {:?}",
515 bank.has_initial_accounts_hash_verification_completed(),
516 request_handlers
517 .snapshot_request_handler
518 .snapshot_request_receiver
519 .try_iter()
520 .collect::<Vec<_>>(),
521 );
522 last_cleaned_slot = snapshot_slot;
523 previous_clean_time = Instant::now();
524 previous_shrink_time = Instant::now();
525 }
526 Err(err) => {
527 error!(
528 "Stopping AccountsBackgroundService! Fatal error while \
529 handling snapshot requests: {err}",
530 );
531 exit.store(true, Ordering::Relaxed);
532 break;
533 }
534 }
535 } else {
536 let next_snapshot_request_slot = request_handlers
539 .snapshot_request_handler
540 .peek_next_snapshot_request_slot();
541
542 let max_clean_slot_inclusive = cmp::min(
546 next_snapshot_request_slot.unwrap_or(Slot::MAX),
547 bank.slot(),
548 )
549 .saturating_sub(1);
550
551 let duration_since_previous_clean = previous_clean_time.elapsed();
552 let should_clean = duration_since_previous_clean > CLEAN_INTERVAL;
553
554 let force_flush = should_clean;
556 bank.rc
557 .accounts
558 .accounts_db
559 .flush_accounts_cache(force_flush, Some(max_clean_slot_inclusive));
560
561 if should_clean {
562 bank.rc.accounts.accounts_db.clean_accounts(
563 Some(max_clean_slot_inclusive),
564 false,
565 bank.epoch_schedule(),
566 );
567 last_cleaned_slot = max_clean_slot_inclusive;
568 previous_clean_time = Instant::now();
569 }
570
571 let duration_since_previous_shrink = previous_shrink_time.elapsed();
572 let should_shrink = duration_since_previous_shrink > SHRINK_INTERVAL;
573 if should_shrink || should_clean {
576 if should_clean {
577 bank.shrink_ancient_slots();
580 }
581 bank.shrink_candidate_slots();
582 previous_shrink_time = Instant::now();
583 }
584 }
585 stats.record_and_maybe_submit(start_time.elapsed());
586 sleep(Duration::from_millis(INTERVAL_MS));
587 }
588 info!("AccountsBackgroundService has stopped");
589 is_running.store(false, Ordering::Relaxed);
590 }
591 })
592 .unwrap();
593
594 Self {
595 t_background,
596 status: AbsStatus { is_running, stop },
597 }
598 }
599
600 pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
605 assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
606
607 let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
608 {
609 let root_bank = bank_forks.read().unwrap().root_bank();
610
611 root_bank
612 .rc
613 .accounts
614 .accounts_db
615 .enable_bank_drop_callback();
616 root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
617 pruned_banks_sender,
618 ))));
619 }
620 pruned_banks_receiver
621 }
622
623 pub fn join(self) -> thread::Result<()> {
624 self.t_background.join()
625 }
626
627 pub fn status(&self) -> &AbsStatus {
629 &self.status
630 }
631}
632
/// Cloneable handle to query and stop the `AccountsBackgroundService` thread.
#[derive(Debug, Clone)]
pub struct AbsStatus {
    /// Whether the background thread is running.
    is_running: Arc<AtomicBool>,
    /// Set to ask the background thread to stop.
    stop: Arc<AtomicBool>,
}

impl AbsStatus {
    /// Returns the current value of the running flag.
    pub fn is_running(&self) -> bool {
        let Self { is_running, .. } = self;
        is_running.load(Ordering::Relaxed)
    }

    /// Sets the stop flag.
    pub fn stop(&self) {
        let Self { stop, .. } = self;
        stop.store(true, Ordering::Relaxed)
    }

    /// Creates a status with both flags unset, i.e. not running and not stopped.
    #[cfg(feature = "dev-context-only-utils")]
    pub fn new_for_tests() -> Self {
        let unset_flag = || Arc::new(AtomicBool::new(false));
        Self {
            is_running: unset_flag(),
            stop: unset_flag(),
        }
    }
}
661
662#[must_use]
664fn new_snapshot_kind(snapshot_request: &SnapshotRequest) -> Option<SnapshotKind> {
665 match snapshot_request.request_kind {
666 SnapshotRequestKind::FullSnapshot => Some(SnapshotKind::FullSnapshot),
667 SnapshotRequestKind::IncrementalSnapshot => {
668 if let Some(latest_full_snapshot_slot) = snapshot_request
669 .snapshot_root_bank
670 .rc
671 .accounts
672 .accounts_db
673 .latest_full_snapshot_slot()
674 {
675 Some(SnapshotKind::IncrementalSnapshot(latest_full_snapshot_slot))
676 } else {
677 warn!(
678 "Ignoring IncrementalSnapshot request for slot {} because there is no latest \
679 full snapshot",
680 snapshot_request.snapshot_root_bank.slot()
681 );
682 None
683 }
684 }
685 }
686}
687
688#[must_use]
697fn cmp_requests_by_priority(a: &SnapshotRequest, b: &SnapshotRequest) -> cmp::Ordering {
698 let slot_a = a.snapshot_root_bank.slot();
699 let slot_b = b.snapshot_root_bank.slot();
700 cmp_snapshot_request_kinds_by_priority(&a.request_kind, &b.request_kind)
701 .then(slot_a.cmp(&slot_b))
702}
703
704#[must_use]
710fn cmp_snapshot_request_kinds_by_priority(
711 a: &SnapshotRequestKind,
712 b: &SnapshotRequestKind,
713) -> cmp::Ordering {
714 use {
715 cmp::Ordering::{Equal, Greater, Less},
716 SnapshotRequestKind as Kind,
717 };
718 match (a, b) {
719 (Kind::FullSnapshot, Kind::FullSnapshot) => Equal,
720 (Kind::FullSnapshot, Kind::IncrementalSnapshot) => Greater,
721 (Kind::IncrementalSnapshot, Kind::FullSnapshot) => Less,
722 (Kind::IncrementalSnapshot, Kind::IncrementalSnapshot) => Equal,
723 }
724}
725
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::{
            genesis_utils::create_genesis_config, snapshot_config::SnapshotConfig,
            snapshot_utils::SnapshotInterval,
        },
        crossbeam_channel::unbounded,
        solana_account::AccountSharedData,
        solana_epoch_schedule::EpochSchedule,
        solana_pubkey::Pubkey,
        std::num::NonZeroU64,
    };

    /// A slot announced on the pruned-banks channel is purged from accounts
    /// storage by `remove_dead_slots`.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };

        // Store an account so that slot 0 has something to purge.
        let account_key = Pubkey::new_unique();
        let account = AccountSharedData::new(264, 0, &Pubkey::default());
        bank0.store_account(&account_key, &account);
        assert!(bank0.get_account(&account_key).is_some());
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        pruned_banks_request_handler.remove_dead_slots(
            &bank0,
            &mut removed_slots_count,
            &mut total_remove_slots_time,
        );

        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }

    /// With many outstanding requests, the newest full snapshot request is
    /// returned first, then the newest incremental one, then nothing.
    #[test]
    fn test_get_next_snapshot_request() {
        const SLOTS_PER_EPOCH: Slot = 400;
        const FULL_SNAPSHOT_INTERVAL: Slot = 80;
        const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;

        fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
            bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
        }
        fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
            bank.rc
                .accounts
                .accounts_db
                .set_latest_full_snapshot_slot(slot);
        }

        let full_interval = NonZeroU64::new(FULL_SNAPSHOT_INTERVAL).unwrap();
        let incremental_interval = NonZeroU64::new(INCREMENTAL_SNAPSHOT_INTERVAL).unwrap();
        let snapshot_config = SnapshotConfig {
            full_snapshot_archive_interval: SnapshotInterval::Slots(full_interval),
            incremental_snapshot_archive_interval: SnapshotInterval::Slots(incremental_interval),
            ..SnapshotConfig::default()
        };

        let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
        let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
        let snapshot_controller = Arc::new(SnapshotController::new(
            snapshot_request_sender.clone(),
            snapshot_config,
            0,
        ));
        let snapshot_request_handler = SnapshotRequestHandler {
            snapshot_controller,
            snapshot_request_receiver,
            pending_snapshot_packages,
        };

        let send_snapshot_request = |snapshot_root_bank, request_kind| {
            snapshot_request_sender
                .send(SnapshotRequest {
                    snapshot_root_bank,
                    status_cache_slot_deltas: Vec::default(),
                    request_kind,
                    enqueued: Instant::now(),
                })
                .unwrap();
        };

        let mut genesis_config_info = create_genesis_config(10);
        genesis_config_info.genesis_config.epoch_schedule =
            EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
        let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
        bank.set_initial_accounts_hash_verification_completed();

        let bank0 = bank.clone();

        // Creates a chain of banks and sends a snapshot request whenever a
        // bank's block height hits one of the snapshot intervals; the full
        // interval takes precedence over the incremental one.
        let mut make_banks = |num_banks| {
            for _ in 0..num_banks {
                let slot = bank.slot() + 1;
                bank = Arc::new(Bank::new_from_parent(
                    bank.clone(),
                    &Pubkey::new_unique(),
                    slot,
                ));

                let request_kind = if bank.block_height() % FULL_SNAPSHOT_INTERVAL == 0 {
                    Some(SnapshotRequestKind::FullSnapshot)
                } else if bank.block_height() % INCREMENTAL_SNAPSHOT_INTERVAL == 0 {
                    Some(SnapshotRequestKind::IncrementalSnapshot)
                } else {
                    None
                };
                if let Some(request_kind) = request_kind {
                    send_snapshot_request(Arc::clone(&bank), request_kind);
                }
            }
        };
        make_banks(303);

        // No full snapshot yet: the newest full snapshot request comes first.
        assert_eq!(latest_full_snapshot_slot(&bank0), None);
        let (first_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            first_request.request_kind,
            SnapshotRequestKind::FullSnapshot
        );
        assert_eq!(first_request.snapshot_root_bank.slot(), 240);
        set_latest_full_snapshot_slot(&bank0, 240);

        // Only newer incremental requests remain: the newest one comes next.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        let (second_request, ..) = snapshot_request_handler
            .get_next_snapshot_request()
            .unwrap();
        assert_eq!(
            second_request.request_kind,
            SnapshotRequestKind::IncrementalSnapshot
        );
        assert_eq!(second_request.snapshot_root_bank.slot(), 300);

        // Everything else was dropped, so nothing is left.
        assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
        assert!(snapshot_request_handler
            .get_next_snapshot_request()
            .is_none());
    }

    /// Every dropped bank is purged, including banks that share a slot.
    #[test]
    fn test_pruned_banks_request_handler_handle_request() {
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let pruned_banks_request_handler = PrunedBanksRequestHandler {
            pruned_banks_receiver,
        };
        let genesis_config_info = create_genesis_config(10);
        let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
        bank.set_initial_accounts_hash_verification_completed();
        bank.rc.accounts.accounts_db.enable_bank_drop_callback();
        bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
            pruned_banks_sender,
        ))));

        // Creates a child of `parent` in the slot right after the parent's.
        let new_child_bank = |parent: &Arc<Bank>| {
            Arc::new(Bank::new_from_parent(
                parent.clone(),
                &Pubkey::new_unique(),
                parent.slot() + 1,
            ))
        };

        let fork0_bank0 = Arc::new(bank);
        let fork0_bank1 = new_child_bank(&fork0_bank0);
        let fork1_bank1 = new_child_bank(&fork0_bank0);
        let fork2_bank1 = new_child_bank(&fork0_bank0);
        let fork0_bank2 = new_child_bank(&fork0_bank1);
        let fork1_bank2 = new_child_bank(&fork1_bank1);
        let fork0_bank3 = new_child_bank(&fork0_bank2);
        let fork3_bank3 = new_child_bank(&fork0_bank2);
        fork0_bank3.squash();

        // Drop every bank except the squashed tip of fork 0.
        drop(fork3_bank3);
        drop(fork1_bank2);
        drop(fork0_bank2);
        drop(fork1_bank1);
        drop(fork2_bank1);
        drop(fork0_bank1);
        drop(fork0_bank0);
        let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
        assert_eq!(num_banks_purged, 7);
    }

    /// Checks every combination of request kinds against its expected ordering.
    #[test]
    fn test_cmp_snapshot_request_kinds_by_priority() {
        use {
            cmp::Ordering::{Equal, Greater, Less},
            SnapshotRequestKind::{FullSnapshot, IncrementalSnapshot},
        };

        let test_cases = [
            (FullSnapshot, FullSnapshot, Equal),
            (FullSnapshot, IncrementalSnapshot, Greater),
            (IncrementalSnapshot, FullSnapshot, Less),
            (IncrementalSnapshot, IncrementalSnapshot, Equal),
        ];
        for (snapshot_request_kind_a, snapshot_request_kind_b, expected_result) in test_cases {
            let actual_result = cmp_snapshot_request_kinds_by_priority(
                &snapshot_request_kind_a,
                &snapshot_request_kind_b,
            );
            assert_eq!(expected_result, actual_result);
        }
    }
}