1mod stats;
6use {
7 crate::{
8 accounts_hash::CalcAccountsHashConfig,
9 bank::{Bank, BankSlotDelta, DropCallback},
10 bank_forks::BankForks,
11 snapshot_config::SnapshotConfig,
12 snapshot_package::{PendingAccountsPackage, SnapshotType},
13 snapshot_utils::{self, SnapshotError},
14 },
15 crossbeam_channel::{Receiver, SendError, Sender, TrySendError},
16 log::*,
17 rand::{thread_rng, Rng},
18 safecoin_measure::measure::Measure,
19 solana_sdk::{
20 clock::{BankId, Slot},
21 hash::Hash,
22 },
23 stats::StatsManager,
24 std::{
25 boxed::Box,
26 fmt::{Debug, Formatter},
27 sync::{
28 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
29 Arc, RwLock,
30 },
31 thread::{self, sleep, Builder, JoinHandle},
32 time::{Duration, Instant},
33 },
34};
35
36const INTERVAL_MS: u64 = 100;
37const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
38const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
39 SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
40const CLEAN_INTERVAL_BLOCKS: u64 = 100;
41
42const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;
48
49pub type SnapshotRequestSender = Sender<SnapshotRequest>;
50pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
51pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
52pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
53
/// Minimum gap between two bank-drop-queue stat reports, in the units returned by
/// `solana_sdk::timing::timestamp()` (presumably milliseconds — confirm against that API).
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;

/// Capacity of the bounded channel that carries dropped-bank signals.
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
58
/// Kinds of failure that can be observed when signalling a dropped bank over the queue.
// `dead_code` is allowed because not every variant is constructed in this file.
#[allow(dead_code)]
enum BankDropQueueEvent {
    /// The bounded queue had no free capacity.
    Full,
    /// The receiving side of the queue is gone.
    Disconnected,
}
65
/// Counters describing how often signalling a dropped bank hit a queue problem.
#[derive(Default, Debug)]
struct BankDropQueueStats {
    /// Timestamp recorded the last time a report went through.
    report_time: AtomicU64,
    /// Number of "queue full" events since the counter was last reset.
    queue_full: AtomicUsize,
    /// Number of "queue disconnected" events since the counter was last reset.
    queue_disconnected: AtomicUsize,
}
73
74impl BankDropQueueStats {
75 fn increase(&self, event: BankDropQueueEvent) {
77 let counter = match event {
78 BankDropQueueEvent::Full => &self.queue_full,
79 BankDropQueueEvent::Disconnected => &self.queue_disconnected,
80 };
81
82 counter.fetch_add(1, Ordering::Relaxed);
83 }
84
85 fn report(&self, event: BankDropQueueEvent) {
87 let counter = match event {
88 BankDropQueueEvent::Full => &self.queue_full,
89 BankDropQueueEvent::Disconnected => &self.queue_disconnected,
90 };
91
92 let name = match event {
93 BankDropQueueEvent::Full => "full",
94 BankDropQueueEvent::Disconnected => "disconnected",
95 };
96
97 let ts = solana_sdk::timing::timestamp();
98 let last_report_time = self.report_time.load(Ordering::Acquire);
99 if ts.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL {
100 let val = counter.load(Ordering::Relaxed);
101
102 if counter
103 .compare_exchange_weak(val, 0, Ordering::AcqRel, Ordering::Acquire)
104 .is_ok()
105 {
106 if val > 0 {
107 datapoint_info!("bank_drop_queue_event", (name, val, i64));
108 }
109 self.report_time.store(ts, Ordering::Release);
110 }
111 }
112 }
113}
114
115lazy_static! {
116 static ref BANK_DROP_QUEUE_STATS: BankDropQueueStats = BankDropQueueStats::default();
117}
118
119#[derive(Clone)]
120pub struct SendDroppedBankCallback {
121 sender: DroppedSlotsSender,
122}
123
124impl DropCallback for SendDroppedBankCallback {
125 fn callback(&self, bank: &Bank) {
126 BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
127 match self.sender.try_send((bank.slot(), bank.bank_id())) {
128 Err(TrySendError::Full(_)) => {
129 BANK_DROP_QUEUE_STATS.increase(BankDropQueueEvent::Full);
130 BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
131
132 let _ = self.sender.send((bank.slot(), bank.bank_id()));
134 }
135
136 Err(TrySendError::Disconnected(_)) => {
137 info!("bank DropCallback signal queue disconnected.");
138 }
139 Ok(_) => {}
141 }
142 }
143
144 fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
145 Box::new(self.clone())
146 }
147}
148
149impl Debug for SendDroppedBankCallback {
150 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
151 write!(f, "SendDroppedBankCallback({:p})", self)
152 }
153}
154
155impl SendDroppedBankCallback {
156 pub fn new(sender: DroppedSlotsSender) -> Self {
157 Self { sender }
158 }
159}
160
161pub struct SnapshotRequest {
162 pub snapshot_root_bank: Arc<Bank>,
163 pub status_cache_slot_deltas: Vec<BankSlotDelta>,
164}
165
166pub struct SnapshotRequestHandler {
167 pub snapshot_config: SnapshotConfig,
168 pub snapshot_request_receiver: SnapshotRequestReceiver,
169 pub pending_accounts_package: PendingAccountsPackage,
170}
171
172impl SnapshotRequestHandler {
173 pub fn handle_snapshot_requests(
175 &self,
176 accounts_db_caching_enabled: bool,
177 test_hash_calculation: bool,
178 non_snapshot_time_us: u128,
179 last_full_snapshot_slot: &mut Option<Slot>,
180 ) -> Option<Result<u64, SnapshotError>> {
181 self.snapshot_request_receiver
182 .try_iter()
183 .last()
184 .map(|snapshot_request| {
185 let mut total_time = Measure::start("snapshot_request_receiver_total_time");
186 let SnapshotRequest {
187 snapshot_root_bank,
188 status_cache_slot_deltas,
189 } = snapshot_request;
190
191 assert!(snapshot_root_bank.is_startup_verification_complete());
193
194 let previous_hash = if test_hash_calculation {
195 snapshot_root_bank.update_accounts_hash_with_index_option(true, false, false)
198 } else {
199 Hash::default()
200 };
201
202 let mut shrink_time = Measure::start("shrink_time");
203 if !accounts_db_caching_enabled {
204 snapshot_root_bank
205 .process_stale_slot_with_budget(0, SHRUNKEN_ACCOUNT_PER_INTERVAL);
206 }
207 shrink_time.stop();
208
209 let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
210 if accounts_db_caching_enabled {
211 snapshot_root_bank.force_flush_accounts_cache();
216 assert!(
220 snapshot_root_bank.slot()
221 <= snapshot_root_bank
222 .rc
223 .accounts
224 .accounts_db
225 .accounts_cache
226 .fetch_max_flush_root()
227 );
228 }
229 flush_accounts_cache_time.stop();
230
231 let hash_for_testing = if test_hash_calculation {
232 let use_index_hash_calculation = false;
233 let check_hash = false;
234
235 let (this_hash, capitalization) = snapshot_root_bank.accounts().accounts_db.calculate_accounts_hash_helper(
236 use_index_hash_calculation,
237 snapshot_root_bank.slot(),
238 &CalcAccountsHashConfig {
239 use_bg_thread_pool: true,
240 check_hash,
241 ancestors: None,
242 use_write_cache: false,
243 epoch_schedule: snapshot_root_bank.epoch_schedule(),
244 rent_collector: snapshot_root_bank.rent_collector(),
245 store_detailed_debug_info_on_failure: false,
246 full_snapshot: None,
247 },
248 ).unwrap();
249 assert_eq!(previous_hash, this_hash);
250 assert_eq!(capitalization, snapshot_root_bank.capitalization());
251 Some(this_hash)
252 } else {
253 None
254 };
255
256 let mut clean_time = Measure::start("clean_time");
257 snapshot_root_bank.clean_accounts(true, false, *last_full_snapshot_slot);
262 clean_time.stop();
263
264 if accounts_db_caching_enabled {
265 shrink_time = Measure::start("shrink_time");
266 snapshot_root_bank.shrink_candidate_slots();
267 shrink_time.stop();
268 }
269
270 let block_height = snapshot_root_bank.block_height();
271 let snapshot_type = if snapshot_utils::should_take_full_snapshot(
272 block_height,
273 self.snapshot_config.full_snapshot_archive_interval_slots,
274 ) {
275 *last_full_snapshot_slot = Some(snapshot_root_bank.slot());
276 Some(SnapshotType::FullSnapshot)
277 } else if snapshot_utils::should_take_incremental_snapshot(
278 block_height,
279 self.snapshot_config
280 .incremental_snapshot_archive_interval_slots,
281 *last_full_snapshot_slot,
282 ) {
283 Some(SnapshotType::IncrementalSnapshot(
284 last_full_snapshot_slot.unwrap(),
285 ))
286 } else {
287 None
288 };
289
290 let mut snapshot_time = Measure::start("snapshot_time");
292 let result = snapshot_utils::snapshot_bank(
293 &snapshot_root_bank,
294 status_cache_slot_deltas,
295 &self.pending_accounts_package,
296 &self.snapshot_config.bank_snapshots_dir,
297 &self.snapshot_config.full_snapshot_archives_dir,
298 &self.snapshot_config.incremental_snapshot_archives_dir,
299 self.snapshot_config.snapshot_version,
300 self.snapshot_config.archive_format,
301 hash_for_testing,
302 snapshot_type,
303 );
304 if let Err(e) = result {
305 warn!(
306 "Error taking bank snapshot. slot: {}, snapshot type: {:?}, err: {:?}",
307 snapshot_root_bank.slot(),
308 snapshot_type,
309 e,
310 );
311
312 if Self::is_snapshot_error_fatal(&e) {
313 return Err(e);
314 }
315 }
316 snapshot_time.stop();
317 info!("Took bank snapshot. snapshot type: {:?}, slot: {}, accounts hash: {}, bank hash: {}",
318 snapshot_type,
319 snapshot_root_bank.slot(),
320 snapshot_root_bank.get_accounts_hash(),
321 snapshot_root_bank.hash(),
322 );
323
324 let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
326 snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.bank_snapshots_dir);
327 purge_old_snapshots_time.stop();
328 total_time.stop();
329
330 datapoint_info!(
331 "handle_snapshot_requests-timing",
332 (
333 "flush_accounts_cache_time",
334 flush_accounts_cache_time.as_us(),
335 i64
336 ),
337 ("shrink_time", shrink_time.as_us(), i64),
338 ("clean_time", clean_time.as_us(), i64),
339 ("snapshot_time", snapshot_time.as_us(), i64),
340 (
341 "purge_old_snapshots_time",
342 purge_old_snapshots_time.as_us(),
343 i64
344 ),
345 ("total_us", total_time.as_us(), i64),
346 ("non_snapshot_time_us", non_snapshot_time_us, i64),
347 );
348 Ok(snapshot_root_bank.block_height())
349 })
350 }
351
352 fn is_snapshot_error_fatal(err: &SnapshotError) -> bool {
360 match err {
361 SnapshotError::Io(..) => true,
362 SnapshotError::Serialize(..) => true,
363 SnapshotError::ArchiveGenerationFailure(..) => true,
364 SnapshotError::StoragePathSymlinkInvalid => true,
365 SnapshotError::UnpackError(..) => true,
366 SnapshotError::IoWithSource(..) => true,
367 SnapshotError::PathToFileNameError(..) => true,
368 SnapshotError::FileNameToStrError(..) => true,
369 SnapshotError::ParseSnapshotArchiveFileNameError(..) => true,
370 SnapshotError::MismatchedBaseSlot(..) => true,
371 SnapshotError::NoSnapshotArchives => true,
372 SnapshotError::MismatchedSlotHash(..) => true,
373 SnapshotError::VerifySlotDeltas(..) => true,
374 }
375 }
376}
377
378#[derive(Default, Clone)]
379pub struct AbsRequestSender {
380 snapshot_request_sender: Option<SnapshotRequestSender>,
381}
382
383impl AbsRequestSender {
384 pub fn new(snapshot_request_sender: SnapshotRequestSender) -> Self {
385 Self {
386 snapshot_request_sender: Some(snapshot_request_sender),
387 }
388 }
389
390 pub fn is_snapshot_creation_enabled(&self) -> bool {
391 self.snapshot_request_sender.is_some()
392 }
393
394 pub fn send_snapshot_request(
395 &self,
396 snapshot_request: SnapshotRequest,
397 ) -> Result<(), SendError<SnapshotRequest>> {
398 if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
399 snapshot_request_sender.send(snapshot_request)
400 } else {
401 Ok(())
402 }
403 }
404}
405
406pub struct AbsRequestHandler {
407 pub snapshot_request_handler: Option<SnapshotRequestHandler>,
408 pub pruned_banks_receiver: DroppedSlotsReceiver,
409}
410
411impl AbsRequestHandler {
412 pub fn handle_snapshot_requests(
414 &self,
415 accounts_db_caching_enabled: bool,
416 test_hash_calculation: bool,
417 non_snapshot_time_us: u128,
418 last_full_snapshot_slot: &mut Option<Slot>,
419 ) -> Option<Result<u64, SnapshotError>> {
420 self.snapshot_request_handler
421 .as_ref()
422 .and_then(|snapshot_request_handler| {
423 snapshot_request_handler.handle_snapshot_requests(
424 accounts_db_caching_enabled,
425 test_hash_calculation,
426 non_snapshot_time_us,
427 last_full_snapshot_slot,
428 )
429 })
430 }
431
432 pub fn handle_pruned_banks(&self, bank: &Bank, is_serialized_with_abs: bool) -> usize {
433 let mut count = 0;
434 for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
435 count += 1;
436 bank.rc.accounts.accounts_db.purge_slot(
437 pruned_slot,
438 pruned_bank_id,
439 is_serialized_with_abs,
440 );
441 }
442
443 count
444 }
445}
446
/// Owner of the thread that performs periodic accounts maintenance.
pub struct AccountsBackgroundService {
    /// Join handle of the background thread.
    t_background: JoinHandle<()>,
}
450
451impl AccountsBackgroundService {
452 pub fn new(
453 bank_forks: Arc<RwLock<BankForks>>,
454 exit: &Arc<AtomicBool>,
455 request_handler: AbsRequestHandler,
456 accounts_db_caching_enabled: bool,
457 test_hash_calculation: bool,
458 mut last_full_snapshot_slot: Option<Slot>,
459 ) -> Self {
460 info!("AccountsBackgroundService active");
461 let exit = exit.clone();
462 let mut consumed_budget = 0;
463 let mut last_cleaned_block_height = 0;
464 let mut removed_slots_count = 0;
465 let mut total_remove_slots_time = 0;
466 let mut last_expiration_check_time = Instant::now();
467 let t_background = Builder::new()
468 .name("solBgAccounts".to_string())
469 .spawn(move || {
470 let mut stats = StatsManager::new();
471 let mut last_snapshot_end_time = None;
472 loop {
473 if exit.load(Ordering::Relaxed) {
474 break;
475 }
476 let start_time = Instant::now();
477
478 let bank = bank_forks.read().unwrap().root_bank().clone();
480
481 Self::remove_dead_slots(
483 &bank,
484 &request_handler,
485 &mut removed_slots_count,
486 &mut total_remove_slots_time,
487 );
488
489 Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
490
491 let non_snapshot_time = last_snapshot_end_time
492 .map(|last_snapshot_end_time: Instant| {
493 last_snapshot_end_time.elapsed().as_micros()
494 })
495 .unwrap_or_default();
496
497 let snapshot_block_height_option_result = request_handler
515 .handle_snapshot_requests(
516 accounts_db_caching_enabled,
517 test_hash_calculation,
518 non_snapshot_time,
519 &mut last_full_snapshot_slot,
520 );
521 if snapshot_block_height_option_result.is_some() {
522 last_snapshot_end_time = Some(Instant::now());
523 }
524
525 if accounts_db_caching_enabled {
526 bank.flush_accounts_cache_if_needed();
531 }
532
533 if let Some(snapshot_block_height_result) = snapshot_block_height_option_result
534 {
535 if let Ok(snapshot_block_height) = snapshot_block_height_result {
537 assert!(last_cleaned_block_height <= snapshot_block_height);
538 last_cleaned_block_height = snapshot_block_height;
539 } else {
540 exit.store(true, Ordering::Relaxed);
541 return;
542 }
543 } else {
544 if accounts_db_caching_enabled {
545 bank.shrink_candidate_slots();
546 } else {
547 consumed_budget = bank
551 .process_stale_slot_with_budget(
552 consumed_budget,
553 SHRUNKEN_ACCOUNT_PER_INTERVAL,
554 )
555 .min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
556 }
557 if bank.block_height() - last_cleaned_block_height
558 > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
559 {
560 if accounts_db_caching_enabled {
561 bank.force_flush_accounts_cache();
566 }
567 bank.clean_accounts(true, false, last_full_snapshot_slot);
568 last_cleaned_block_height = bank.block_height();
569 }
570 }
571 stats.record_and_maybe_submit(start_time.elapsed());
572 sleep(Duration::from_millis(INTERVAL_MS));
573 }
574 })
575 .unwrap();
576 Self { t_background }
577 }
578
579 pub fn setup_bank_drop_callback(bank_forks: Arc<RwLock<BankForks>>) -> DroppedSlotsReceiver {
584 assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
585
586 let (pruned_banks_sender, pruned_banks_receiver) =
587 crossbeam_channel::bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
588 {
589 let root_bank = bank_forks.read().unwrap().root_bank();
590 root_bank.set_callback(Some(Box::new(
591 root_bank
592 .rc
593 .accounts
594 .accounts_db
595 .create_drop_bank_callback(pruned_banks_sender),
596 )));
597 }
598 pruned_banks_receiver
599 }
600
601 pub fn join(self) -> thread::Result<()> {
602 self.t_background.join()
603 }
604
605 fn remove_dead_slots(
606 bank: &Bank,
607 request_handler: &AbsRequestHandler,
608 removed_slots_count: &mut usize,
609 total_remove_slots_time: &mut u64,
610 ) {
611 let mut remove_slots_time = Measure::start("remove_slots_time");
612 *removed_slots_count += request_handler.handle_pruned_banks(bank, true);
613 remove_slots_time.stop();
614 *total_remove_slots_time += remove_slots_time.as_us();
615
616 if *removed_slots_count >= 100 {
617 datapoint_info!(
618 "remove_slots_timing",
619 ("remove_slots_time", *total_remove_slots_time, i64),
620 ("removed_slots_count", *removed_slots_count, i64),
621 );
622 *total_remove_slots_time = 0;
623 *removed_slots_count = 0;
624 }
625 }
626
627 fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
628 let now = Instant::now();
629 if now.duration_since(*last_expiration_check_time).as_secs()
630 > RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
631 {
632 bank.expire_old_recycle_stores();
633 *last_expiration_check_time = now;
634 }
635 }
636}
637
#[cfg(test)]
mod test {
    use {
        super::*,
        crate::genesis_utils::create_genesis_config,
        crossbeam_channel::unbounded,
        solana_sdk::{account::AccountSharedData, pubkey::Pubkey},
    };

    /// Queuing slot 0 as pruned and running `remove_dead_slots` must leave slot 0
    /// with nothing to scan.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis_info = create_genesis_config(10);
        let root_bank = Arc::new(Bank::new_for_tests(&genesis_info.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let request_handler = AbsRequestHandler {
            snapshot_request_handler: None,
            pruned_banks_receiver,
        };

        // Store an account so that slot 0 has something to scan.
        let account_key = Pubkey::new_unique();
        let stored_account = AccountSharedData::new(264, 0, &Pubkey::default());
        root_bank.store_account(&account_key, &stored_account);
        assert!(root_bank.get_account(&account_key).is_some());

        // Announce slot 0 (bank id 0) as pruned.
        pruned_banks_sender.send((0, 0)).unwrap();

        assert!(!root_bank.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        AccountsBackgroundService::remove_dead_slots(
            &root_bank,
            &request_handler,
            &mut removed_slots_count,
            &mut total_remove_slots_time,
        );

        assert!(root_bank.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }
}