gemachain_runtime/
accounts_background_service.rs

1// Service to clean up dead slots in accounts_db
2//
3// This can be expensive since we have to walk the append vecs being cleaned up.
4
5use crate::{
6    bank::{Bank, BankSlotDelta, DropCallback},
7    bank_forks::BankForks,
8    snapshot_config::SnapshotConfig,
9    snapshot_package::{AccountsPackageSender, SnapshotType},
10    snapshot_utils::{self, SnapshotError},
11};
12use crossbeam_channel::{Receiver, SendError, Sender};
13use log::*;
14use rand::{thread_rng, Rng};
15use gemachain_measure::measure::Measure;
16use gemachain_sdk::{
17    clock::{BankId, Slot},
18    hash::Hash,
19};
20use std::{
21    boxed::Box,
22    fmt::{Debug, Formatter},
23    sync::{
24        atomic::{AtomicBool, Ordering},
25        Arc, RwLock,
26    },
27    thread::{self, sleep, Builder, JoinHandle},
28    time::{Duration, Instant},
29};
30
/// Pause between two iterations of the background loop, in milliseconds.
const INTERVAL_MS: u64 = 100;
/// Per-second account budget from which the per-iteration budget is derived.
const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
/// Budget handed to `process_stale_slot_with_budget` on each iteration: the per-second
/// budget divided by the number of `INTERVAL_MS` intervals that fit into one second.
const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
    SHRUNKEN_ACCOUNT_PER_SEC / (1000 / (INTERVAL_MS as usize));
/// Base number of blocks that must pass since the last clean before the background loop
/// cleans again (a small random jitter is added at the point of use).
const CLEAN_INTERVAL_BLOCKS: u64 = 100;
36
// This value is chosen to spread the dropping cost over 3 expiration checks.
// RecycleStores are fully populated for almost all of their lifetime, so otherwise
// this would drop MAX_RECYCLE_STORES mmaps at once in the worst case...
// (Anyway, the dropping part is outside the AccountsDb::recycle_stores lock
// and happens in this AccountsBackgroundService, so this shouldn't matter much)
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;
43
/// Sending half of the channel that carries [`SnapshotRequest`]s.
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
/// Receiving half of the channel that carries [`SnapshotRequest`]s.
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
/// Sending half of the channel that carries the `(slot, bank id)` pair of a bank.
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
/// Receiving half of the channel that carries the `(slot, bank id)` pair of a bank.
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
48
49#[derive(Clone)]
50pub struct SendDroppedBankCallback {
51    sender: DroppedSlotsSender,
52}
53
54impl DropCallback for SendDroppedBankCallback {
55    fn callback(&self, bank: &Bank) {
56        if let Err(e) = self.sender.send((bank.slot(), bank.bank_id())) {
57            warn!("Error sending dropped banks: {:?}", e);
58        }
59    }
60
61    fn clone_box(&self) -> Box<dyn DropCallback + Send + Sync> {
62        Box::new(self.clone())
63    }
64}
65
66impl Debug for SendDroppedBankCallback {
67    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
68        write!(f, "SendDroppedBankCallback({:p})", self)
69    }
70}
71
72impl SendDroppedBankCallback {
73    pub fn new(sender: DroppedSlotsSender) -> Self {
74        Self { sender }
75    }
76}
77
/// A request to snapshot a bank, consumed by
/// `SnapshotRequestHandler::handle_snapshot_requests`.
pub struct SnapshotRequest {
    /// The bank to snapshot.
    pub snapshot_root_bank: Arc<Bank>,
    /// Status cache deltas, handed on unchanged to `snapshot_utils::snapshot_bank`.
    pub status_cache_slot_deltas: Vec<BankSlotDelta>,
}
82
/// Receives [`SnapshotRequest`]s and turns the most recent one into a bank snapshot.
pub struct SnapshotRequestHandler {
    /// Supplies the snapshot intervals, directories, version and archive format.
    pub snapshot_config: SnapshotConfig,
    /// Channel the pending snapshot requests are read from.
    pub snapshot_request_receiver: SnapshotRequestReceiver,
    /// Handed to `snapshot_utils::snapshot_bank` when a snapshot is taken.
    pub accounts_package_sender: AccountsPackageSender,
}
88
89impl SnapshotRequestHandler {
90    // Returns the latest requested snapshot slot, if one exists
91    pub fn handle_snapshot_requests(
92        &self,
93        accounts_db_caching_enabled: bool,
94        test_hash_calculation: bool,
95        use_index_hash_calculation: bool,
96        non_snapshot_time_us: u128,
97        last_full_snapshot_slot: &mut Option<Slot>,
98    ) -> Option<Result<u64, SnapshotError>> {
99        self.snapshot_request_receiver
100            .try_iter()
101            .last()
102            .map(|snapshot_request| {
103                let mut total_time = Measure::start("snapshot_request_receiver_total_time");
104                let SnapshotRequest {
105                    snapshot_root_bank,
106                    status_cache_slot_deltas,
107                } = snapshot_request;
108
109                let previous_hash = if test_hash_calculation {
110                    // We have to use the index version here.
111                    // We cannot calculate the non-index way because cache has not been flushed and stores don't match reality.
112                    snapshot_root_bank.update_accounts_hash_with_index_option(true, false, None)
113                } else {
114                    Hash::default()
115                };
116
117                let mut shrink_time = Measure::start("shrink_time");
118                if !accounts_db_caching_enabled {
119                    snapshot_root_bank
120                        .process_stale_slot_with_budget(0, SHRUNKEN_ACCOUNT_PER_INTERVAL);
121                }
122                shrink_time.stop();
123
124                let mut flush_accounts_cache_time = Measure::start("flush_accounts_cache_time");
125                if accounts_db_caching_enabled {
126                    // Forced cache flushing MUST flush all roots <= snapshot_root_bank.slot().
127                    // That's because `snapshot_root_bank.slot()` must be root at this point,
128                    // and contains relevant updates because each bank has at least 1 account update due
129                    // to sysvar maintenance. Otherwise, this would cause missing storages in the snapshot
130                    snapshot_root_bank.force_flush_accounts_cache();
131                    // Ensure all roots <= `self.slot()` have been flushed.
132                    // Note `max_flush_root` could be larger than self.slot() if there are
133                    // `> MAX_CACHE_SLOT` cached and rooted slots which triggered earlier flushes.
134                    assert!(
135                        snapshot_root_bank.slot()
136                            <= snapshot_root_bank
137                                .rc
138                                .accounts
139                                .accounts_db
140                                .accounts_cache
141                                .fetch_max_flush_root()
142                    );
143                }
144                flush_accounts_cache_time.stop();
145
146                let mut hash_time = Measure::start("hash_time");
147                let this_hash = snapshot_root_bank.update_accounts_hash_with_index_option(
148                    use_index_hash_calculation,
149                    test_hash_calculation,
150                    Some(snapshot_root_bank.epoch_schedule().slots_per_epoch),
151                );
152                let hash_for_testing = if test_hash_calculation {
153                    assert_eq!(previous_hash, this_hash);
154                    Some(snapshot_root_bank.get_accounts_hash())
155                } else {
156                    None
157                };
158                hash_time.stop();
159
160                let mut clean_time = Measure::start("clean_time");
161                // Don't clean the slot we're snapshotting because it may have zero-carat
162                // accounts that were included in the bank delta hash when the bank was frozen,
163                // and if we clean them here, the newly created snapshot's hash may not match
164                // the frozen hash.
165                snapshot_root_bank.clean_accounts(true, false, *last_full_snapshot_slot);
166                clean_time.stop();
167
168                if accounts_db_caching_enabled {
169                    shrink_time = Measure::start("shrink_time");
170                    snapshot_root_bank.shrink_candidate_slots();
171                    shrink_time.stop();
172                }
173
174                let block_height = snapshot_root_bank.block_height();
175                let snapshot_type = if snapshot_utils::should_take_full_snapshot(
176                    block_height,
177                    self.snapshot_config.full_snapshot_archive_interval_slots,
178                ) {
179                    *last_full_snapshot_slot = Some(snapshot_root_bank.slot());
180                    Some(SnapshotType::FullSnapshot)
181                } else if snapshot_utils::should_take_incremental_snapshot(
182                    block_height,
183                    self.snapshot_config
184                        .incremental_snapshot_archive_interval_slots,
185                    *last_full_snapshot_slot,
186                ) {
187                    Some(SnapshotType::IncrementalSnapshot(
188                        last_full_snapshot_slot.unwrap(),
189                    ))
190                } else {
191                    None
192                };
193
194                // Snapshot the bank and send over an accounts package
195                let mut snapshot_time = Measure::start("snapshot_time");
196                let result = snapshot_utils::snapshot_bank(
197                    &snapshot_root_bank,
198                    status_cache_slot_deltas,
199                    &self.accounts_package_sender,
200                    &self.snapshot_config.bank_snapshots_dir,
201                    &self.snapshot_config.snapshot_archives_dir,
202                    self.snapshot_config.snapshot_version,
203                    self.snapshot_config.archive_format,
204                    hash_for_testing,
205                    snapshot_type,
206                );
207                if let Err(e) = result {
208                    warn!(
209                        "Error taking bank snapshot. slot: {}, snapshot type: {:?}, err: {:?}",
210                        snapshot_root_bank.slot(),
211                        snapshot_type,
212                        e,
213                    );
214
215                    if Self::is_snapshot_error_fatal(&e) {
216                        return Err(e);
217                    }
218                }
219                snapshot_time.stop();
220                info!("Took bank snapshot. snapshot type: {:?}, slot: {}, accounts hash: {}, bank hash: {}",
221                      snapshot_type,
222                      snapshot_root_bank.slot(),
223                      snapshot_root_bank.get_accounts_hash(),
224                      snapshot_root_bank.hash(),
225                  );
226
227                // Cleanup outdated snapshots
228                let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time");
229                snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.bank_snapshots_dir);
230                purge_old_snapshots_time.stop();
231                total_time.stop();
232
233                datapoint_info!(
234                    "handle_snapshot_requests-timing",
235                    ("hash_time", hash_time.as_us(), i64),
236                    (
237                        "flush_accounts_cache_time",
238                        flush_accounts_cache_time.as_us(),
239                        i64
240                    ),
241                    ("shrink_time", shrink_time.as_us(), i64),
242                    ("clean_time", clean_time.as_us(), i64),
243                    ("snapshot_time", snapshot_time.as_us(), i64),
244                    (
245                        "purge_old_snapshots_time",
246                        purge_old_snapshots_time.as_us(),
247                        i64
248                    ),
249                    ("total_us", total_time.as_us(), i64),
250                    ("non_snapshot_time_us", non_snapshot_time_us, i64),
251                );
252                Ok(snapshot_root_bank.block_height())
253            })
254    }
255
256    /// Check if a SnapshotError should be treated as 'fatal' by SnapshotRequestHandler, and
257    /// `handle_snapshot_requests()` in particular.  Fatal errors will cause the node to shutdown.
258    /// Non-fatal errors are logged and then swallowed.
259    ///
260    /// All `SnapshotError`s are enumerated, and there is **NO** default case.  This way, if
261    /// a new error is added to SnapshotError, a conscious decision must be made on how it should
262    /// be handled.
263    fn is_snapshot_error_fatal(err: &SnapshotError) -> bool {
264        match err {
265            SnapshotError::Io(..) => true,
266            SnapshotError::Serialize(..) => true,
267            SnapshotError::ArchiveGenerationFailure(..) => true,
268            SnapshotError::StoragePathSymlinkInvalid => true,
269            SnapshotError::UnpackError(..) => true,
270            SnapshotError::AccountsPackageSendError(..) => true,
271            SnapshotError::IoWithSource(..) => true,
272            SnapshotError::PathToFileNameError(..) => true,
273            SnapshotError::FileNameToStrError(..) => true,
274            SnapshotError::ParseSnapshotArchiveFileNameError(..) => true,
275            SnapshotError::MismatchedBaseSlot(..) => true,
276            SnapshotError::NoSnapshotArchives => true,
277            SnapshotError::MismatchedSlotHash(..) => true,
278        }
279    }
280}
281
282#[derive(Default)]
283pub struct AbsRequestSender {
284    snapshot_request_sender: Option<SnapshotRequestSender>,
285}
286
287impl AbsRequestSender {
288    pub fn new(snapshot_request_sender: Option<SnapshotRequestSender>) -> Self {
289        AbsRequestSender {
290            snapshot_request_sender,
291        }
292    }
293
294    pub fn is_snapshot_creation_enabled(&self) -> bool {
295        self.snapshot_request_sender.is_some()
296    }
297
298    pub fn send_snapshot_request(
299        &self,
300        snapshot_request: SnapshotRequest,
301    ) -> Result<(), SendError<SnapshotRequest>> {
302        if let Some(ref snapshot_request_sender) = self.snapshot_request_sender {
303            snapshot_request_sender.send(snapshot_request)
304        } else {
305            Ok(())
306        }
307    }
308}
309
310pub struct AbsRequestHandler {
311    pub snapshot_request_handler: Option<SnapshotRequestHandler>,
312    pub pruned_banks_receiver: DroppedSlotsReceiver,
313}
314
315impl AbsRequestHandler {
316    // Returns the latest requested snapshot block height, if one exists
317    pub fn handle_snapshot_requests(
318        &self,
319        accounts_db_caching_enabled: bool,
320        test_hash_calculation: bool,
321        use_index_hash_calculation: bool,
322        non_snapshot_time_us: u128,
323        last_full_snapshot_slot: &mut Option<Slot>,
324    ) -> Option<Result<u64, SnapshotError>> {
325        self.snapshot_request_handler
326            .as_ref()
327            .and_then(|snapshot_request_handler| {
328                snapshot_request_handler.handle_snapshot_requests(
329                    accounts_db_caching_enabled,
330                    test_hash_calculation,
331                    use_index_hash_calculation,
332                    non_snapshot_time_us,
333                    last_full_snapshot_slot,
334                )
335            })
336    }
337
338    /// `is_from_abs` is true if the caller is the AccountsBackgroundService
339    pub fn handle_pruned_banks(&self, bank: &Bank, is_from_abs: bool) -> usize {
340        let mut count = 0;
341        for (pruned_slot, pruned_bank_id) in self.pruned_banks_receiver.try_iter() {
342            count += 1;
343            bank.rc
344                .accounts
345                .purge_slot(pruned_slot, pruned_bank_id, is_from_abs);
346        }
347
348        count
349    }
350}
351
/// Owns the background thread spawned by `AccountsBackgroundService::new`.
pub struct AccountsBackgroundService {
    /// Join handle of the `gemachain-bg-accounts` thread.
    t_background: JoinHandle<()>,
}
355
356impl AccountsBackgroundService {
357    pub fn new(
358        bank_forks: Arc<RwLock<BankForks>>,
359        exit: &Arc<AtomicBool>,
360        request_handler: AbsRequestHandler,
361        accounts_db_caching_enabled: bool,
362        test_hash_calculation: bool,
363        use_index_hash_calculation: bool,
364        mut last_full_snapshot_slot: Option<Slot>,
365    ) -> Self {
366        info!("AccountsBackgroundService active");
367        let exit = exit.clone();
368        let mut consumed_budget = 0;
369        let mut last_cleaned_block_height = 0;
370        let mut removed_slots_count = 0;
371        let mut total_remove_slots_time = 0;
372        let mut last_expiration_check_time = Instant::now();
373        let t_background = Builder::new()
374            .name("gemachain-bg-accounts".to_string())
375            .spawn(move || {
376                let mut last_snapshot_end_time = None;
377                loop {
378                    if exit.load(Ordering::Relaxed) {
379                        break;
380                    }
381
382                    // Grab the current root bank
383                    let bank = bank_forks.read().unwrap().root_bank().clone();
384
385                    // Purge accounts of any dead slots
386                    Self::remove_dead_slots(
387                        &bank,
388                        &request_handler,
389                        &mut removed_slots_count,
390                        &mut total_remove_slots_time,
391                    );
392
393                    Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
394
395                    let non_snapshot_time = last_snapshot_end_time
396                        .map(|last_snapshot_end_time: Instant| {
397                            last_snapshot_end_time.elapsed().as_micros()
398                        })
399                        .unwrap_or_default();
400
401                    // Check to see if there were any requests for snapshotting banks
402                    // < the current root bank `bank` above.
403
404                    // Claim: Any snapshot request for slot `N` found here implies that the last cleanup
405                    // slot `M` satisfies `M < N`
406                    //
407                    // Proof: Assume for contradiction that we find a snapshot request for slot `N` here,
408                    // but cleanup has already happened on some slot `M >= N`. Because the call to
409                    // `bank.clean_accounts(true)` (in the code below) implies we only clean slots `<= bank - 1`,
410                    // then that means in some *previous* iteration of this loop, we must have gotten a root
411                    // bank for slot some slot `R` where `R > N`, but did not see the snapshot for `N` in the
412                    // snapshot request channel.
413                    //
414                    // However, this is impossible because BankForks.set_root() will always flush the snapshot
415                    // request for `N` to the snapshot request channel before setting a root `R > N`, and
416                    // snapshot_request_handler.handle_requests() will always look for the latest
417                    // available snapshot in the channel.
418                    let snapshot_block_height_option_result = request_handler
419                        .handle_snapshot_requests(
420                            accounts_db_caching_enabled,
421                            test_hash_calculation,
422                            use_index_hash_calculation,
423                            non_snapshot_time,
424                            &mut last_full_snapshot_slot,
425                        );
426                    if snapshot_block_height_option_result.is_some() {
427                        last_snapshot_end_time = Some(Instant::now());
428                    }
429
430                    if accounts_db_caching_enabled {
431                        // Note that the flush will do an internal clean of the
432                        // cache up to bank.slot(), so should be safe as long
433                        // as any later snapshots that are taken are of
434                        // slots >= bank.slot()
435                        bank.flush_accounts_cache_if_needed();
436                    }
437
438                    if let Some(snapshot_block_height_result) = snapshot_block_height_option_result
439                    {
440                        // Safe, see proof above
441                        if let Ok(snapshot_block_height) = snapshot_block_height_result {
442                            assert!(last_cleaned_block_height <= snapshot_block_height);
443                            last_cleaned_block_height = snapshot_block_height;
444                        } else {
445                            exit.store(true, Ordering::Relaxed);
446                            return;
447                        }
448                    } else {
449                        if accounts_db_caching_enabled {
450                            bank.shrink_candidate_slots();
451                        } else {
452                            // under sustained writes, shrink can lag behind so cap to
453                            // SHRUNKEN_ACCOUNT_PER_INTERVAL (which is based on INTERVAL_MS,
454                            // which in turn roughly associated block time)
455                            consumed_budget = bank
456                                .process_stale_slot_with_budget(
457                                    consumed_budget,
458                                    SHRUNKEN_ACCOUNT_PER_INTERVAL,
459                                )
460                                .min(SHRUNKEN_ACCOUNT_PER_INTERVAL);
461                        }
462                        if bank.block_height() - last_cleaned_block_height
463                            > (CLEAN_INTERVAL_BLOCKS + thread_rng().gen_range(0, 10))
464                        {
465                            if accounts_db_caching_enabled {
466                                // Note that the flush will do an internal clean of the
467                                // cache up to bank.slot(), so should be safe as long
468                                // as any later snapshots that are taken are of
469                                // slots >= bank.slot()
470                                bank.force_flush_accounts_cache();
471                            }
472                            bank.clean_accounts(true, false, last_full_snapshot_slot);
473                            last_cleaned_block_height = bank.block_height();
474                        }
475                    }
476                    sleep(Duration::from_millis(INTERVAL_MS));
477                }
478            })
479            .unwrap();
480        Self { t_background }
481    }
482
483    pub fn join(self) -> thread::Result<()> {
484        self.t_background.join()
485    }
486
487    fn remove_dead_slots(
488        bank: &Bank,
489        request_handler: &AbsRequestHandler,
490        removed_slots_count: &mut usize,
491        total_remove_slots_time: &mut u64,
492    ) {
493        let mut remove_slots_time = Measure::start("remove_slots_time");
494        *removed_slots_count += request_handler.handle_pruned_banks(bank, true);
495        remove_slots_time.stop();
496        *total_remove_slots_time += remove_slots_time.as_us();
497
498        if *removed_slots_count >= 100 {
499            datapoint_info!(
500                "remove_slots_timing",
501                ("remove_slots_time", *total_remove_slots_time, i64),
502                ("removed_slots_count", *removed_slots_count, i64),
503            );
504            *total_remove_slots_time = 0;
505            *removed_slots_count = 0;
506        }
507    }
508
509    fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
510        let now = Instant::now();
511        if now.duration_since(*last_expiration_check_time).as_secs()
512            > RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
513        {
514            bank.expire_old_recycle_stores();
515            *last_expiration_check_time = now;
516        }
517    }
518}
519
#[cfg(test)]
mod test {
    use super::*;
    use crate::genesis_utils::create_genesis_config;
    use crossbeam_channel::unbounded;
    use gemachain_sdk::{account::AccountSharedData, pubkey::Pubkey};

    /// A `(slot, bank id)` pair queued on the pruned-banks channel must make
    /// `remove_dead_slots` purge the accounts stored in that slot.
    #[test]
    fn test_accounts_background_service_remove_dead_slots() {
        let genesis = create_genesis_config(10);
        let bank0 = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
        let (pruned_tx, pruned_rx) = unbounded();
        let request_handler = AbsRequestHandler {
            snapshot_request_handler: None,
            pruned_banks_receiver: pruned_rx,
        };
        let slot0_is_empty = || bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty();

        // Store an account in slot 0
        let account_key = Pubkey::new_unique();
        let account = AccountSharedData::new(264, 0, &Pubkey::default());
        bank0.store_account(&account_key, &account);
        assert!(bank0.get_account(&account_key).is_some());

        // Queue slot 0 (bank id 0) as pruned; nothing is purged until the handler runs.
        pruned_tx.send((0, 0)).unwrap();
        assert!(!slot0_is_empty());

        let (mut removed_slots_count, mut total_remove_slots_time) = (0, 0);
        AccountsBackgroundService::remove_dead_slots(
            &bank0,
            &request_handler,
            &mut removed_slots_count,
            &mut total_remove_slots_time,
        );

        assert!(slot0_is_empty());
    }
}