1use crate::{
6 bank::{Bank, BankSlotDelta, DropCallback},
7 bank_forks::BankForks,
8 snapshot_config::SnapshotConfig,
9 snapshot_package::{AccountsPackageSender, SnapshotType},
10 snapshot_utils::{self, SnapshotError},
11};
12use crossbeam_channel::{Receiver, SendError, Sender};
13use log::*;
14use rand::{thread_rng, Rng};
15use gemachain_measure::measure::Measure;
16use gemachain_sdk::{
17 clock::{BankId, Slot},
18 hash::Hash,
19};
20use std::{
21 boxed::Box,
22 fmt::{Debug, Formatter},
23 sync::{
24 atomic::{AtomicBool, Ordering},
25 Arc, RwLock,
26 },
27 thread::{self, sleep, Builder, JoinHandle},
28 time::{Duration, Instant},
29};
30
/// Sleep duration, in milliseconds, between iterations of the background loop
/// spawned by `AccountsBackgroundService::new`.
const INTERVAL_MS: u64 = 100;
/// Shrink target per second (presumably a number of accounts, going by the name —
/// TODO confirm against `Bank::process_stale_slot_with_budget`).
const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
/// Per-iteration shrink budget: the per-second target divided by the number of
/// `INTERVAL_MS` intervals that fit in one second (1000 ms).
const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
    SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
/// Base block-height gap since the last clean after which the background loop runs
/// `clean_accounts` on its own; the loop adds `gen_range(0, 10)` of random jitter to it.
const CLEAN_INTERVAL_BLOCKS: u64 = 100;

/// Minimum number of seconds between calls to `Bank::expire_old_recycle_stores`:
/// one third of `accounts_db::EXPIRATION_TTL_SECONDS`.
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;
43
/// Sending half of the channel that carries [`SnapshotRequest`]s.
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
/// Receiving half of the channel that carries [`SnapshotRequest`]s.
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
/// Sending half of the channel that carries the `(slot, bank_id)` of dropped banks.
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
/// Receiving half of the channel that carries the `(slot, bank_id)` of dropped banks.
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
48
/// [`DropCallback`] implementation that forwards the slot and bank id of a bank
/// over a [`DroppedSlotsSender`] channel.
#[derive(Clone)]
pub struct SendDroppedBankCallback {
    /// Channel on which `(slot, bank_id)` pairs are sent.
    sender: DroppedSlotsSender,
}
53
54impl DropCallback for SendDroppedBankCallback {
55 fn callback(&self, bank: &Bank) {
56 if let Err(e) = self.sender.send((bank.slot(), bank.bank_id())) {
57 warn!("Error sending dropped banks: {:?}", e);
58 }
59 }
60
61 fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
62 Box::new(self.clone())
63 }
64}
65
66impl Debug for SendDroppedBankCallback {
67 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
68 write!(f, "SendDroppedBankCallback({:p})", self)
69 }
70}
71
72impl SendDroppedBankCallback {
73 pub fn new(sender: DroppedSlotsSender) -> Self {
74 Self { sender }
75 }
76}
77
/// A request to snapshot a bank, as sent over a [`SnapshotRequestSender`].
pub struct SnapshotRequest {
    /// The bank to snapshot.
    pub snapshot_root_bank: Arc<Bank>,
    /// Status cache slot deltas, passed through to `snapshot_utils::snapshot_bank`.
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
}
82
/// Services [`SnapshotRequest`]s received over a channel.
pub struct SnapshotRequestHandler {
    /// Snapshot settings: archive intervals, directories, snapshot version and archive format.
    pub snapshot_config: SnapshotConfig,
    /// Channel from which pending snapshot requests are drained.
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    /// Sender handed to `snapshot_utils::snapshot_bank` when a snapshot is taken.
    pub accounts_package_sender: AccountsPackageSender,
}
88
impl SnapshotRequestHandler {
    /// Drains the snapshot request channel and services only the most recent request.
    ///
    /// For that request the root bank's accounts are flushed and/or shrunk, hashed and
    /// cleaned, and the bank is then handed to `snapshot_utils::snapshot_bank`. Old bank
    /// snapshots are purged afterwards and a timing datapoint is emitted.
    ///
    /// * `accounts_db_caching_enabled` - selects between the cache-flush path and the
    ///   stale-slot shrink path.
    /// * `test_hash_calculation` - when set, the accounts hash is computed twice and the
    ///   two results are asserted to be equal.
    /// * `use_index_hash_calculation` - forwarded to
    ///   `update_accounts_hash_with_index_option` for the main hash computation.
    /// * `non_snapshot_time_us` - only reported in the timing datapoint.
    /// * `last_full_snapshot_slot` - read when cleaning accounts and when deciding on an
    ///   incremental snapshot; overwritten with the root bank's slot when a full snapshot
    ///   is selected.
    ///
    /// Returns `None` when no request was pending, `Some(Ok(block_height))` with the
    /// snapshotted bank's block height on success, and `Some(Err(_))` when
    /// `snapshot_bank` failed with an error that [`Self::is_snapshot_error_fatal`]
    /// reports as fatal.
    ///
    /// # Panics
    ///
    /// Panics if the flush-root assertion fails (caching enabled), if the two hashes
    /// differ (`test_hash_calculation`), or if an incremental snapshot is selected while
    /// `last_full_snapshot_slot` is `None`.
    pub fn handle_snapshot_requests(
        &self,
        accounts_db_caching_enabled: bool,
        test_hash_calculation: bool,
        use_index_hash_calculation: bool,
        non_snapshot_time_us: u128,
        last_full_snapshot_slot: &mut Option<Slot>,
    ) -> Option<Result<u64, SnapshotError>> {
        // `try_iter().last()` consumes everything currently queued and keeps only the
        // newest request; older pending requests are dropped.
        self.snapshot_request_receiver
            .try_iter()
            .last()
            .map(|snapshot_request| {
                let mut total_time = Measure::start("snapshot_request_receiver_total_time");
                let SnapshotRequest {
                    snapshot_root_bank,
                    status_cache_slot_deltas,
                } = snapshot_request;

                // In test mode, take a reference hash up front; it is compared against
                // the hash computed after the flush/shrink below.
                let previous_hash = if test_hash_calculation {
                    snapshot_root_bank.update_accounts_hash_with_index_option(true, false, None)
                } else {
                    Hash::default()
                };

                let mut shrink_time = Measure::start("shrink_time");
                if !accounts_db_caching_enabled {
                    // First argument is 0 here; the background loop passes its running
                    // `consumed_budget` in the same position.
                    snapshot_root_bank
                        .process_stale_slot_with_budget(0, SHRUNKEN_ACCOUNT_PER_INTERVAL);
                }
                shrink_time.stop();

                let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
                if accounts_db_caching_enabled {
                    snapshot_root_bank.force_flush_accounts_cache();
                    // After the forced flush, the cache's max flush root must be at or
                    // past the slot being snapshotted.
                    assert!(
                        snapshot_root_bank.slot()
                            <= snapshot_root_bank
                                .rc
                                .accounts
                                .accounts_db
                                .accounts_cache
                                .fetch_max_flush_root()
                    );
                }
                flush_accounts_cache_time.stop();

                let mut hash_time = Measure::start("hash_time");
                let this_hash = snapshot_root_bank.update_accounts_hash_with_index_option(
                    use_index_hash_calculation,
                    test_hash_calculation,
                    Some(snapshot_root_bank.epoch_schedule().slots_per_epoch),
                );
                let hash_for_testing = if test_hash_calculation {
                    // The hash taken before the flush/shrink must match the one taken after.
                    assert_eq!(previous_hash, this_hash);
                    Some(snapshot_root_bank.get_accounts_hash())
                } else {
                    None
                };
                hash_time.stop();

                let mut clean_time = Measure::start("clean_time");
                snapshot_root_bank.clean_accounts(true, false, *last_full_snapshot_slot);
                clean_time.stop();

                if accounts_db_caching_enabled {
                    // `shrink_time` is restarted, so with caching enabled the reported
                    // value covers only `shrink_candidate_slots`.
                    shrink_time = Measure::start("shrink_time");
                    snapshot_root_bank.shrink_candidate_slots();
                    shrink_time.stop();
                }

                // Decide which kind of snapshot archive (if any) this block height calls
                // for. A full snapshot takes precedence over an incremental one.
                let block_height = snapshot_root_bank.block_height();
                let snapshot_type = if snapshot_utils::should_take_full_snapshot(
                    block_height,
                    self.snapshot_config.full_snapshot_archive_interval_slots,
                ) {
                    // Recorded before `snapshot_bank` runs, i.e. before the snapshot is
                    // known to have succeeded.
                    *last_full_snapshot_slot = Some(snapshot_root_bank.slot());
                    Some(SnapshotType::FullSnapshot)
                } else if snapshot_utils::should_take_incremental_snapshot(
                    block_height,
                    self.snapshot_config
                        .incremental_snapshot_archive_interval_slots,
                    *last_full_snapshot_slot,
                ) {
                    // NOTE(review): relies on `should_take_incremental_snapshot` returning
                    // false when there is no full snapshot slot — confirm; otherwise this
                    // `unwrap` panics.
                    Some(SnapshotType::IncrementalSnapshot(
                        last_full_snapshot_slot.unwrap(),
                    ))
                } else {
                    None
                };

                let mut snapshot_time = Measure::start("snapshot_time");
                let result = snapshot_utils::snapshot_bank(
                    &snapshot_root_bank,
                    status_cache_slot_deltas,
                    &self.accounts_package_sender,
                    &self.snapshot_config.bank_snapshots_dir,
                    &self.snapshot_config.snapshot_archives_dir,
                    self.snapshot_config.snapshot_version,
                    self.snapshot_config.archive_format,
                    hash_for_testing,
                    snapshot_type,
                );
                if let Err(e) = result {
                    warn!(
                        "Error taking bank snapshot. slot: {}, snapshot type: {:?}, err: {:?}",
                        snapshot_root_bank.slot(),
                        snapshot_type,
                        e,
                    );

                    // A fatal error ends handling here: the purge and the timing
                    // datapoint below are skipped.
                    if Self::is_snapshot_error_fatal(&e) {
                        return Err(e);
                    }
                }
                snapshot_time.stop();
                // NOTE(review): this would also be logged after a non-fatal error; with
                // every variant currently classified as fatal that path is not taken.
                info!("Took bank snapshot. snapshot type: {:?}, slot: {}, accounts hash: {}, bank hash: {}",
                    snapshot_type,
                    snapshot_root_bank.slot(),
                    snapshot_root_bank.get_accounts_hash(),
                    snapshot_root_bank.hash(),
                );

                let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
                snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.bank_snapshots_dir);
                purge_old_snapshots_time.stop();
                total_time.stop();

                datapoint_info!(
                    "handle_snapshot_requests-timing",
                    ("hash_time", hash_time.as_us(), i64),
                    (
                        "flush_accounts_cache_time",
                        flush_accounts_cache_time.as_us(),
                        i64
                    ),
                    ("shrink_time", shrink_time.as_us(), i64),
                    ("clean_time", clean_time.as_us(), i64),
                    ("snapshot_time", snapshot_time.as_us(), i64),
                    (
                        "purge_old_snapshots_time",
                        purge_old_snapshots_time.as_us(),
                        i64
                    ),
                    ("total_us", total_time.as_us(), i64),
                    ("non_snapshot_time_us", non_snapshot_time_us, i64),
                );
                Ok(snapshot_root_bank.block_height())
            })
    }

    /// Classifies a snapshot error as fatal (`true`) or recoverable (`false`).
    ///
    /// Every variant listed below currently maps to `true`. The match has no wildcard
    /// arm, so each variant is classified explicitly.
    fn is_snapshot_error_fatal(err: &SnapshotError) -> bool {
        match err {
            SnapshotError::Io(..) => true,
            SnapshotError::Serialize(..) => true,
            SnapshotError::ArchiveGenerationFailure(..) => true,
            SnapshotError::StoragePathSymlinkInvalid => true,
            SnapshotError::UnpackError(..) => true,
            SnapshotError::AccountsPackageSendError(..) => true,
            SnapshotError::IoWithSource(..) => true,
            SnapshotError::PathToFileNameError(..) => true,
            SnapshotError::FileNameToStrError(..) => true,
            SnapshotError::ParseSnapshotArchiveFileNameError(..) => true,
            SnapshotError::MismatchedBaseSlot(..) => true,
            SnapshotError::NoSnapshotArchives => true,
            SnapshotError::MismatchedSlotHash(..) => true,
        }
    }
}
281
/// Sending side used to submit [`SnapshotRequest`]s to the accounts background service.
///
/// The default value holds no sender, in which case snapshot creation is reported as
/// disabled and sent requests are discarded.
#[derive(Default)]
pub struct AbsRequestSender {
    /// Channel for snapshot requests; `None` when snapshot creation is disabled.
    snapshot_request_sender: Option<SnapshotRequestSender>,
}
286
287impl AbsRequestSender {
288 pub fn new(snapshot_request_sender: Option<SnapshotRequestSender>) -> Self {
289 AbsRequestSender {
290 snapshot_request_sender,
291 }
292 }
293
294 pub fn is_snapshot_creation_enabled(&self) -> bool {
295 self.snapshot_request_sender.is_some()
296 }
297
298 pub fn send_snapshot_request(
299 &self,
300 snapshot_request: SnapshotRequest,
301 ) -> Result<(), SendError<SnapshotRequest>> {
302 if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
303 snapshot_request_sender.send(snapshot_request)
304 } else {
305 Ok(())
306 }
307 }
308}
309
/// Receiving side of the accounts background service: optional snapshot handling plus
/// the channel of pruned banks to purge.
pub struct AbsRequestHandler {
    /// Handler for snapshot requests; `None` means snapshot requests are never serviced.
    pub snapshot_request_handler: Option<SnapshotRequestHandler>,
    /// Channel delivering the `(slot, bank_id)` of banks whose slots should be purged.
    pub pruned_banks_receiver: DroppedSlotsReceiver,
}
314
315impl AbsRequestHandler {
316 pub fn handle_snapshot_requests(
318 &self,
319 accounts_db_caching_enabled: bool,
320 test_hash_calculation: bool,
321 use_index_hash_calculation: bool,
322 non_snapshot_time_us: u128,
323 last_full_snapshot_slot: &mut Option<Slot>,
324 ) -> Option<Result<u64, SnapshotError>> {
325 self.snapshot_request_handler
326 .as_ref()
327 .and_then(|snapshot_request_handler| {
328 snapshot_request_handler.handle_snapshot_requests(
329 accounts_db_caching_enabled,
330 test_hash_calculation,
331 use_index_hash_calculation,
332 non_snapshot_time_us,
333 last_full_snapshot_slot,
334 )
335 })
336 }
337
338 pub fn handle_pruned_banks(&self, bank: &Bank, is_from_abs: bool) -> usize {
340 let mut count = 0;
341 for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
342 count += 1;
343 bank.rc
344 .accounts
345 .purge_slot(pruned_slot, pruned_bank_id, is_from_abs);
346 }
347
348 count
349 }
350}
351
/// Handle to the background thread that purges pruned slots, services snapshot requests
/// and periodically shrinks and cleans accounts.
pub struct AccountsBackgroundService {
    /// Join handle of the spawned background thread.
    t_background: JoinHandle<()>,
}
355
356impl AccountsBackgroundService {
357 pub fn new(
358 bank_forks: Arc<RwLock<BankForks>>,
359 exit: &Arc<AtomicBool>,
360 request_handler: AbsRequestHandler,
361 accounts_db_caching_enabled: bool,
362 test_hash_calculation: bool,
363 use_index_hash_calculation: bool,
364 mut last_full_snapshot_slot: Option<Slot>,
365 ) -> Self {
366 info!("AccountsBackgroundService active");
367 let exit = exit.clone();
368 let mut consumed_budget = 0;
369 let mut last_cleaned_block_height = 0;
370 let mut removed_slots_count = 0;
371 let mut total_remove_slots_time = 0;
372 let mut last_expiration_check_time = Instant::now();
373 let t_background = Builder::new()
374 .name("gemachain-bg-accounts".to_string())
375 .spawn(move || {
376 let mut last_snapshot_end_time = None;
377 loop {
378 if exit.load(Ordering::Relaxed) {
379 break;
380 }
381
382 let bank = bank_forks.read().unwrap().root_bank().clone();
384
385 Self::remove_dead_slots(
387 &bank,
388 &request_handler,
389 &mut removed_slots_count,
390 &mut total_remove_slots_time,
391 );
392
393 Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
394
395 let non_snapshot_time = last_snapshot_end_time
396 .map(|last_snapshot_end_time: Instant| {
397 last_snapshot_end_time.elapsed().as_micros()
398 })
399 .unwrap_or_default();
400
401 let snapshot_block_height_option_result = request_handler
419 .handle_snapshot_requests(
420 accounts_db_caching_enabled,
421 test_hash_calculation,
422 use_index_hash_calculation,
423 non_snapshot_time,
424 &mut last_full_snapshot_slot,
425 );
426 if snapshot_block_height_option_result.is_some() {
427 last_snapshot_end_time = Some(Instant::now());
428 }
429
430 if accounts_db_caching_enabled {
431 bank.flush_accounts_cache_if_needed();
436 }
437
438 if let Some(snapshot_block_height_result) = snapshot_block_height_option_result
439 {
440 if let Ok(snapshot_block_height) = snapshot_block_height_result {
442 assert!(last_cleaned_block_height <= snapshot_block_height);
443 last_cleaned_block_height = snapshot_block_height;
444 } else {
445 exit.store(true, Ordering::Relaxed);
446 return;
447 }
448 } else {
449 if accounts_db_caching_enabled {
450 bank.shrink_candidate_slots();
451 } else {
452 consumed_budget = bank
456 .process_stale_slot_with_budget(
457 consumed_budget,
458 SHRUNKEN_ACCOUNT_PER_INTERVAL,
459 )
460 .min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
461 }
462 if bank.block_height() - last_cleaned_block_height
463 > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
464 {
465 if accounts_db_caching_enabled {
466 bank.force_flush_accounts_cache();
471 }
472 bank.clean_accounts(true, false, last_full_snapshot_slot);
473 last_cleaned_block_height = bank.block_height();
474 }
475 }
476 sleep(Duration::from_millis(INTERVAL_MS));
477 }
478 })
479 .unwrap();
480 Self { t_background }
481 }
482
483 pub fn join(self) -> thread::Result<()> {
484 self.t_background.join()
485 }
486
487 fn remove_dead_slots(
488 bank: &Bank,
489 request_handler: &AbsRequestHandler,
490 removed_slots_count: &mut usize,
491 total_remove_slots_time: &mut u64,
492 ) {
493 let mut remove_slots_time = Measure::start("remove_slots_time");
494 *removed_slots_count += request_handler.handle_pruned_banks(bank, true);
495 remove_slots_time.stop();
496 *total_remove_slots_time += remove_slots_time.as_us();
497
498 if *removed_slots_count >= 100 {
499 datapoint_info!(
500 "remove_slots_timing",
501 ("remove_slots_time", *total_remove_slots_time, i64),
502 ("removed_slots_count", *removed_slots_count, i64),
503 );
504 *total_remove_slots_time = 0;
505 *removed_slots_count = 0;
506 }
507 }
508
509 fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
510 let now = Instant::now();
511 if now.duration_since(*last_expiration_check_time).as_secs()
512 > RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
513 {
514 bank.expire_old_recycle_stores();
515 *last_expiration_check_time = now;
516 }
517 }
518}
519
#[cfg(test)]
mod test {
    use super::*;
    use crate::genesis_utils::create_genesis_config;
    use crossbeam_channel::unbounded;
    use gemachain_sdk::{account::AccountSharedData, pubkey::Pubkey};

    /// A slot announced on the pruned banks channel must hold no accounts once
    /// `remove_dead_slots` has run.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
        let request_handler = AbsRequestHandler {
            snapshot_request_handler: None,
            pruned_banks_receiver,
        };

        // Put one account into slot 0 so there is something to purge.
        let account_key = Pubkey::new_unique();
        let account = AccountSharedData::new(264, 0, &Pubkey::default());
        bank0.store_account(&account_key, &account);
        assert!(bank0.get_account(&account_key).is_some());

        // Announce slot 0 (bank id 0) as pruned.
        pruned_banks_sender.send((0, 0)).unwrap();

        // Nothing has been purged yet.
        assert!(!bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());

        let mut removed_slots_count = 0;
        let mut total_remove_slots_time = 0;
        AccountsBackgroundService::remove_dead_slots(
            &bank0,
            &request_handler,
            &mut removed_slots_count,
            &mut total_remove_slots_time,
        );

        // The purge leaves slot 0 empty.
        assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
    }
}