1mod pending_snapshot_packages;
6mod stats;
7pub use pending_snapshot_packages::PendingSnapshotPackages;
8#[cfg(feature = "dev-context-only-utils")]
9use qualifier_attr::qualifiers;
10use {
11 crate::{
12 bank::{Bank, BankSlotDelta, DropCallback},
13 bank_forks::BankForks,
14 snapshot_controller::SnapshotController,
15 snapshot_package::SnapshotPackage,
16 },
17 agave_snapshots::{error::SnapshotError, SnapshotKind},
18 crossbeam_channel::{Receiver, SendError, Sender},
19 log::*,
20 rayon::iter::{IntoParallelIterator, ParallelIterator},
21 solana_clock::{BankId, Slot},
22 solana_measure::{measure::Measure, measure_us},
23 stats::StatsManager,
24 std::{
25 boxed::Box,
26 cmp,
27 fmt::{self, Debug, Formatter},
28 sync::{
29 atomic::{AtomicBool, AtomicU64, Ordering},
30 Arc, LazyLock, Mutex, RwLock,
31 },
32 thread::{self, sleep, Builder, JoinHandle},
33 time::{Duration, Instant},
34 },
35};
36
37const INTERVAL_MS: u64 = 100;
38const CLEAN_INTERVAL: Duration = Duration::from_secs(50);
42const SHRINK_INTERVAL: Duration = Duration::from_secs(1);
43
44pub type SnapshotRequestSender = Sender<SnapshotRequest>;
45pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
46pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
47pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
48
49const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
51const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
53
54#[derive(Debug, Default)]
55struct PrunedBankQueueLenReporter {
56 last_report_time: AtomicU64,
57}
58
59impl PrunedBankQueueLenReporter {
60 fn report(&self, q_len: usize) {
61 let now = solana_time_utils::timestamp();
62 let last_report_time = self.last_report_time.load(Ordering::Acquire);
63 if q_len > MAX_DROP_BANK_SIGNAL_QUEUE_SIZE
64 && now.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL
65 {
66 datapoint_warn!("excessive_pruned_bank_channel_len", ("len", q_len, i64));
67 self.last_report_time.store(now, Ordering::Release);
68 }
69 }
70}
71
72static BANK_DROP_QUEUE_REPORTER: LazyLock<PrunedBankQueueLenReporter> =
73 LazyLock::new(PrunedBankQueueLenReporter::default);
74
75#[derive(Clone)]
76pub struct SendDroppedBankCallback {
77 sender: DroppedSlotsSender,
78}
79
80impl DropCallback for SendDroppedBankCallback {
81 fn callback(&self, bank: &Bank) {
82 BANK_DROP_QUEUE_REPORTER.report(self.sender.len());
83 if let Err(SendError(_)) = self.sender.send((bank.slot(), bank.bank_id())) {
84 info!("bank DropCallback signal queue disconnected.");
85 }
86 }
87
88 fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
89 Box::new(self.clone())
90 }
91}
92
93impl Debug for SendDroppedBankCallback {
94 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
95 write!(f, "SendDroppedBankCallback({self:p})")
96 }
97}
98
99impl SendDroppedBankCallback {
100 pub fn new(sender: DroppedSlotsSender) -> Self {
101 Self { sender }
102 }
103}
104
105pub struct SnapshotRequest {
106 pub snapshot_root_bank: Arc<Bank>,
107 pub status_cache_slot_deltas: Vec<BankSlotDelta>,
108 pub request_kind: SnapshotRequestKind,
109
110 pub enqueued: Instant,
113}
114
115impl Debug for SnapshotRequest {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("SnapshotRequest")
118 .field("request kind", &self.request_kind)
119 .field("bank slot", &self.snapshot_root_bank.slot())
120 .field("block height", &self.snapshot_root_bank.block_height())
121 .finish_non_exhaustive()
122 }
123}
124
125#[derive(Debug, Copy, Clone, Eq, PartialEq)]
127pub enum SnapshotRequestKind {
128 FullSnapshot,
129 IncrementalSnapshot,
130}
131
132pub struct SnapshotRequestHandler {
133 pub snapshot_controller: Arc<SnapshotController>,
134 pub snapshot_request_receiver: SnapshotRequestReceiver,
135 pub pending_snapshot_packages: Arc<Mutex<PendingSnapshotPackages>>,
136}
137
138impl SnapshotRequestHandler {
139 #[allow(clippy::type_complexity)]
141 pub fn handle_snapshot_requests(
142 &self,
143 non_snapshot_time_us: u128,
144 ) -> Option<Result<Slot, SnapshotError>> {
145 let (snapshot_request, num_outstanding_requests, num_re_enqueued_requests) =
146 self.get_next_snapshot_request()?;
147
148 datapoint_info!(
149 "handle_snapshot_requests",
150 ("num_outstanding_requests", num_outstanding_requests, i64),
151 ("num_re_enqueued_requests", num_re_enqueued_requests, i64),
152 (
153 "enqueued_time_us",
154 snapshot_request.enqueued.elapsed().as_micros(),
155 i64
156 ),
157 );
158
159 let snapshot_kind = new_snapshot_kind(&snapshot_request)?;
160 Some(self.handle_snapshot_request(non_snapshot_time_us, snapshot_request, snapshot_kind))
161 }
162
163 fn get_next_snapshot_request(
173 &self,
174 ) -> Option<(
175 SnapshotRequest,
176 usize,
177 usize,
178 )> {
179 let mut requests: Vec<_> = self.snapshot_request_receiver.try_iter().collect();
180 let requests_len = requests.len();
181 debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
182
183 match requests_len {
184 0 => None,
185 1 => {
186 let snapshot_request = requests.pop().unwrap();
188 Some((snapshot_request, 1, 0))
189 }
190 _ => {
191 let (_, _y, z) =
196 requests.select_nth_unstable_by(requests_len - 2, cmp_requests_by_priority);
197 assert_eq!(z.len(), 1);
198
199 let snapshot_request = requests.pop().unwrap();
201
202 let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
203 let num_re_enqueued_requests = requests
205 .into_iter()
206 .filter(|snapshot_request| {
207 snapshot_request.snapshot_root_bank.slot() > handled_request_slot
208 })
209 .map(|snapshot_request| {
210 self.snapshot_controller
211 .request_sender()
212 .try_send(snapshot_request)
213 .expect("re-enqueue snapshot request");
214 })
215 .count();
216
217 Some((snapshot_request, requests_len, num_re_enqueued_requests))
218 }
219 }
220 }
221
222 fn handle_snapshot_request(
223 &self,
224 non_snapshot_time_us: u128,
225 snapshot_request: SnapshotRequest,
226 snapshot_kind: SnapshotKind,
227 ) -> Result<Slot, SnapshotError> {
228 info!("handling snapshot request: {snapshot_request:?}, {snapshot_kind:?}");
229 let mut total_time = Measure::start("snapshot_request_receiver_total_time");
230 let SnapshotRequest {
231 snapshot_root_bank,
232 status_cache_slot_deltas,
233 request_kind: _,
234 enqueued: _,
235 } = snapshot_request;
236
237 if snapshot_kind.is_full_snapshot() {
238 snapshot_root_bank
242 .rc
243 .accounts
244 .accounts_db
245 .set_latest_full_snapshot_slot(snapshot_root_bank.slot());
246 }
247
248 let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
249 snapshot_root_bank.force_flush_accounts_cache();
254 assert!(
258 snapshot_root_bank.slot()
259 <= snapshot_root_bank
260 .rc
261 .accounts
262 .accounts_db
263 .accounts_cache
264 .fetch_max_flush_root()
265 );
266 flush_accounts_cache_time.stop();
267
268 let mut clean_time = Measure::start("clean_time");
269 snapshot_root_bank.clean_accounts();
270 clean_time.stop();
271
272 let (_, shrink_ancient_time_us) = measure_us!(snapshot_root_bank.shrink_ancient_slots());
273
274 let mut shrink_time = Measure::start("shrink_time");
275 snapshot_root_bank.shrink_candidate_slots();
276 shrink_time.stop();
277
278 let mut snapshot_time = Measure::start("snapshot_time");
280 let snapshot_package = SnapshotPackage::new(
281 snapshot_kind,
282 &snapshot_root_bank,
283 snapshot_root_bank.get_snapshot_storages(None),
284 status_cache_slot_deltas,
285 );
286 self.pending_snapshot_packages
287 .lock()
288 .unwrap()
289 .push(snapshot_package);
290 snapshot_time.stop();
291 info!(
292 "Handled snapshot request. snapshot kind: {:?}, slot: {}, bank hash: {}",
293 snapshot_kind,
294 snapshot_root_bank.slot(),
295 snapshot_root_bank.hash(),
296 );
297
298 total_time.stop();
299
300 datapoint_info!(
301 "handle_snapshot_requests-timing",
302 (
303 "flush_accounts_cache_time",
304 flush_accounts_cache_time.as_us(),
305 i64
306 ),
307 ("shrink_time", shrink_time.as_us(), i64),
308 ("clean_time", clean_time.as_us(), i64),
309 ("snapshot_time", snapshot_time.as_us(), i64),
310 ("total_us", total_time.as_us(), i64),
311 ("non_snapshot_time_us", non_snapshot_time_us, i64),
312 ("shrink_ancient_time_us", shrink_ancient_time_us, i64),
313 );
314 Ok(snapshot_root_bank.slot())
315 }
316
317 fn peek_next_snapshot_request_slot(&self) -> Option<Slot> {
319 let (next_request, _, _) = self.get_next_snapshot_request()?;
324 let next_slot = next_request.snapshot_root_bank.slot();
325
326 self.snapshot_controller
328 .request_sender()
329 .try_send(next_request)
330 .expect("re-enqueue snapshot request");
331
332 Some(next_slot)
333 }
334}
335
336#[derive(Debug)]
337pub struct PrunedBanksRequestHandler {
338 pub pruned_banks_receiver: DroppedSlotsReceiver,
339}
340
341impl PrunedBanksRequestHandler {
342 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
343 fn handle_request(&self, bank: &Bank) -> usize {
344 let mut banks_to_purge: Vec<_> = self.pruned_banks_receiver.try_iter().collect();
345 banks_to_purge.sort_by_key(|(slot, _id)| *slot);
348 let num_banks_to_purge = banks_to_purge.len();
349
350 let grouped_banks_to_purge: Vec<_> = banks_to_purge.chunk_by(|a, b| a.0 == b.0).collect();
352
353 let num_banks_with_same_slot =
356 num_banks_to_purge.saturating_sub(grouped_banks_to_purge.len());
357 if num_banks_with_same_slot > 0 {
358 datapoint_info!(
359 "pruned_banks_request_handler",
360 ("num_pruned_banks", num_banks_to_purge, i64),
361 ("num_banks_with_same_slot", num_banks_with_same_slot, i64),
362 );
363 }
364
365 let accounts_db = bank.rc.accounts.accounts_db.as_ref();
368 accounts_db.thread_pool_background.install(|| {
369 grouped_banks_to_purge.into_par_iter().for_each(|group| {
370 group.iter().for_each(|(slot, bank_id)| {
371 accounts_db.purge_slot(*slot, *bank_id, true);
372 })
373 });
374 });
375
376 num_banks_to_purge
377 }
378
379 fn remove_dead_slots(
380 &self,
381 bank: &Bank,
382 removed_slots_count: &mut usize,
383 total_remove_slots_time: &mut u64,
384 ) {
385 let mut remove_slots_time = Measure::start("remove_slots_time");
386 *removed_slots_count += self.handle_request(bank);
387 remove_slots_time.stop();
388 *total_remove_slots_time += remove_slots_time.as_us();
389
390 if *removed_slots_count >= 100 {
391 datapoint_info!(
392 "remove_slots_timing",
393 ("remove_slots_time", *total_remove_slots_time, i64),
394 ("removed_slots_count", *removed_slots_count, i64),
395 );
396 *total_remove_slots_time = 0;
397 *removed_slots_count = 0;
398 }
399 }
400}
401
402pub struct AbsRequestHandlers {
403 pub snapshot_request_handler: SnapshotRequestHandler,
404 pub pruned_banks_request_handler: PrunedBanksRequestHandler,
405}
406
407impl AbsRequestHandlers {
408 #[allow(clippy::type_complexity)]
410 pub fn handle_snapshot_requests(
411 &self,
412 non_snapshot_time_us: u128,
413 ) -> Option<Result<Slot, SnapshotError>> {
414 self.snapshot_request_handler
415 .handle_snapshot_requests(non_snapshot_time_us)
416 }
417}
418
419pub struct AccountsBackgroundService {
420 t_background: JoinHandle<()>,
421 status: AbsStatus,
422}
423
424impl AccountsBackgroundService {
425 pub fn new(
426 bank_forks: Arc<RwLock<BankForks>>,
427 exit: Arc<AtomicBool>,
428 request_handlers: AbsRequestHandlers,
429 ) -> Self {
430 let is_running = Arc::new(AtomicBool::new(true));
431 let stop = Arc::new(AtomicBool::new(false));
432 let mut last_cleaned_slot = 0;
433 let mut removed_slots_count = 0;
434 let mut total_remove_slots_time = 0;
435 let t_background = Builder::new()
436 .name("solAcctsBgSvc".to_string())
437 .spawn({
438 let is_running = is_running.clone();
439 let stop = stop.clone();
440
441 move || {
442 info!("AccountsBackgroundService has started");
443 let mut stats = StatsManager::new();
444 let mut last_snapshot_end_time = None;
445 let mut previous_clean_time = Instant::now();
446 let mut previous_shrink_time = Instant::now();
447
448 loop {
449 if exit.load(Ordering::Relaxed) || stop.load(Ordering::Relaxed) {
450 break;
451 }
452 let start_time = Instant::now();
453
454 let bank = bank_forks.read().unwrap().root_bank();
456
457 request_handlers
459 .pruned_banks_request_handler
460 .remove_dead_slots(
461 &bank,
462 &mut removed_slots_count,
463 &mut total_remove_slots_time,
464 );
465
466 let non_snapshot_time = last_snapshot_end_time
467 .map(|last_snapshot_end_time: Instant| {
468 last_snapshot_end_time.elapsed().as_micros()
469 })
470 .unwrap_or_default();
471
472 let snapshot_handle_result =
492 request_handlers.handle_snapshot_requests(non_snapshot_time);
493
494 if let Some(snapshot_handle_result) = snapshot_handle_result {
495 last_snapshot_end_time = Some(Instant::now());
498 match snapshot_handle_result {
499 Ok(snapshot_slot) => {
500 assert!(
501 last_cleaned_slot <= snapshot_slot,
502 "last cleaned slot: {last_cleaned_slot}, snapshot request \
503 slot: {snapshot_slot}, enqueued snapshot requests: {:?}",
504 request_handlers
505 .snapshot_request_handler
506 .snapshot_request_receiver
507 .try_iter()
508 .collect::<Vec<_>>(),
509 );
510 last_cleaned_slot = snapshot_slot;
511 previous_clean_time = Instant::now();
512 previous_shrink_time = Instant::now();
513 }
514 Err(err) => {
515 error!(
516 "Stopping AccountsBackgroundService! Fatal error while \
517 handling snapshot requests: {err}",
518 );
519 exit.store(true, Ordering::Relaxed);
520 break;
521 }
522 }
523 } else {
524 let next_snapshot_request_slot = request_handlers
527 .snapshot_request_handler
528 .peek_next_snapshot_request_slot();
529
530 let max_clean_slot_inclusive = cmp::min(
534 next_snapshot_request_slot.unwrap_or(Slot::MAX),
535 bank.slot(),
536 )
537 .saturating_sub(1);
538
539 let duration_since_previous_clean = previous_clean_time.elapsed();
540 let should_clean = duration_since_previous_clean > CLEAN_INTERVAL;
541
542 let force_flush = should_clean;
544 bank.rc
545 .accounts
546 .accounts_db
547 .flush_accounts_cache(force_flush, Some(max_clean_slot_inclusive));
548
549 if should_clean {
550 bank.rc.accounts.accounts_db.clean_accounts(
551 Some(max_clean_slot_inclusive),
552 false,
553 bank.epoch_schedule(),
554 );
555 last_cleaned_slot = max_clean_slot_inclusive;
556 previous_clean_time = Instant::now();
557 }
558
559 let duration_since_previous_shrink = previous_shrink_time.elapsed();
560 let should_shrink = duration_since_previous_shrink > SHRINK_INTERVAL;
561 if should_shrink || should_clean {
564 if should_clean {
565 bank.shrink_ancient_slots();
568 }
569 bank.shrink_candidate_slots();
570 previous_shrink_time = Instant::now();
571 }
572 }
573 stats.record_and_maybe_submit(start_time.elapsed());
574 sleep(Duration::from_millis(INTERVAL_MS));
575 }
576 info!("AccountsBackgroundService has stopped");
577 is_running.store(false, Ordering::Relaxed);
578 }
579 })
580 .unwrap();
581
582 Self {
583 t_background,
584 status: AbsStatus { is_running, stop },
585 }
586 }
587
588 pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
593 assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
594
595 let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
596 {
597 let root_bank = bank_forks.read().unwrap().root_bank();
598
599 root_bank
600 .rc
601 .accounts
602 .accounts_db
603 .enable_bank_drop_callback();
604 root_bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
605 pruned_banks_sender,
606 ))));
607 }
608 pruned_banks_receiver
609 }
610
611 pub fn join(self) -> thread::Result<()> {
612 self.t_background.join()
613 }
614
615 pub fn status(&self) -> &AbsStatus {
617 &self.status
618 }
619}
620
621#[derive(Debug, Clone)]
623pub struct AbsStatus {
624 is_running: Arc<AtomicBool>,
626 stop: Arc<AtomicBool>,
628}
629
630impl AbsStatus {
631 pub fn is_running(&self) -> bool {
633 self.is_running.load(Ordering::Relaxed)
634 }
635
636 pub fn stop(&self) {
638 self.stop.store(true, Ordering::Relaxed)
639 }
640
641 #[cfg(feature = "dev-context-only-utils")]
642 pub fn new_for_tests() -> Self {
643 Self {
644 is_running: Arc::new(AtomicBool::new(false)),
645 stop: Arc::new(AtomicBool::new(false)),
646 }
647 }
648}
649
650#[must_use]
652fn new_snapshot_kind(snapshot_request: &SnapshotRequest) -> Option<SnapshotKind> {
653 match snapshot_request.request_kind {
654 SnapshotRequestKind::FullSnapshot => Some(SnapshotKind::FullSnapshot),
655 SnapshotRequestKind::IncrementalSnapshot => {
656 if let Some(latest_full_snapshot_slot) = snapshot_request
657 .snapshot_root_bank
658 .rc
659 .accounts
660 .accounts_db
661 .latest_full_snapshot_slot()
662 {
663 Some(SnapshotKind::IncrementalSnapshot(latest_full_snapshot_slot))
664 } else {
665 warn!(
666 "Ignoring IncrementalSnapshot request for slot {} because there is no latest \
667 full snapshot",
668 snapshot_request.snapshot_root_bank.slot()
669 );
670 None
671 }
672 }
673 }
674}
675
676#[must_use]
685fn cmp_requests_by_priority(a: &SnapshotRequest, b: &SnapshotRequest) -> cmp::Ordering {
686 let slot_a = a.snapshot_root_bank.slot();
687 let slot_b = b.snapshot_root_bank.slot();
688 cmp_snapshot_request_kinds_by_priority(&a.request_kind, &b.request_kind)
689 .then(slot_a.cmp(&slot_b))
690}
691
692#[must_use]
698fn cmp_snapshot_request_kinds_by_priority(
699 a: &SnapshotRequestKind,
700 b: &SnapshotRequestKind,
701) -> cmp::Ordering {
702 use {
703 cmp::Ordering::{Equal, Greater, Less},
704 SnapshotRequestKind as Kind,
705 };
706 match (a, b) {
707 (Kind::FullSnapshot, Kind::FullSnapshot) => Equal,
708 (Kind::FullSnapshot, Kind::IncrementalSnapshot) => Greater,
709 (Kind::IncrementalSnapshot, Kind::FullSnapshot) => Less,
710 (Kind::IncrementalSnapshot, Kind::IncrementalSnapshot) => Equal,
711 }
712}
713
714#[cfg(test)]
715mod test {
716 use {
717 super::*,
718 crate::genesis_utils::create_genesis_config,
719 agave_snapshots::{snapshot_config::SnapshotConfig, SnapshotInterval},
720 crossbeam_channel::unbounded,
721 solana_account::AccountSharedData,
722 solana_epoch_schedule::EpochSchedule,
723 solana_pubkey::Pubkey,
724 std::num::NonZeroU64,
725 };
726
727 #[test]
728 fn test_accounts_background_service_remove_dead_slots() {
729 let genesis = create_genesis_config(10);
730 let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
731 let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
732 let pruned_banks_request_handler = PrunedBanksRequestHandler {
733 pruned_banks_receiver,
734 };
735
736 let account_key = Pubkey::new_unique();
738 bank0.store_account(
739 &account_key,
740 &AccountSharedData::new(264, 0, &Pubkey::default()),
741 );
742 assert!(bank0.get_account(&account_key).is_some());
743 pruned_banks_sender.send((0, 0)).unwrap();
744
745 assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
746
747 pruned_banks_request_handler.remove_dead_slots(&bank0, &mut 0, &mut 0);
748
749 assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
750 }
751
752 #[test]
757 fn test_get_next_snapshot_request() {
758 const SLOTS_PER_EPOCH: Slot = 400;
761 const FULL_SNAPSHOT_INTERVAL: Slot = 80;
762 const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 30;
763
764 let snapshot_config = SnapshotConfig {
765 full_snapshot_archive_interval: SnapshotInterval::Slots(
766 NonZeroU64::new(FULL_SNAPSHOT_INTERVAL).unwrap(),
767 ),
768 incremental_snapshot_archive_interval: SnapshotInterval::Slots(
769 NonZeroU64::new(INCREMENTAL_SNAPSHOT_INTERVAL).unwrap(),
770 ),
771 ..SnapshotConfig::default()
772 };
773
774 let pending_snapshot_packages = Arc::new(Mutex::new(PendingSnapshotPackages::default()));
775 let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
776 let snapshot_controller = Arc::new(SnapshotController::new(
777 snapshot_request_sender.clone(),
778 snapshot_config,
779 0,
780 ));
781 let snapshot_request_handler = SnapshotRequestHandler {
782 snapshot_controller,
783 snapshot_request_receiver,
784 pending_snapshot_packages,
785 };
786
787 let send_snapshot_request = |snapshot_root_bank, request_kind| {
788 let snapshot_request = SnapshotRequest {
789 snapshot_root_bank,
790 status_cache_slot_deltas: Vec::default(),
791 request_kind,
792 enqueued: Instant::now(),
793 };
794 snapshot_request_sender.send(snapshot_request).unwrap();
795 };
796
797 let mut genesis_config_info = create_genesis_config(10);
798 genesis_config_info.genesis_config.epoch_schedule =
799 EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
800 let mut bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
801
802 let bank0 = bank.clone();
806 fn latest_full_snapshot_slot(bank: &Bank) -> Option<Slot> {
807 bank.rc.accounts.accounts_db.latest_full_snapshot_slot()
808 }
809 fn set_latest_full_snapshot_slot(bank: &Bank, slot: Slot) {
810 bank.rc
811 .accounts
812 .accounts_db
813 .set_latest_full_snapshot_slot(slot);
814 }
815
816 let mut make_banks = |num_banks| {
833 for _ in 0..num_banks {
834 let slot = bank.slot() + 1;
835 bank = Arc::new(Bank::new_from_parent(
836 bank.clone(),
837 &Pubkey::new_unique(),
838 slot,
839 ));
840
841 if bank.block_height() % FULL_SNAPSHOT_INTERVAL == 0 {
844 send_snapshot_request(Arc::clone(&bank), SnapshotRequestKind::FullSnapshot);
845 } else if bank.block_height() % INCREMENTAL_SNAPSHOT_INTERVAL == 0 {
846 send_snapshot_request(
847 Arc::clone(&bank),
848 SnapshotRequestKind::IncrementalSnapshot,
849 );
850 }
851 }
852 };
853 make_banks(303);
854
855 assert_eq!(latest_full_snapshot_slot(&bank0), None);
858 let (snapshot_request, ..) = snapshot_request_handler
859 .get_next_snapshot_request()
860 .unwrap();
861 assert_eq!(
862 snapshot_request.request_kind,
863 SnapshotRequestKind::FullSnapshot
864 );
865 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 240);
866 set_latest_full_snapshot_slot(&bank0, 240);
867
868 assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
871 let (snapshot_request, ..) = snapshot_request_handler
872 .get_next_snapshot_request()
873 .unwrap();
874 assert_eq!(
875 snapshot_request.request_kind,
876 SnapshotRequestKind::IncrementalSnapshot
877 );
878 assert_eq!(snapshot_request.snapshot_root_bank.slot(), 300);
879
880 assert_eq!(latest_full_snapshot_slot(&bank0), Some(240));
882 assert!(snapshot_request_handler
883 .get_next_snapshot_request()
884 .is_none());
885 }
886
887 #[test]
889 fn test_pruned_banks_request_handler_handle_request() {
890 let (pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
891 let pruned_banks_request_handler = PrunedBanksRequestHandler {
892 pruned_banks_receiver,
893 };
894 let genesis_config_info = create_genesis_config(10);
895 let bank = Bank::new_for_tests(&genesis_config_info.genesis_config);
896 bank.rc.accounts.accounts_db.enable_bank_drop_callback();
897 bank.set_callback(Some(Box::new(SendDroppedBankCallback::new(
898 pruned_banks_sender,
899 ))));
900
901 let fork0_bank0 = Arc::new(bank);
902 let fork0_bank1 = Arc::new(Bank::new_from_parent(
903 fork0_bank0.clone(),
904 &Pubkey::new_unique(),
905 fork0_bank0.slot() + 1,
906 ));
907 let fork1_bank1 = Arc::new(Bank::new_from_parent(
908 fork0_bank0.clone(),
909 &Pubkey::new_unique(),
910 fork0_bank0.slot() + 1,
911 ));
912 let fork2_bank1 = Arc::new(Bank::new_from_parent(
913 fork0_bank0.clone(),
914 &Pubkey::new_unique(),
915 fork0_bank0.slot() + 1,
916 ));
917 let fork0_bank2 = Arc::new(Bank::new_from_parent(
918 fork0_bank1.clone(),
919 &Pubkey::new_unique(),
920 fork0_bank1.slot() + 1,
921 ));
922 let fork1_bank2 = Arc::new(Bank::new_from_parent(
923 fork1_bank1.clone(),
924 &Pubkey::new_unique(),
925 fork1_bank1.slot() + 1,
926 ));
927 let fork0_bank3 = Arc::new(Bank::new_from_parent(
928 fork0_bank2.clone(),
929 &Pubkey::new_unique(),
930 fork0_bank2.slot() + 1,
931 ));
932 let fork3_bank3 = Arc::new(Bank::new_from_parent(
933 fork0_bank2.clone(),
934 &Pubkey::new_unique(),
935 fork0_bank2.slot() + 1,
936 ));
937 fork0_bank3.squash();
938
939 drop(fork3_bank3);
940 drop(fork1_bank2);
941 drop(fork0_bank2);
942 drop(fork1_bank1);
943 drop(fork2_bank1);
944 drop(fork0_bank1);
945 drop(fork0_bank0);
946 let num_banks_purged = pruned_banks_request_handler.handle_request(&fork0_bank3);
947 assert_eq!(num_banks_purged, 7);
948 }
949
950 #[test]
951 fn test_cmp_snapshot_request_kinds_by_priority() {
952 use cmp::Ordering::{Equal, Greater, Less};
953 for (snapshot_request_kind_a, snapshot_request_kind_b, expected_result) in [
954 (
955 SnapshotRequestKind::FullSnapshot,
956 SnapshotRequestKind::FullSnapshot,
957 Equal,
958 ),
959 (
960 SnapshotRequestKind::FullSnapshot,
961 SnapshotRequestKind::IncrementalSnapshot,
962 Greater,
963 ),
964 (
965 SnapshotRequestKind::IncrementalSnapshot,
966 SnapshotRequestKind::FullSnapshot,
967 Less,
968 ),
969 (
970 SnapshotRequestKind::IncrementalSnapshot,
971 SnapshotRequestKind::IncrementalSnapshot,
972 Equal,
973 ),
974 ] {
975 let actual_result = cmp_snapshot_request_kinds_by_priority(
976 &snapshot_request_kind_a,
977 &snapshot_request_kind_b,
978 );
979 assert_eq!(expected_result, actual_result);
980 }
981 }
982}