1mod geyser_plugin_utils;
22pub mod stats;
23pub mod tests;
24
25#[cfg(test)]
26use crate::append_vec::StoredAccountMeta;
27#[cfg(feature = "dev-context-only-utils")]
28use qualifier_attr::qualifiers;
29use {
30 crate::{
31 account_info::{AccountInfo, Offset, StorageLocation},
32 account_storage::{
33 stored_account_info::{StoredAccountInfo, StoredAccountInfoWithoutData},
34 AccountStorage, AccountStorageStatus, AccountStoragesOrderer, ShrinkInProgress,
35 },
36 accounts_cache::{AccountsCache, CachedAccount, SlotCache},
37 accounts_db::stats::{
38 AccountsStats, CleanAccountsStats, FlushStats, ObsoleteAccountsStats, PurgeStats,
39 ShrinkAncientStats, ShrinkStats, ShrinkStatsSub, StoreAccountsTiming,
40 },
41 accounts_file::{AccountsFile, AccountsFileError, AccountsFileProvider, StorageAccess},
42 accounts_hash::{AccountLtHash, AccountsLtHash, ZERO_LAMPORT_ACCOUNT_LT_HASH},
43 accounts_index::{
44 in_mem_accounts_index::StartupStats, AccountSecondaryIndexes, AccountsIndex,
45 AccountsIndexConfig, AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue,
46 IndexKey, IndexValue, IsCached, RefCount, ScanConfig, ScanFilter, ScanResult, SlotList,
47 UpsertReclaim, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
48 },
49 accounts_index_storage::Startup,
50 accounts_update_notifier_interface::AccountsUpdateNotifier,
51 active_stats::{ActiveStatItem, ActiveStats},
52 ancestors::Ancestors,
53 append_vec::{self, aligned_stored_size, IndexInfo, IndexInfoInner, STORE_META_OVERHEAD},
54 buffered_reader::RequiredLenBufFileRead,
55 contains::Contains,
56 is_zero_lamport::IsZeroLamport,
57 partitioned_rewards::{
58 PartitionedEpochRewardsConfig, DEFAULT_PARTITIONED_EPOCH_REWARDS_CONFIG,
59 },
60 read_only_accounts_cache::ReadOnlyAccountsCache,
61 storable_accounts::{StorableAccounts, StorableAccountsBySlot},
62 u64_align, utils,
63 verify_accounts_hash_in_background::VerifyAccountsHashInBackground,
64 },
65 dashmap::{DashMap, DashSet},
66 log::*,
67 rand::{thread_rng, Rng},
68 rayon::{prelude::*, ThreadPool},
69 seqlock::SeqLock,
70 smallvec::SmallVec,
71 solana_account::{Account, AccountSharedData, ReadableAccount},
72 solana_clock::{BankId, Epoch, Slot},
73 solana_epoch_schedule::EpochSchedule,
74 solana_lattice_hash::lt_hash::LtHash,
75 solana_measure::{meas_dur, measure::Measure, measure_us},
76 solana_nohash_hasher::{BuildNoHashHasher, IntMap, IntSet},
77 solana_pubkey::Pubkey,
78 solana_rayon_threadlimit::get_thread_count,
79 solana_transaction::sanitized::SanitizedTransaction,
80 std::{
81 borrow::Cow,
82 boxed::Box,
83 collections::{BTreeSet, HashMap, HashSet, VecDeque},
84 io, iter, mem,
85 num::{NonZeroUsize, Saturating},
86 ops::RangeBounds,
87 path::{Path, PathBuf},
88 sync::{
89 atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
90 Arc, Condvar, Mutex, RwLock,
91 },
92 thread::{self, sleep},
93 time::{Duration, Instant},
94 },
95 tempfile::TempDir,
96};
97
/// Default cap, in bytes, for the write cache (15 GB).
// NOTE(review): presumably the fallback when `AccountsDbConfig::write_cache_limit_bytes` is
// `None`; the consumer is outside this view, so confirm there.
const WRITE_CACHE_LIMIT_BYTES_DEFAULT: u64 = 15_000_000_000;
// NOTE(review): by its name, the size at which scanning a slot switches to a parallel iterator;
// confirm at the use site.
const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000;

// NOTE(review): by its name, how many accounts are unref'd per batch; confirm at the use site.
const UNREF_ACCOUNTS_BATCH_SIZE: usize = 10_000;

/// Default storage file size: 4 MiB.
const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
/// Default number of directories.
// NOTE(review): presumably the number of account paths created by default; confirm.
const DEFAULT_NUM_DIRS: u32 = 4;

// NOTE(review): by its name, the chunk size used while collecting accounts for shrink; confirm
// at the use site.
const SHRINK_COLLECT_CHUNK_SIZE: usize = 50;

// NOTE(review): by its name, a threshold related to inserting ancient storages into the shrink
// candidates; the unit is not visible here, so confirm at the use site.
const SHRINK_INSERT_ANCIENT_THRESHOLD: usize = 10;
116
/// Selects what a storage scan provides about each account's data.
// NOTE(review): semantics are inferred from the variant names; the scan code is outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ScanAccountStorageData {
    /// Allowed to be dead code in non-test builds, which suggests it is only used by tests.
    #[cfg_attr(not(test), allow(dead_code))]
    NoData,
    DataRefForStorage,
}
127
/// A flat collection of alive accounts plus the running total of their stored sizes.
#[derive(Default, Debug)]
pub(crate) struct AliveAccounts<'a> {
    /// Slot supplied when the collection was created.
    pub(crate) slot: Slot,
    /// Borrowed alive accounts, in insertion order.
    pub(crate) accounts: Vec<&'a AccountFromStorage>,
    /// Saturating sum of `stored_size()` over everything pushed into `accounts`.
    pub(crate) bytes: usize,
}
137
/// Alive accounts split into three buckets, chosen by the account's index ref count and by
/// whether the slot list contains an entry newer than this collection's slot.
#[derive(Debug)]
pub(crate) struct ShrinkCollectAliveSeparatedByRefs<'a> {
    /// Accounts whose ref count is exactly 1.
    pub(crate) one_ref: AliveAccounts<'a>,
    /// Accounts with ref count != 1 whose slot list has no entry newer than this slot.
    pub(crate) many_refs_this_is_newest_alive: AliveAccounts<'a>,
    /// Accounts with ref count != 1 whose slot list has at least one entry newer than this slot.
    pub(crate) many_refs_old_alive: AliveAccounts<'a>,
}
148
/// Accumulator for alive accounts gathered from a slot.
///
/// Implementors must be `Sync + Send`.
pub(crate) trait ShrinkCollectRefs<'a>: Sync + Send {
    /// Creates an empty accumulator for `slot`; `capacity` is a pre-allocation hint.
    fn with_capacity(capacity: usize, slot: Slot) -> Self;
    /// Merges everything held by `other` into `self`.
    fn collect(&mut self, other: Self);
    /// Records one alive `account`, given its index `ref_count` and its `slot_list`.
    fn add(
        &mut self,
        ref_count: u64,
        account: &'a AccountFromStorage,
        slot_list: &[(Slot, AccountInfo)],
    );
    /// Number of accounts recorded so far.
    fn len(&self) -> usize;
    /// Total stored bytes of the accounts recorded so far.
    fn alive_bytes(&self) -> usize;
    /// The recorded accounts.
    ///
    /// Not every implementor supports this: the `ShrinkCollectAliveSeparatedByRefs` impl in this
    /// file panics with `unimplemented!`.
    fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage>;
}
162
163impl<'a> ShrinkCollectRefs<'a> for AliveAccounts<'a> {
164 fn collect(&mut self, mut other: Self) {
165 self.bytes = self.bytes.saturating_add(other.bytes);
166 self.accounts.append(&mut other.accounts);
167 }
168 fn with_capacity(capacity: usize, slot: Slot) -> Self {
169 Self {
170 accounts: Vec::with_capacity(capacity),
171 bytes: 0,
172 slot,
173 }
174 }
175 fn add(
176 &mut self,
177 _ref_count: u64,
178 account: &'a AccountFromStorage,
179 _slot_list: &[(Slot, AccountInfo)],
180 ) {
181 self.accounts.push(account);
182 self.bytes = self.bytes.saturating_add(account.stored_size());
183 }
184 fn len(&self) -> usize {
185 self.accounts.len()
186 }
187 fn alive_bytes(&self) -> usize {
188 self.bytes
189 }
190 fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage> {
191 &self.accounts
192 }
193}
194
195impl<'a> ShrinkCollectRefs<'a> for ShrinkCollectAliveSeparatedByRefs<'a> {
196 fn collect(&mut self, other: Self) {
197 self.one_ref.collect(other.one_ref);
198 self.many_refs_this_is_newest_alive
199 .collect(other.many_refs_this_is_newest_alive);
200 self.many_refs_old_alive.collect(other.many_refs_old_alive);
201 }
202 fn with_capacity(capacity: usize, slot: Slot) -> Self {
203 Self {
204 one_ref: AliveAccounts::with_capacity(capacity, slot),
205 many_refs_this_is_newest_alive: AliveAccounts::with_capacity(0, slot),
206 many_refs_old_alive: AliveAccounts::with_capacity(0, slot),
207 }
208 }
209 fn add(
210 &mut self,
211 ref_count: u64,
212 account: &'a AccountFromStorage,
213 slot_list: &[(Slot, AccountInfo)],
214 ) {
215 let other = if ref_count == 1 {
216 &mut self.one_ref
217 } else if slot_list.len() == 1
218 || !slot_list
219 .iter()
220 .any(|(slot_list_slot, _info)| slot_list_slot > &self.many_refs_old_alive.slot)
221 {
222 &mut self.many_refs_this_is_newest_alive
224 } else {
225 &mut self.many_refs_old_alive
228 };
229 other.add(ref_count, account, slot_list);
230 }
231 fn len(&self) -> usize {
232 self.one_ref
233 .len()
234 .saturating_add(self.many_refs_old_alive.len())
235 .saturating_add(self.many_refs_this_is_newest_alive.len())
236 }
237 fn alive_bytes(&self) -> usize {
238 self.one_ref
239 .alive_bytes()
240 .saturating_add(self.many_refs_old_alive.alive_bytes())
241 .saturating_add(self.many_refs_this_is_newest_alive.alive_bytes())
242 }
243 fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage> {
244 unimplemented!("illegal use");
245 }
246}
247
/// Selects how reclaims are treated when storing accounts.
// NOTE(review): variant semantics are inferred from the names (`Default` = normal handling,
// `Ignore` = skip reclaim handling); the store path is outside this view, so confirm there.
pub enum StoreReclaims {
    Default,
    Ignore,
}
254
/// Selects what a load returns for an account with zero lamports.
// NOTE(review): semantics are inferred from the variant names; the load path is outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LoadZeroLamports {
    None,
    /// Only available when the `dev-context-only-utils` feature is enabled.
    #[cfg(feature = "dev-context-only-utils")]
    SomeWithZeroLamportAccountForTests,
}
268
/// Everything gathered about one slot's storage in preparation for shrinking it.
// NOTE(review): field meanings below are inferred from names and types; the code that fills
// this struct is outside this view.
#[derive(Debug)]
pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
    pub(crate) slot: Slot,
    /// Presumably the capacity of the storage being shrunk; confirm.
    pub(crate) capacity: u64,
    pub(crate) pubkeys_to_unref: Vec<&'a Pubkey>,
    pub(crate) zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
    /// The alive accounts, held in whichever `ShrinkCollectRefs` accumulator `T` is.
    pub(crate) alive_accounts: T,
    pub(crate) alive_total_bytes: usize,
    pub(crate) total_starting_accounts: usize,
    pub(crate) all_are_zero_lamports: bool,
}
282
/// `AccountsDbConfig` preset for tests.
///
/// Uses the testing index config and `ScanFilter::OnlyAbnormalTest`; every optional setting is
/// left as `None` and both boolean verification/skip flags are off. Apart from the index config
/// and the scan filter it matches `ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS`.
pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
    index: Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
    account_indexes: None,
    base_working_path: None,
    shrink_paths: None,
    shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION,
    read_cache_limit_bytes: None,
    read_cache_evict_sample_size: None,
    write_cache_limit_bytes: None,
    ancient_append_vec_offset: None,
    ancient_storage_ideal_size: None,
    max_ancient_storages: None,
    skip_initial_hash_calc: false,
    exhaustively_verify_refcounts: false,
    partitioned_epoch_rewards_config: DEFAULT_PARTITIONED_EPOCH_REWARDS_CONFIG,
    storage_access: StorageAccess::File,
    scan_filter_for_shrinking: ScanFilter::OnlyAbnormalTest,
    mark_obsolete_accounts: false,
    num_background_threads: None,
    num_foreground_threads: None,
    num_hash_threads: None,
};
/// `AccountsDbConfig` preset for benchmarks.
///
/// Uses the benchmark index config and `ScanFilter::OnlyAbnormal`; every optional setting is
/// left as `None` and both boolean verification/skip flags are off. Apart from the index config
/// and the scan filter it matches `ACCOUNTS_DB_CONFIG_FOR_TESTING`.
pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
    index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
    account_indexes: None,
    base_working_path: None,
    shrink_paths: None,
    shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION,
    read_cache_limit_bytes: None,
    read_cache_evict_sample_size: None,
    write_cache_limit_bytes: None,
    ancient_append_vec_offset: None,
    ancient_storage_ideal_size: None,
    max_ancient_storages: None,
    skip_initial_hash_calc: false,
    exhaustively_verify_refcounts: false,
    partitioned_epoch_rewards_config: DEFAULT_PARTITIONED_EPOCH_REWARDS_CONFIG,
    storage_access: StorageAccess::File,
    scan_filter_for_shrinking: ScanFilter::OnlyAbnormal,
    mark_obsolete_accounts: false,
    num_background_threads: None,
    num_foreground_threads: None,
    num_hash_threads: None,
};
327
/// Intermediate result of consulting the accounts index for a batch of accounts during shrink.
// NOTE(review): field meanings are inferred from names; the producer is outside this view.
struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
    /// Accounts found to be alive, held in the `ShrinkCollectRefs` accumulator `T`.
    alive_accounts: T,
    pubkeys_to_unref: Vec<&'a Pubkey>,
    zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
    all_are_zero_lamports: bool,
}
339
/// Compact, `Copy` summary of an account as found in storage: its index info, data length,
/// and pubkey (no account data).
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct AccountFromStorage {
    /// Index entry for the account; also the source of its zero-lamport flag.
    pub index_info: AccountInfo,
    /// Length of the account's data, in bytes.
    pub data_len: u64,
    pub pubkey: Pubkey,
}
348
349impl IsZeroLamport for AccountFromStorage {
350 fn is_zero_lamport(&self) -> bool {
351 self.index_info.is_zero_lamport()
352 }
353}
354
355impl AccountFromStorage {
356 pub fn pubkey(&self) -> &Pubkey {
357 &self.pubkey
358 }
359 pub fn stored_size(&self) -> usize {
360 aligned_stored_size(self.data_len as usize)
361 }
362 pub fn data_len(&self) -> usize {
363 self.data_len as usize
364 }
365 #[cfg(test)]
366 pub fn new(account: &StoredAccountMeta) -> Self {
367 let storage_id = 0;
371 AccountFromStorage {
372 index_info: AccountInfo::new(
373 StorageLocation::AppendVec(storage_id, account.offset()),
374 account.is_zero_lamport(),
375 ),
376 pubkey: *account.pubkey(),
377 data_len: account.data_len() as u64,
378 }
379 }
380}
381
/// Result of extracting the unique accounts from a storage.
// NOTE(review): field meanings are inferred from names; the producer is outside this view.
pub struct GetUniqueAccountsResult {
    pub stored_accounts: Vec<AccountFromStorage>,
    /// Presumably the capacity of the storage that was scanned; confirm.
    pub capacity: u64,
    pub num_duplicated_accounts: usize,
}
387
/// Timings, in microseconds (per the `_us` suffix), for the phases of adding a root.
pub struct AccountsAddRootTiming {
    pub index_us: u64,
    pub cache_us: u64,
    pub store_us: u64,
}
393
// NOTE(review): presumably the default for `AccountsDbConfig::ancient_append_vec_offset`; the
// unit (likely slots) is not visible here, so confirm at the use site.
const ANCIENT_APPEND_VEC_DEFAULT_OFFSET: Option<i64> = Some(100_000);
// NOTE(review): presumably the default for `AccountsDbConfig::ancient_storage_ideal_size`; the
// unit is not visible here, so confirm at the use site.
const DEFAULT_ANCIENT_STORAGE_IDEAL_SIZE: u64 = 100_000;
// NOTE(review): presumably the default for `AccountsDbConfig::max_ancient_storages`; confirm.
pub const DEFAULT_MAX_ANCIENT_STORAGES: usize = 100_000;
419
/// User-facing configuration for `AccountsDb`.
///
/// Every `Option` field may be left as `None`, as the testing and benchmark presets in this
/// file do.
// NOTE(review): how each `None` is resolved to a default happens in the constructor, which is
// outside this view.
#[derive(Debug, Default, Clone)]
pub struct AccountsDbConfig {
    pub index: Option<AccountsIndexConfig>,
    pub account_indexes: Option<AccountSecondaryIndexes>,
    pub base_working_path: Option<PathBuf>,
    pub shrink_paths: Option<Vec<PathBuf>>,
    pub shrink_ratio: AccountShrinkThreshold,
    // NOTE(review): the tuple is presumably a (low, high) pair of byte limits; confirm.
    pub read_cache_limit_bytes: Option<(usize, usize)>,
    pub read_cache_evict_sample_size: Option<usize>,
    pub write_cache_limit_bytes: Option<u64>,
    pub ancient_append_vec_offset: Option<i64>,
    pub ancient_storage_ideal_size: Option<u64>,
    pub max_ancient_storages: Option<usize>,
    pub skip_initial_hash_calc: bool,
    pub exhaustively_verify_refcounts: bool,
    pub partitioned_epoch_rewards_config: PartitionedEpochRewardsConfig,
    pub storage_access: StorageAccess,
    pub scan_filter_for_shrinking: ScanFilter,
    pub mark_obsolete_accounts: bool,
    /// Thread count for background work; `None` leaves the choice to the consumer.
    pub num_background_threads: Option<NonZeroUsize>,
    /// Thread count for foreground work; `None` leaves the choice to the consumer.
    pub num_foreground_threads: Option<NonZeroUsize>,
    /// Thread count for hashing; `None` leaves the choice to the consumer.
    pub num_hash_threads: Option<NonZeroUsize>,
}
453
// Only compiled outside of test builds.
// NOTE(review): by its name, the number of consecutive failed retry iterations considered
// absurd; the loop that uses it is outside this view.
#[cfg(not(test))]
const ABSURD_CONSECUTIVE_FAILED_ITERATIONS: usize = 100;
456
/// Strategy for deciding when storages should be shrunk; both variants carry a `shrink_ratio`.
// NOTE(review): by the variant names, `TotalSpace` applies the ratio across all storages and
// `IndividualStore` applies it per storage; confirm in the shrink-selection code.
#[derive(Debug, Clone, Copy)]
pub enum AccountShrinkThreshold {
    TotalSpace { shrink_ratio: f64 },
    IndividualStore { shrink_ratio: f64 },
}
/// Whether shrinking optimizes for total space by default.
pub const DEFAULT_ACCOUNTS_SHRINK_OPTIMIZE_TOTAL_SPACE: bool = true;
/// Default shrink ratio.
pub const DEFAULT_ACCOUNTS_SHRINK_RATIO: f64 = 0.80;
/// Default shrink threshold: `TotalSpace` with `DEFAULT_ACCOUNTS_SHRINK_RATIO`. This is also
/// what `AccountShrinkThreshold::default()` returns.
const DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION: AccountShrinkThreshold =
    AccountShrinkThreshold::TotalSpace {
        shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_RATIO,
    };
475
476impl Default for AccountShrinkThreshold {
477 fn default() -> AccountShrinkThreshold {
478 DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION
479 }
480}
481
/// Result of scanning a slot, which is either a list of per-account results (`Cached`) or a
/// single aggregate value (`Stored`).
// NOTE(review): by the variant names, `Cached` is produced when the slot was served from the
// accounts cache and `Stored` when it was read from storage; confirm in the scan code.
pub enum ScanStorageResult<R, B> {
    Cached(Vec<R>),
    Stored(B),
}
486
/// Summary returned from index generation.
// NOTE(review): field meanings are inferred from names; the producer is outside this view.
#[derive(Debug, Default)]
pub struct IndexGenerationInfo {
    pub accounts_data_len: u64,
    /// Boxed lattice hash for duplicate accounts, when one was computed.
    pub duplicates_lt_hash: Option<Box<DuplicatesLtHash>>,
}
494
/// Per-slot statistics gathered while generating the index.
// NOTE(review): field meanings are inferred from names; the producer is outside this view.
#[derive(Debug, Default)]
struct SlotIndexGenerationInfo {
    insert_time_us: u64,
    num_accounts: u64,
    accounts_data_len: u64,
    zero_lamport_pubkeys: Vec<Pubkey>,
    all_accounts_are_zero_lamports: bool,
    num_did_not_exist: u64,
    num_existed_in_mem: u64,
    num_existed_on_disk: u64,
}
509
/// Newtype around an `LtHash`; its `Default` is the identity hash.
// NOTE(review): by its name, this holds the lattice hash of duplicate accounts found during
// index generation; confirm at the use site.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DuplicatesLtHash(pub LtHash);
518
519impl Default for DuplicatesLtHash {
520 fn default() -> Self {
521 Self(LtHash::identity())
522 }
523}
524
/// Counters and timings collected while generating the index; `report` submits them as the
/// `generate_index` datapoint.
#[derive(Default, Debug)]
struct GenerateIndexTimings {
    /// Reported as `overall_us`.
    pub total_time_us: u64,
    /// Reported as `total_us`.
    pub index_time: u64,
    /// Reported as `scan_stores_us`.
    pub scan_time: u64,
    pub insertion_time_us: u64,
    pub storage_size_storages_us: u64,
    pub index_flush_us: u64,
    /// Reported as `total_items_including_duplicates`.
    pub total_including_duplicates: u64,
    pub accounts_data_len_dedup_time_us: u64,
    pub total_duplicate_slot_keys: u64,
    pub total_num_unique_duplicate_keys: u64,
    pub num_duplicate_accounts: u64,
    pub populate_duplicate_keys_us: u64,
    pub total_slots: u64,
    /// Atomic, unlike the other fields, so it can be updated through a shared reference.
    pub par_duplicates_lt_hash_us: AtomicU64,
    pub visit_zero_lamports_us: u64,
    pub num_zero_lamport_single_refs: u64,
    pub all_accounts_are_zero_lamports_slots: u64,
    pub mark_obsolete_accounts_us: u64,
    pub num_obsolete_accounts_marked: u64,
    pub num_slots_removed_as_obsolete: u64,
}
548
/// A stored size and an account count, kept per storage.
// NOTE(review): presumably `stored_size` is in bytes and `count` is a number of accounts;
// confirm at the use site.
#[derive(Default, Debug, PartialEq, Eq)]
struct StorageSizeAndCount {
    pub stored_size: usize,
    pub count: usize,
}
/// Concurrent map from storage file id to its `StorageSizeAndCount`, using the no-hash hasher.
type StorageSizeAndCountMap =
    DashMap<AccountsFileId, StorageSizeAndCount, BuildNoHashHasher<AccountsFileId>>;
558
impl GenerateIndexTimings {
    /// Submits all timings and counters as a single `generate_index` datapoint.
    ///
    /// `copy_data_us` is taken from `startup_stats` with `swap(0, ..)`, so reporting also
    /// resets that counter to zero. Note that the reported key names do not always match the
    /// field names (`overall_us` is `total_time_us`, `total_us` is `index_time`,
    /// `scan_stores_us` is `scan_time`).
    pub fn report(&self, startup_stats: &StartupStats) {
        datapoint_info!(
            "generate_index",
            ("overall_us", self.total_time_us, i64),
            ("total_us", self.index_time, i64),
            ("scan_stores_us", self.scan_time, i64),
            ("insertion_time_us", self.insertion_time_us, i64),
            (
                "storage_size_storages_us",
                self.storage_size_storages_us,
                i64
            ),
            ("index_flush_us", self.index_flush_us, i64),
            (
                "total_items_including_duplicates",
                self.total_including_duplicates,
                i64
            ),
            (
                "accounts_data_len_dedup_time_us",
                self.accounts_data_len_dedup_time_us,
                i64
            ),
            (
                "total_duplicate_slot_keys",
                self.total_duplicate_slot_keys,
                i64
            ),
            (
                "total_num_unique_duplicate_keys",
                self.total_num_unique_duplicate_keys,
                i64
            ),
            ("num_duplicate_accounts", self.num_duplicate_accounts, i64),
            (
                "populate_duplicate_keys_us",
                self.populate_duplicate_keys_us,
                i64
            ),
            ("total_slots", self.total_slots, i64),
            (
                "copy_data_us",
                // Read-and-reset: the startup counter is zeroed as it is reported.
                startup_stats.copy_data_us.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "par_duplicates_lt_hash_us",
                self.par_duplicates_lt_hash_us.load(Ordering::Relaxed),
                i64
            ),
            (
                "num_zero_lamport_single_refs",
                self.num_zero_lamport_single_refs,
                i64
            ),
            ("visit_zero_lamports_us", self.visit_zero_lamports_us, i64),
            (
                "all_accounts_are_zero_lamports_slots",
                self.all_accounts_are_zero_lamports_slots,
                i64
            ),
            (
                "mark_obsolete_accounts_us",
                self.mark_obsolete_accounts_us,
                i64
            ),
            (
                "num_obsolete_accounts_marked",
                self.num_obsolete_accounts_marked,
                i64
            ),
            (
                "num_slots_removed_as_obsolete",
                self.num_slots_removed_as_obsolete,
                i64
            ),
        );
    }
}
640
// Marker impls: `AccountInfo` is used both as the in-memory and as the on-disk index value
// (see `AccountsIndex<AccountInfo, AccountInfo>` in this file).
impl IndexValue for AccountInfo {}
impl DiskIndexValue for AccountInfo {}
643
644impl IsZeroLamport for AccountSharedData {
645 fn is_zero_lamport(&self) -> bool {
646 self.lamports() == 0
647 }
648}
649
650impl IsZeroLamport for Account {
651 fn is_zero_lamport(&self) -> bool {
652 self.lamports() == 0
653 }
654}
655
/// Per-thread progress reporter that feeds a counter shared by all threads and lets one
/// thread log the overall progress.
struct MultiThreadProgress<'a> {
    /// When this thread last passed the reporting delay.
    last_update: Instant,
    /// This thread's own count as of its last report.
    my_last_report_count: u64,
    /// Counter shared across threads; each report adds this thread's delta to it.
    total_count: &'a AtomicU64,
    /// Minimum whole seconds between reports from this thread.
    report_delay_secs: u64,
    /// Set once this thread has observed the shared counter at zero; only such a thread logs.
    first_caller: bool,
    /// The total that is logged as the denominator of the progress message.
    ultimate_count: u64,
    /// When this reporter was created; used to compute the logged rate.
    start_time: Instant,
}
665
666impl<'a> MultiThreadProgress<'a> {
667 fn new(total_count: &'a AtomicU64, report_delay_secs: u64, ultimate_count: u64) -> Self {
668 Self {
669 last_update: Instant::now(),
670 my_last_report_count: 0,
671 total_count,
672 report_delay_secs,
673 first_caller: false,
674 ultimate_count,
675 start_time: Instant::now(),
676 }
677 }
678 fn report(&mut self, my_current_count: u64) {
679 let now = Instant::now();
680 if now.duration_since(self.last_update).as_secs() >= self.report_delay_secs {
681 let my_total_newly_processed_slots_since_last_report =
682 my_current_count - self.my_last_report_count;
683
684 self.my_last_report_count = my_current_count;
685 let previous_total_processed_slots_across_all_threads = self.total_count.fetch_add(
686 my_total_newly_processed_slots_since_last_report,
687 Ordering::Relaxed,
688 );
689 self.first_caller =
690 self.first_caller || 0 == previous_total_processed_slots_across_all_threads;
691 if self.first_caller {
692 let total = previous_total_processed_slots_across_all_threads
693 + my_total_newly_processed_slots_since_last_report;
694 info!(
695 "generating index: {}/{} slots... ({}/s)",
696 total,
697 self.ultimate_count,
698 total / self.start_time.elapsed().as_secs().max(1),
699 );
700 }
701 self.last_update = now;
702 }
703 }
704}
705
/// Atomic counterpart of `AccountsFileId`.
pub type AtomicAccountsFileId = AtomicU32;
/// Identifier of an accounts file.
pub type AccountsFileId = u32;

/// For each pubkey, a set of slots.
type AccountSlots = HashMap<Pubkey, IntSet<Slot>>;
/// For each slot, a set of offsets.
type SlotOffsets = IntMap<Slot, IntSet<Offset>>;
/// Pair of `AccountSlots` and `SlotOffsets`.
// NOTE(review): by its name, this is what handling reclaims produces; confirm at the use site.
type ReclaimResult = (AccountSlots, SlotOffsets);
type PubkeysRemovedFromAccountsIndex = HashSet<Pubkey>;
/// Set of slots that are candidates for shrinking.
type ShrinkCandidates = IntSet<Slot>;
715
/// Hint a caller passes when loading an account.
// NOTE(review): by the variant names, `FixedMaxRoot` asserts the max root will not change
// during the load, `FixedMaxRootDoNotPopulateReadCache` additionally skips populating the read
// cache, and `Unspecified` makes no assumption; confirm in the load path.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadHint {
    FixedMaxRoot,
    FixedMaxRootDoNotPopulateReadCache,
    Unspecified,
}
737
/// Handle for reading an account either from a storage entry or from the accounts cache.
///
/// In both variants `None` means the source was not available when the accessor was built;
/// `check_and_get_loaded_account` panics if it meets a `None`.
#[derive(Debug)]
pub enum LoadedAccountAccessor<'a> {
    /// The storage entry holding the account and the offset passed to the storage's read calls.
    Stored(Option<(Arc<AccountStorageEntry>, usize)>),
    /// The cached account; `get_loaded_account` takes it out, leaving `None` behind.
    Cached(Option<Cow<'a, Arc<CachedAccount>>>),
}
746
747impl LoadedAccountAccessor<'_> {
748 fn check_and_get_loaded_account_shared_data(&mut self) -> AccountSharedData {
749 match self {
753 LoadedAccountAccessor::Stored(Some((maybe_storage_entry, offset))) => {
754 maybe_storage_entry
760 .accounts
761 .get_account_shared_data(*offset)
762 .expect(
763 "If a storage entry was found in the storage map, it must not have been \
764 reset yet",
765 )
766 }
767 _ => self.check_and_get_loaded_account(|loaded_account| loaded_account.take_account()),
768 }
769 }
770
771 fn check_and_get_loaded_account<T>(
772 &mut self,
773 callback: impl for<'local> FnMut(LoadedAccount<'local>) -> T,
774 ) -> T {
775 match self {
779 LoadedAccountAccessor::Cached(None) | LoadedAccountAccessor::Stored(None) => {
780 panic!(
781 "Should have already been taken care of when creating this \
782 LoadedAccountAccessor"
783 );
784 }
785 LoadedAccountAccessor::Cached(Some(_cached_account)) => {
786 self.get_loaded_account(callback).unwrap()
789 }
790 LoadedAccountAccessor::Stored(Some(_maybe_storage_entry)) => {
791 self.get_loaded_account(callback).expect(
797 "If a storage entry was found in the storage map, it must not have been reset \
798 yet",
799 )
800 }
801 }
802 }
803
804 fn get_loaded_account<T>(
805 &mut self,
806 mut callback: impl for<'local> FnMut(LoadedAccount<'local>) -> T,
807 ) -> Option<T> {
808 match self {
809 LoadedAccountAccessor::Cached(cached_account) => {
810 let cached_account = cached_account.take().expect(
811 "Cache flushed/purged should be handled before trying to fetch account",
812 );
813 Some(callback(LoadedAccount::Cached(cached_account)))
814 }
815 LoadedAccountAccessor::Stored(maybe_storage_entry) => {
816 maybe_storage_entry
820 .as_ref()
821 .and_then(|(storage_entry, offset)| {
822 storage_entry
823 .accounts
824 .get_stored_account_callback(*offset, |account| {
825 callback(LoadedAccount::Stored(account))
826 })
827 })
828 }
829 }
830 }
831}
832
/// An account that has been loaded, either read from storage or taken from the accounts cache.
pub enum LoadedAccount<'a> {
    /// Account read from a storage file.
    Stored(StoredAccountInfo<'a>),
    /// Account held by the accounts cache (borrowed or owned `Arc`).
    Cached(Cow<'a, Arc<CachedAccount>>),
}
837
838impl LoadedAccount<'_> {
839 pub fn pubkey(&self) -> &Pubkey {
840 match self {
841 LoadedAccount::Stored(stored_account) => stored_account.pubkey(),
842 LoadedAccount::Cached(cached_account) => cached_account.pubkey(),
843 }
844 }
845
846 pub fn take_account(&self) -> AccountSharedData {
847 match self {
848 LoadedAccount::Stored(stored_account) => stored_account.to_account_shared_data(),
849 LoadedAccount::Cached(cached_account) => match cached_account {
850 Cow::Owned(cached_account) => cached_account.account.clone(),
851 Cow::Borrowed(cached_account) => cached_account.account.clone(),
852 },
853 }
854 }
855
856 pub fn is_cached(&self) -> bool {
857 match self {
858 LoadedAccount::Stored(_) => false,
859 LoadedAccount::Cached(_) => true,
860 }
861 }
862
863 pub fn data_len(&self) -> usize {
865 self.data().len()
866 }
867}
868
869impl ReadableAccount for LoadedAccount<'_> {
870 fn lamports(&self) -> u64 {
871 match self {
872 LoadedAccount::Stored(stored_account) => stored_account.lamports(),
873 LoadedAccount::Cached(cached_account) => cached_account.account.lamports(),
874 }
875 }
876 fn data(&self) -> &[u8] {
877 match self {
878 LoadedAccount::Stored(stored_account) => stored_account.data(),
879 LoadedAccount::Cached(cached_account) => cached_account.account.data(),
880 }
881 }
882 fn owner(&self) -> &Pubkey {
883 match self {
884 LoadedAccount::Stored(stored_account) => stored_account.owner(),
885 LoadedAccount::Cached(cached_account) => cached_account.account.owner(),
886 }
887 }
888 fn executable(&self) -> bool {
889 match self {
890 LoadedAccount::Stored(stored_account) => stored_account.executable(),
891 LoadedAccount::Cached(cached_account) => cached_account.account.executable(),
892 }
893 }
894 fn rent_epoch(&self) -> Epoch {
895 match self {
896 LoadedAccount::Stored(stored_account) => stored_account.rent_epoch(),
897 LoadedAccount::Cached(cached_account) => cached_account.account.rent_epoch(),
898 }
899 }
900 fn to_account_shared_data(&self) -> AccountSharedData {
901 self.take_account()
902 }
903}
904
/// Timings (microseconds, per the `_us` suffix) and counts gathered while collecting keys.
// NOTE(review): by its name this belongs to the clean path; field meanings are inferred from
// names because the producer is outside this view.
#[derive(Default)]
struct CleanKeyTimings {
    collect_delta_keys_us: u64,
    delta_insert_us: u64,
    dirty_store_processing_us: u64,
    delta_key_count: u64,
    dirty_pubkeys_count: u64,
    oldest_dirty_slot: Slot,
    dirty_ancient_stores: usize,
}
916
/// One accounts file together with the bookkeeping kept about its contents.
#[derive(Debug)]
pub struct AccountStorageEntry {
    /// Id of this storage's file.
    pub(crate) id: AccountsFileId,

    /// Slot this storage belongs to.
    pub(crate) slot: Slot,

    /// The underlying accounts file.
    pub accounts: AccountsFile,

    /// Account count and status, updated together under the seqlock's write lock.
    count_and_status: SeqLock<(usize, AccountStorageStatus)>,

    /// Byte total maintained by `add_accounts` / `remove_accounts`.
    alive_bytes: AtomicUsize,

    /// Offsets recorded through `insert_zero_lamport_single_ref_account_offset`.
    zero_lamport_single_ref_offsets: RwLock<IntSet<Offset>>,

    /// `(offset, data_len, slot)` for each account marked obsolete, where `slot` is the slot
    /// passed to `mark_accounts_obsolete`.
    obsolete_accounts: RwLock<Vec<(Offset, usize, Slot)>>,
}
958
959impl AccountStorageEntry {
960 pub fn new(
961 path: &Path,
962 slot: Slot,
963 id: AccountsFileId,
964 file_size: u64,
965 provider: AccountsFileProvider,
966 ) -> Self {
967 let tail = AccountsFile::file_name(slot, id);
968 let path = Path::new(path).join(tail);
969 let accounts = provider.new_writable(path, file_size);
970
971 Self {
972 id,
973 slot,
974 accounts,
975 count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
976 alive_bytes: AtomicUsize::new(0),
977 zero_lamport_single_ref_offsets: RwLock::default(),
978 obsolete_accounts: RwLock::default(),
979 }
980 }
981
982 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
984 fn reopen_as_readonly(&self, storage_access: StorageAccess) -> Option<Self> {
985 if storage_access != StorageAccess::File {
986 return None;
988 }
989
990 let count_and_status = self.count_and_status.lock_write();
991 self.accounts.reopen_as_readonly().map(|accounts| Self {
992 id: self.id,
993 slot: self.slot,
994 count_and_status: SeqLock::new(*count_and_status),
995 alive_bytes: AtomicUsize::new(self.alive_bytes()),
996 accounts,
997 zero_lamport_single_ref_offsets: RwLock::new(
998 self.zero_lamport_single_ref_offsets.read().unwrap().clone(),
999 ),
1000 obsolete_accounts: RwLock::new(self.obsolete_accounts.read().unwrap().clone()),
1001 })
1002 }
1003
1004 pub fn new_existing(slot: Slot, id: AccountsFileId, accounts: AccountsFile) -> Self {
1005 Self {
1006 id,
1007 slot,
1008 accounts,
1009 count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
1010 alive_bytes: AtomicUsize::new(0),
1011 zero_lamport_single_ref_offsets: RwLock::default(),
1012 obsolete_accounts: RwLock::default(),
1013 }
1014 }
1015
1016 pub fn set_status(&self, mut status: AccountStorageStatus) {
1017 let mut count_and_status = self.count_and_status.lock_write();
1018
1019 let count = count_and_status.0;
1020
1021 if status == AccountStorageStatus::Full && count == 0 {
1022 self.accounts.reset();
1031 status = AccountStorageStatus::Available;
1032 }
1033
1034 *count_and_status = (count, status);
1035 }
1036
1037 pub fn status(&self) -> AccountStorageStatus {
1038 self.count_and_status.read().1
1039 }
1040
1041 pub fn count(&self) -> usize {
1042 self.count_and_status.read().0
1043 }
1044
1045 pub fn alive_bytes(&self) -> usize {
1046 self.alive_bytes.load(Ordering::Acquire)
1047 }
1048
1049 pub fn mark_accounts_obsolete(
1051 &self,
1052 newly_obsolete_accounts: impl ExactSizeIterator<Item = (Offset, usize)>,
1053 slot: Slot,
1054 ) {
1055 let mut obsolete_accounts_list = self.obsolete_accounts.write().unwrap();
1056 obsolete_accounts_list.reserve(newly_obsolete_accounts.len());
1057
1058 for (offset, data_len) in newly_obsolete_accounts {
1059 obsolete_accounts_list.push((offset, data_len, slot));
1060 }
1061 }
1062
1063 pub fn get_obsolete_accounts(&self, slot: Option<Slot>) -> Vec<(Offset, usize)> {
1067 self.obsolete_accounts
1068 .read()
1069 .unwrap()
1070 .iter()
1071 .filter(|(_, _, obsolete_slot)| slot.is_none_or(|s| *obsolete_slot <= s))
1072 .map(|(offset, data_len, _)| (*offset, *data_len))
1073 .collect()
1074 }
1075
1076 pub fn get_obsolete_bytes(&self, slot: Option<Slot>) -> usize {
1080 let obsolete_accounts = self.obsolete_accounts.read().unwrap();
1081 let obsolete_bytes = obsolete_accounts
1082 .iter()
1083 .filter(|(_, _, obsolete_slot)| slot.is_none_or(|s| *obsolete_slot <= s))
1084 .map(|(offset, data_len, _)| {
1085 self.accounts
1086 .calculate_stored_size(*data_len)
1087 .min(self.accounts.len() - offset)
1088 })
1089 .sum();
1090 obsolete_bytes
1091 }
1092
1093 fn insert_zero_lamport_single_ref_account_offset(&self, offset: usize) -> bool {
1096 let mut zero_lamport_single_ref_offsets =
1097 self.zero_lamport_single_ref_offsets.write().unwrap();
1098 zero_lamport_single_ref_offsets.insert(offset)
1099 }
1100
1101 fn num_zero_lamport_single_ref_accounts(&self) -> usize {
1103 self.zero_lamport_single_ref_offsets.read().unwrap().len()
1104 }
1105
1106 fn alive_bytes_exclude_zero_lamport_single_ref_accounts(&self) -> usize {
1108 let zero_lamport_dead_bytes = self
1109 .accounts
1110 .dead_bytes_due_to_zero_lamport_single_ref(self.num_zero_lamport_single_ref_accounts());
1111 self.alive_bytes().saturating_sub(zero_lamport_dead_bytes)
1112 }
1113
1114 pub fn written_bytes(&self) -> u64 {
1115 self.accounts.len() as u64
1116 }
1117
1118 pub fn capacity(&self) -> u64 {
1119 self.accounts.capacity()
1120 }
1121
1122 pub fn has_accounts(&self) -> bool {
1123 self.count() > 0
1124 }
1125
1126 pub fn slot(&self) -> Slot {
1127 self.slot
1128 }
1129
1130 pub fn id(&self) -> AccountsFileId {
1131 self.id
1132 }
1133
1134 pub fn flush(&self) -> Result<(), AccountsFileError> {
1135 self.accounts.flush()
1136 }
1137
1138 fn add_accounts(&self, num_accounts: usize, num_bytes: usize) {
1139 let mut count_and_status = self.count_and_status.lock_write();
1140 *count_and_status = (count_and_status.0 + num_accounts, count_and_status.1);
1141 self.alive_bytes.fetch_add(num_bytes, Ordering::Release);
1142 }
1143
1144 fn remove_accounts(&self, num_bytes: usize, num_accounts: usize) -> usize {
1146 let mut count_and_status = self.count_and_status.lock_write();
1147 let (mut count, mut status) = *count_and_status;
1148
1149 if count == num_accounts && status == AccountStorageStatus::Full {
1150 self.accounts.reset();
1162 status = AccountStorageStatus::Available;
1163 }
1164
1165 assert!(
1168 count >= num_accounts,
1169 "double remove of account in slot: {}/store: {}!!",
1170 self.slot(),
1171 self.id(),
1172 );
1173
1174 self.alive_bytes.fetch_sub(num_bytes, Ordering::Release);
1175 count = count.saturating_sub(num_accounts);
1176 *count_and_status = (count, status);
1177 count
1178 }
1179
1180 pub fn path(&self) -> &Path {
1182 self.accounts.path()
1183 }
1184}
1185
1186pub fn get_temp_accounts_paths(count: u32) -> io::Result<(Vec<TempDir>, Vec<PathBuf>)> {
1187 let temp_dirs: io::Result<Vec<TempDir>> = (0..count).map(|_| TempDir::new()).collect();
1188 let temp_dirs = temp_dirs?;
1189
1190 let paths: io::Result<Vec<_>> = temp_dirs
1191 .iter()
1192 .map(|temp_dir| {
1193 utils::create_accounts_run_and_snapshot_dirs(temp_dir)
1194 .map(|(run_dir, _snapshot_dir)| run_dir)
1195 })
1196 .collect();
1197 let paths = paths?;
1198 Ok((temp_dirs, paths))
1199}
1200
/// Per-pubkey information kept for a cleaning candidate.
// NOTE(review): field meanings are inferred from names; the clean code is outside this view.
#[derive(Default, Debug)]
struct CleaningInfo {
    slot_list: SlotList<AccountInfo>,
    ref_count: u64,
    might_contain_zero_lamport_entry: bool,
}
1210
/// Cleaning candidates as a boxed slice of independently locked pubkey-to-`CleaningInfo` maps,
/// paired with an optional slot.
// NOTE(review): the meaning of the `Option<Slot>` is not visible here; confirm at the use site.
type CleaningCandidates = (Box<[RwLock<HashMap<Pubkey, CleaningInfo>>]>, Option<Slot>);
1217
/// A mutex-protected set of slots paired with a condition variable.
// NOTE(review): by its name, this coordinates removal of unrooted slots with other work on the
// same slots; the wait/notify protocol is outside this view.
#[derive(Debug, Default)]
struct RemoveUnrootedSlotsSynchronization {
    slots_under_contention: Mutex<IntSet<Slot>>,
    signal: Condvar,
}
1227
/// The accounts index with `AccountInfo` as both of its value type parameters.
type AccountInfoAccountsIndex = AccountsIndex<AccountInfo, AccountInfo>;
1229
/// The accounts database, holding the accounts index, account storages, write and read caches,
/// thread pools, statistics, and configuration.
// NOTE(review): per-field notes below are inferred from names and types; the code that uses
// these fields is outside this view.
#[derive(Debug)]
pub struct AccountsDb {
    /// Index over the accounts.
    pub accounts_index: AccountInfoAccountsIndex,

    pub ancient_append_vec_offset: Option<i64>,
    pub ancient_storage_ideal_size: u64,
    pub max_ancient_storages: usize,
    pub skip_initial_hash_calc: bool,

    /// The account storages.
    pub storage: AccountStorage,

    /// Write cache.
    pub accounts_cache: AccountsCache,

    write_cache_limit_bytes: Option<u64>,

    read_only_accounts_cache: ReadOnlyAccountsCache,

    /// Atomic counter of accounts file ids.
    // NOTE(review): presumably the source of the next id handed out; confirm.
    pub next_id: AtomicAccountsFileId,

    pub shrink_candidate_slots: Mutex<ShrinkCandidates>,

    pub write_version: AtomicU64,

    pub paths: Vec<PathBuf>,

    base_working_path: PathBuf,
    // NOTE(review): presumably held only so the temp dir is kept alive while the db exists
    // (hence `dead_code`); confirm.
    #[allow(dead_code)]
    base_working_temp_dir: Option<TempDir>,

    shrink_paths: Vec<PathBuf>,

    // NOTE(review): presumably held only so the temp dirs are kept alive while the db exists
    // (hence `dead_code`); confirm.
    #[allow(dead_code)]
    pub temp_paths: Option<Vec<TempDir>>,

    file_size: u64,

    pub thread_pool_foreground: ThreadPool,
    pub thread_pool_background: ThreadPool,
    pub num_hash_threads: Option<NonZeroUsize>,

    pub stats: AccountsStats,

    clean_accounts_stats: CleanAccountsStats,

    external_purge_slots_stats: PurgeStats,

    pub shrink_stats: ShrinkStats,

    pub(crate) shrink_ancient_stats: ShrinkAncientStats,

    pub account_indexes: AccountSecondaryIndexes,

    /// Pubkeys per slot.
    // NOTE(review): presumably those that still need to be considered by clean; confirm.
    uncleaned_pubkeys: DashMap<Slot, Vec<Pubkey>, BuildNoHashHasher<Slot>>,

    /// Test-only field.
    #[cfg(test)]
    load_delay: u64,

    /// Test-only field.
    #[cfg(test)]
    load_limit: AtomicU64,

    is_bank_drop_callback_enabled: AtomicBool,

    remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization,

    shrink_ratio: AccountShrinkThreshold,

    /// Storage entries keyed by slot.
    dirty_stores: DashMap<Slot, Arc<AccountStorageEntry>, BuildNoHashHasher<Slot>>,

    zero_lamport_accounts_to_purge_after_full_snapshot: DashSet<(Slot, Pubkey)>,

    accounts_update_notifier: Option<AccountsUpdateNotifier>,

    pub(crate) active_stats: ActiveStats,

    pub verify_accounts_hash_in_bg: VerifyAccountsHashInBackground,

    pub log_dead_slots: AtomicBool,

    exhaustively_verify_refcounts: bool,

    accounts_file_provider: AccountsFileProvider,

    storage_access: StorageAccess,

    scan_filter_for_shrinking: ScanFilter,

    pub partitioned_epoch_rewards_config: PartitionedEpochRewardsConfig,

    latest_full_snapshot_slot: SeqLock<Option<Slot>>,

    /// Queue of `(Slot, u64)` pairs.
    // NOTE(review): the meaning of the `u64` is not visible here; confirm at the use site.
    pub(crate) best_ancient_slots_to_shrink: RwLock<VecDeque<(Slot, u64)>>,

    pub mark_obsolete_accounts: bool,
}
1371
1372pub fn quarter_thread_count() -> usize {
1373 std::cmp::max(2, num_cpus::get() / 4)
1374}
1375
1376pub fn default_num_hash_threads() -> NonZeroUsize {
1378 let num_threads = (num_cpus::get() / 8).clamp(2, 6);
1380 NonZeroUsize::new(num_threads).unwrap()
1381}
/// Returns the default size of the foreground thread pool, which is whatever
/// `get_thread_count()` reports.
pub fn default_num_foreground_threads() -> usize {
    get_thread_count()
}
1385
#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample for AccountsDb {
    /// Builds a small test `AccountsDb` holding a single one-lamport account with five bytes
    /// of data in slot 0, then roots slot 0 and flushes the write cache.
    fn example() -> Self {
        let db = AccountsDb::new_single_for_tests();
        let pubkey = Pubkey::default();
        let data_len = 5;
        let slot: Slot = 0;
        let account = AccountSharedData::new(1, data_len, &pubkey);
        db.store_for_tests((slot, [(&pubkey, &account)].as_slice()));
        db.add_root_and_flush_write_cache(slot);
        db
    }
}
1399
1400impl AccountsDb {
    /// 400 MiB. First element of the default read-only cache size pair, used by
    /// `new_with_config` when `read_cache_limit_bytes` is not configured.
    /// NOTE(review): by name, the low bound of the cache size -- confirm in the cache.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_LO: usize = 400 * 1024 * 1024;
    /// 410 MiB. Second element of the default read-only cache size pair.
    /// NOTE(review): by name, the high bound of the cache size -- confirm in the cache.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_HI: usize = 410 * 1024 * 1024;

    /// Default evict sample size handed to the read-only cache when
    /// `read_cache_evict_sample_size` is not configured.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_READ_ONLY_CACHE_EVICT_SAMPLE_SIZE: usize = 8;
1411
    /// Test helper: same as [`Self::new_single_for_tests`].
    pub fn default_for_tests() -> Self {
        Self::new_single_for_tests()
    }
1415
1416 pub fn new_single_for_tests() -> Self {
1417 AccountsDb::new_for_tests(Vec::new())
1418 }
1419
1420 pub fn new_single_for_tests_with_provider(file_provider: AccountsFileProvider) -> Self {
1421 AccountsDb::new_for_tests_with_provider(Vec::new(), file_provider)
1422 }
1423
    /// Test helper: builds an `AccountsDb` over `paths` with the testing config and the
    /// default accounts file provider.
    pub fn new_for_tests(paths: Vec<PathBuf>) -> Self {
        Self::new_for_tests_with_provider(paths, AccountsFileProvider::default())
    }
1427
1428 fn new_for_tests_with_provider(
1429 paths: Vec<PathBuf>,
1430 accounts_file_provider: AccountsFileProvider,
1431 ) -> Self {
1432 let mut db = AccountsDb::new_with_config(
1433 paths,
1434 Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
1435 None,
1436 Arc::default(),
1437 );
1438 db.accounts_file_provider = accounts_file_provider;
1439 db
1440 }
1441
1442 pub fn new_with_config(
1443 paths: Vec<PathBuf>,
1444 accounts_db_config: Option<AccountsDbConfig>,
1445 accounts_update_notifier: Option<AccountsUpdateNotifier>,
1446 exit: Arc<AtomicBool>,
1447 ) -> Self {
1448 let accounts_db_config = accounts_db_config.unwrap_or_default();
1449 let accounts_index_config = accounts_db_config.index.unwrap_or_default();
1450 let accounts_index = AccountsIndex::new(&accounts_index_config, exit);
1451
1452 let base_working_path = accounts_db_config.base_working_path.clone();
1453 let (base_working_path, base_working_temp_dir) =
1454 if let Some(base_working_path) = base_working_path {
1455 (base_working_path, None)
1456 } else {
1457 let base_working_temp_dir = TempDir::new().unwrap();
1458 let base_working_path = base_working_temp_dir.path().to_path_buf();
1459 (base_working_path, Some(base_working_temp_dir))
1460 };
1461
1462 let (paths, temp_paths) = if paths.is_empty() {
1463 let (temp_dirs, temp_paths) = get_temp_accounts_paths(DEFAULT_NUM_DIRS).unwrap();
1466 (temp_paths, Some(temp_dirs))
1467 } else {
1468 (paths, None)
1469 };
1470
1471 let shrink_paths = accounts_db_config
1472 .shrink_paths
1473 .clone()
1474 .unwrap_or_else(|| paths.clone());
1475
1476 let read_cache_size = accounts_db_config.read_cache_limit_bytes.unwrap_or((
1477 Self::DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_LO,
1478 Self::DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_HI,
1479 ));
1480 let read_cache_evict_sample_size = accounts_db_config
1481 .read_cache_evict_sample_size
1482 .unwrap_or(Self::DEFAULT_READ_ONLY_CACHE_EVICT_SAMPLE_SIZE);
1483
1484 const ACCOUNTS_STACK_SIZE: usize = 8 * 1024 * 1024;
1487 let num_foreground_threads = accounts_db_config
1488 .num_foreground_threads
1489 .map(Into::into)
1490 .unwrap_or_else(default_num_foreground_threads);
1491 let thread_pool_foreground = rayon::ThreadPoolBuilder::new()
1492 .num_threads(num_foreground_threads)
1493 .thread_name(|i| format!("solAcctsDbFg{i:02}"))
1494 .stack_size(ACCOUNTS_STACK_SIZE)
1495 .build()
1496 .expect("new rayon threadpool");
1497
1498 let num_background_threads = accounts_db_config
1499 .num_background_threads
1500 .map(Into::into)
1501 .unwrap_or_else(quarter_thread_count);
1502 let thread_pool_background = rayon::ThreadPoolBuilder::new()
1503 .thread_name(|i| format!("solAcctsDbBg{i:02}"))
1504 .num_threads(num_background_threads)
1505 .build()
1506 .expect("new rayon threadpool");
1507
1508 let new = Self {
1509 accounts_index,
1510 paths,
1511 base_working_path,
1512 base_working_temp_dir,
1513 temp_paths,
1514 shrink_paths,
1515 skip_initial_hash_calc: accounts_db_config.skip_initial_hash_calc,
1516 ancient_append_vec_offset: accounts_db_config
1517 .ancient_append_vec_offset
1518 .or(ANCIENT_APPEND_VEC_DEFAULT_OFFSET),
1519 ancient_storage_ideal_size: accounts_db_config
1520 .ancient_storage_ideal_size
1521 .unwrap_or(DEFAULT_ANCIENT_STORAGE_IDEAL_SIZE),
1522 max_ancient_storages: accounts_db_config
1523 .max_ancient_storages
1524 .unwrap_or(DEFAULT_MAX_ANCIENT_STORAGES),
1525 account_indexes: accounts_db_config.account_indexes.unwrap_or_default(),
1526 shrink_ratio: accounts_db_config.shrink_ratio,
1527 accounts_update_notifier,
1528 read_only_accounts_cache: ReadOnlyAccountsCache::new(
1529 read_cache_size.0,
1530 read_cache_size.1,
1531 read_cache_evict_sample_size,
1532 ),
1533 write_cache_limit_bytes: accounts_db_config.write_cache_limit_bytes,
1534 partitioned_epoch_rewards_config: accounts_db_config.partitioned_epoch_rewards_config,
1535 exhaustively_verify_refcounts: accounts_db_config.exhaustively_verify_refcounts,
1536 storage_access: accounts_db_config.storage_access,
1537 scan_filter_for_shrinking: accounts_db_config.scan_filter_for_shrinking,
1538 thread_pool_foreground,
1539 thread_pool_background,
1540 num_hash_threads: accounts_db_config.num_hash_threads,
1541 verify_accounts_hash_in_bg: VerifyAccountsHashInBackground::default(),
1542 active_stats: ActiveStats::default(),
1543 storage: AccountStorage::default(),
1544 accounts_cache: AccountsCache::default(),
1545 uncleaned_pubkeys: DashMap::default(),
1546 next_id: AtomicAccountsFileId::new(0),
1547 shrink_candidate_slots: Mutex::new(ShrinkCandidates::default()),
1548 write_version: AtomicU64::new(0),
1549 file_size: DEFAULT_FILE_SIZE,
1550 external_purge_slots_stats: PurgeStats::default(),
1551 clean_accounts_stats: CleanAccountsStats::default(),
1552 shrink_stats: ShrinkStats::default(),
1553 shrink_ancient_stats: ShrinkAncientStats::default(),
1554 stats: AccountsStats::default(),
1555 #[cfg(test)]
1556 load_delay: u64::default(),
1557 #[cfg(test)]
1558 load_limit: AtomicU64::default(),
1559 is_bank_drop_callback_enabled: AtomicBool::default(),
1560 remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization::default(),
1561 dirty_stores: DashMap::default(),
1562 zero_lamport_accounts_to_purge_after_full_snapshot: DashSet::default(),
1563 log_dead_slots: AtomicBool::new(true),
1564 accounts_file_provider: AccountsFileProvider::default(),
1565 latest_full_snapshot_slot: SeqLock::new(None),
1566 best_ancient_slots_to_shrink: RwLock::default(),
1567 mark_obsolete_accounts: accounts_db_config.mark_obsolete_accounts,
1568 };
1569
1570 {
1571 for path in new.paths.iter() {
1572 std::fs::create_dir_all(path).expect("Create directory failed.");
1573 }
1574 }
1575 new
1576 }
1577
    /// Returns the `file_size` field, which the constructor sets to `DEFAULT_FILE_SIZE`.
    pub fn file_size(&self) -> u64 {
        self.file_size
    }
1581
    /// Returns an owned copy of the base working path.
    pub fn get_base_working_path(&self) -> PathBuf {
        self.base_working_path.clone()
    }
1586
    /// Returns true if an accounts update notifier was supplied at construction.
    pub fn has_accounts_update_notifier(&self) -> bool {
        self.accounts_update_notifier.is_some()
    }
1591
1592 fn next_id(&self) -> AccountsFileId {
1593 let next_id = self.next_id.fetch_add(1, Ordering::AcqRel);
1594 assert!(
1595 next_id != AccountsFileId::MAX,
1596 "We've run out of storage ids!"
1597 );
1598 next_id
1599 }
1600
1601 fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry {
1602 AccountStorageEntry::new(
1603 path,
1604 slot,
1605 self.next_id(),
1606 size,
1607 self.accounts_file_provider,
1608 )
1609 }
1610
1611 fn collect_reclaims(
1615 &self,
1616 pubkey: &Pubkey,
1617 max_clean_root_inclusive: Option<Slot>,
1618 ancient_account_cleans: &AtomicU64,
1619 epoch_schedule: &EpochSchedule,
1620 pubkeys_removed_from_accounts_index: &Mutex<PubkeysRemovedFromAccountsIndex>,
1621 ) -> SlotList<AccountInfo> {
1622 let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule);
1623 let mut clean_rooted = Measure::start("clean_old_root-ms");
1624 let mut reclaims = Vec::new();
1625 let removed_from_index = self.accounts_index.clean_rooted_entries(
1626 pubkey,
1627 &mut reclaims,
1628 max_clean_root_inclusive,
1629 );
1630 if removed_from_index {
1631 pubkeys_removed_from_accounts_index
1632 .lock()
1633 .unwrap()
1634 .insert(*pubkey);
1635 }
1636 if !reclaims.is_empty() {
1637 let old_reclaims = reclaims
1639 .iter()
1640 .filter_map(|(slot, _)| (slot < &one_epoch_old).then_some(1))
1641 .sum();
1642 ancient_account_cleans.fetch_add(old_reclaims, Ordering::Relaxed);
1643 }
1644 clean_rooted.stop();
1645 self.clean_accounts_stats
1646 .clean_old_root_us
1647 .fetch_add(clean_rooted.as_us(), Ordering::Relaxed);
1648 reclaims
1649 }
1650
1651 fn clean_accounts_older_than_root(
1655 &self,
1656 reclaims: &SlotList<AccountInfo>,
1657 pubkeys_removed_from_accounts_index: &HashSet<Pubkey>,
1658 ) -> ReclaimResult {
1659 let mut measure = Measure::start("clean_old_root_reclaims");
1660
1661 let reclaim_result = self.handle_reclaims(
1662 (!reclaims.is_empty()).then(|| reclaims.iter()),
1663 None,
1664 pubkeys_removed_from_accounts_index,
1665 HandleReclaims::ProcessDeadSlots(&self.clean_accounts_stats.purge_stats),
1666 MarkAccountsObsolete::No,
1667 );
1668 measure.stop();
1669 debug!("{measure}");
1670 self.clean_accounts_stats
1671 .clean_old_root_reclaim_us
1672 .fetch_add(measure.as_us(), Ordering::Relaxed);
1673 reclaim_result
1674 }
1675
    /// Removes from `store_counts` every slot whose storage cannot be deleted, following the
    /// dependencies between candidate pubkeys and the slots they appear in.
    ///
    /// For each candidate pubkey:
    /// * if its slot list length equals its ref count and every slot in the list has a count
    ///   of 0 in `store_counts`, the pubkey is skipped: it blocks nothing;
    /// * otherwise all of its slots are queued. Every queued slot is removed from
    ///   `store_counts`, and the slots of every pubkey recorded for that slot are queued in
    ///   turn, so the removal propagates transitively.
    ///
    /// `candidates` is indexed by accounts-index bin. `min_slot` is used only to log why the
    /// oldest slot could not be deleted.
    ///
    /// # Panics
    ///
    /// Panics if a pubkey recorded in `store_counts` is missing from its `candidates` bin.
    fn calc_delete_dependencies(
        &self,
        candidates: &[HashMap<Pubkey, CleaningInfo>],
        store_counts: &mut HashMap<Slot, (usize, HashSet<Pubkey>)>,
        min_slot: Option<Slot>,
    ) {
        // Slots already taken out of `store_counts` (or confirmed absent); each is handled once.
        let mut already_counted = IntSet::default();
        for (bin_index, bin) in candidates.iter().enumerate() {
            for (pubkey, cleaning_info) in bin.iter() {
                let slot_list = &cleaning_info.slot_list;
                let ref_count = &cleaning_info.ref_count;
                // Remembered only for the log message below.
                let mut failed_slot = None;
                let all_stores_being_deleted = slot_list.len() as RefCount == *ref_count;
                if all_stores_being_deleted {
                    let mut delete = true;
                    for (slot, _account_info) in slot_list {
                        if let Some(count) = store_counts.get(slot).map(|s| s.0) {
                            debug!("calc_delete_dependencies() slot: {slot}, count len: {count}");
                            if count == 0 {
                                // This store has no remaining accounts, so it does not block.
                                continue;
                            }
                        }
                        // The slot is missing from `store_counts` or still has a non-zero count.
                        failed_slot = Some(*slot);
                        delete = false;
                        break;
                    }
                    if delete {
                        // Every store holding this pubkey can go; nothing to propagate.
                        continue;
                    }
                } else {
                    debug!(
                        "calc_delete_dependencies(), pubkey: {pubkey}, slot list len: {}, ref \
                         count: {ref_count}, slot list: {slot_list:?}",
                        slot_list.len(),
                    );
                }

                // Worklist of slots that must be removed from `store_counts` because of this
                // pubkey, seeded with the pubkey's own slots.
                let mut pending_stores = IntSet::default();
                for (slot, _account_info) in slot_list {
                    if !already_counted.contains(slot) {
                        pending_stores.insert(*slot);
                    }
                }
                while !pending_stores.is_empty() {
                    let slot = pending_stores.iter().next().cloned().unwrap();
                    if Some(slot) == min_slot {
                        if let Some(failed_slot) = failed_slot.take() {
                            info!(
                                "calc_delete_dependencies, oldest slot is not able to be deleted \
                                 because of {pubkey} in slot {failed_slot}"
                            );
                        } else {
                            info!(
                                "calc_delete_dependencies, oldest slot is not able to be deleted \
                                 because of {pubkey}, slot list len: {}, ref count: {ref_count}",
                                slot_list.len()
                            );
                        }
                    }

                    pending_stores.remove(&slot);
                    if !already_counted.insert(slot) {
                        continue;
                    }
                    // Drop the store count for this slot: its storage cannot be deleted.
                    if let Some(store_count) = store_counts.remove(&slot) {
                        // Every pubkey recorded for this slot now pins its other slots too.
                        let affected_pubkeys = &store_count.1;
                        for key in affected_pubkeys {
                            let candidates_bin_index =
                                self.accounts_index.bin_calculator.bin_from_pubkey(key);
                            let mut update_pending_stores =
                                |bin: &HashMap<Pubkey, CleaningInfo>| {
                                    for (slot, _account_info) in &bin.get(key).unwrap().slot_list {
                                        if !already_counted.contains(slot) {
                                            pending_stores.insert(*slot);
                                        }
                                    }
                                };
                            // Reuse the bin we are iterating when the key lives in it.
                            if candidates_bin_index == bin_index {
                                update_pending_stores(bin);
                            } else {
                                update_pending_stores(&candidates[candidates_bin_index]);
                            }
                        }
                    }
                }
            }
        }
    }
1779
1780 #[must_use]
1781 pub fn purge_keys_exact<'a, C>(
1782 &'a self,
1783 pubkey_to_slot_set: impl Iterator<Item = &'a (Pubkey, C)>,
1784 ) -> (Vec<(Slot, AccountInfo)>, PubkeysRemovedFromAccountsIndex)
1785 where
1786 C: Contains<'a, Slot> + 'a,
1787 {
1788 let mut reclaims = Vec::new();
1789 let mut dead_keys = Vec::new();
1790
1791 let mut purge_exact_count = 0;
1792 let (_, purge_exact_us) = measure_us!(for (pubkey, slots_set) in pubkey_to_slot_set {
1793 purge_exact_count += 1;
1794 let is_empty = self
1795 .accounts_index
1796 .purge_exact(pubkey, slots_set, &mut reclaims);
1797 if is_empty {
1798 dead_keys.push(pubkey);
1799 }
1800 });
1801
1802 let (pubkeys_removed_from_accounts_index, handle_dead_keys_us) = measure_us!(self
1803 .accounts_index
1804 .handle_dead_keys(&dead_keys, &self.account_indexes));
1805
1806 self.stats
1807 .purge_exact_count
1808 .fetch_add(purge_exact_count, Ordering::Relaxed);
1809 self.stats
1810 .handle_dead_keys_us
1811 .fetch_add(handle_dead_keys_us, Ordering::Relaxed);
1812 self.stats
1813 .purge_exact_us
1814 .fetch_add(purge_exact_us, Ordering::Relaxed);
1815 (reclaims, pubkeys_removed_from_accounts_index)
1816 }
1817
1818 fn max_clean_root(&self, proposed_clean_root: Option<Slot>) -> Option<Slot> {
1819 match (
1820 self.accounts_index.min_ongoing_scan_root(),
1821 proposed_clean_root,
1822 ) {
1823 (None, None) => None,
1824 (Some(min_scan_root), None) => Some(min_scan_root),
1825 (None, Some(proposed_clean_root)) => Some(proposed_clean_root),
1826 (Some(min_scan_root), Some(proposed_clean_root)) => {
1827 Some(std::cmp::min(min_scan_root, proposed_clean_root))
1828 }
1829 }
1830 }
1831
1832 fn get_oldest_non_ancient_slot(&self, epoch_schedule: &EpochSchedule) -> Slot {
1835 self.get_oldest_non_ancient_slot_from_slot(
1836 epoch_schedule,
1837 self.accounts_index.max_root_inclusive(),
1838 )
1839 }
1840
1841 fn get_oldest_non_ancient_slot_from_slot(
1844 &self,
1845 epoch_schedule: &EpochSchedule,
1846 max_root_inclusive: Slot,
1847 ) -> Slot {
1848 let mut result = max_root_inclusive;
1849 if let Some(offset) = self.ancient_append_vec_offset {
1850 result = Self::apply_offset_to_slot(result, offset);
1851 }
1852 result = Self::apply_offset_to_slot(
1853 result,
1854 -((epoch_schedule.slots_per_epoch as i64).saturating_sub(1)),
1855 );
1856 result.min(max_root_inclusive)
1857 }
1858
1859 fn collect_uncleaned_slots_up_to_slot(&self, max_slot_inclusive: Slot) -> Vec<Slot> {
1863 self.uncleaned_pubkeys
1864 .iter()
1865 .filter_map(|entry| {
1866 let slot = *entry.key();
1867 (slot <= max_slot_inclusive).then_some(slot)
1868 })
1869 .collect()
1870 }
1871
1872 fn remove_uncleaned_slots_up_to_slot_and_move_pubkeys(
1876 &self,
1877 max_slot_inclusive: Slot,
1878 candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>],
1879 ) {
1880 let uncleaned_slots = self.collect_uncleaned_slots_up_to_slot(max_slot_inclusive);
1881 for uncleaned_slot in uncleaned_slots.into_iter() {
1882 if let Some((_removed_slot, mut removed_pubkeys)) =
1883 self.uncleaned_pubkeys.remove(&uncleaned_slot)
1884 {
1885 removed_pubkeys.sort_by(|a, b| {
1888 self.accounts_index
1889 .bin_calculator
1890 .bin_from_pubkey(a)
1891 .cmp(&self.accounts_index.bin_calculator.bin_from_pubkey(b))
1892 });
1893 if let Some(first_removed_pubkey) = removed_pubkeys.first() {
1894 let mut prev_bin = self
1895 .accounts_index
1896 .bin_calculator
1897 .bin_from_pubkey(first_removed_pubkey);
1898 let mut candidates_bin = candidates[prev_bin].write().unwrap();
1899 for removed_pubkey in removed_pubkeys {
1900 let curr_bin = self
1901 .accounts_index
1902 .bin_calculator
1903 .bin_from_pubkey(&removed_pubkey);
1904 if curr_bin != prev_bin {
1905 candidates_bin = candidates[curr_bin].write().unwrap();
1906 prev_bin = curr_bin;
1907 }
1908 candidates_bin.insert(
1914 removed_pubkey,
1915 CleaningInfo {
1916 might_contain_zero_lamport_entry: true,
1917 ..Default::default()
1918 },
1919 );
1920 }
1921 }
1922 }
1923 }
1924 }
1925
1926 fn count_pubkeys(candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>]) -> u64 {
1927 candidates
1928 .iter()
1929 .map(|x| x.read().unwrap().len())
1930 .sum::<usize>() as u64
1931 }
1932
    /// Builds the set of pubkeys that clean should examine.
    ///
    /// Candidates come from three sources, all limited to slots at or below
    /// `max_clean_root_inclusive` (or the index's max root when that is `None`):
    /// 1. every account in the dirty stores, which are removed from `self.dirty_stores`;
    /// 2. the pubkeys recorded in `self.uncleaned_pubkeys`, which are removed as well;
    /// 3. entries of `zero_lamport_accounts_to_purge_after_full_snapshot` whose slot is also at
    ///    or below the latest full snapshot slot, which are removed as well.
    ///
    /// Returns the candidates (one map per accounts-index bin) together with the smallest
    /// dirty slot that was consumed, if any. Counts and timings are written into `timings`.
    /// When `is_startup` is false the dirty-store scan runs on the background thread pool.
    ///
    /// # Panics
    ///
    /// Panics if a dirty storage cannot be scanned, or if there is no latest full snapshot
    /// slot while `zero_lamport_accounts_to_purge_after_full_snapshot` is non-empty.
    fn construct_candidate_clean_keys(
        &self,
        max_clean_root_inclusive: Option<Slot>,
        is_startup: bool,
        timings: &mut CleanKeyTimings,
        epoch_schedule: &EpochSchedule,
    ) -> CleaningCandidates {
        let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
        let mut dirty_store_processing_time = Measure::start("dirty_store_processing");
        let max_root_inclusive = self.accounts_index.max_root_inclusive();
        let max_slot_inclusive = max_clean_root_inclusive.unwrap_or(max_root_inclusive);
        let mut dirty_stores = Vec::with_capacity(self.dirty_stores.len());
        // Smallest slot among the dirty stores consumed below; returned to the caller.
        let mut min_dirty_slot = None::<u64>;
        // Take ownership of the dirty stores at or below the max slot; newer ones stay put.
        self.dirty_stores.retain(|slot, store| {
            if *slot > max_slot_inclusive {
                true
            } else {
                min_dirty_slot = min_dirty_slot.map(|min| min.min(*slot)).or(Some(*slot));
                dirty_stores.push((*slot, store.clone()));
                false
            }
        });
        let dirty_stores_len = dirty_stores.len();
        let num_bins = self.accounts_index.bins();
        // One independently locked map per accounts-index bin.
        let candidates: Box<_> =
            std::iter::repeat_with(|| RwLock::new(HashMap::<Pubkey, CleaningInfo>::new()))
                .take(num_bins)
                .collect();

        // Adds `pubkey` to its bin if absent and ORs in the zero-lamport flag.
        let insert_candidate = |pubkey, is_zero_lamport| {
            let index = self.accounts_index.bin_calculator.bin_from_pubkey(&pubkey);
            let mut candidates_bin = candidates[index].write().unwrap();
            candidates_bin
                .entry(pubkey)
                .or_default()
                .might_contain_zero_lamport_entry |= is_zero_lamport;
        };

        let dirty_ancient_stores = AtomicUsize::default();
        let mut dirty_store_routine = || {
            // Split the stores into roughly one chunk per rayon thread, at least one per chunk.
            let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads()));
            let oldest_dirty_slots: Vec<u64> = dirty_stores
                .par_chunks(chunk_size)
                .map(|dirty_store_chunk| {
                    // Starts one past the max slot, which is also the value reported when
                    // there are no dirty stores at all.
                    let mut oldest_dirty_slot = max_slot_inclusive.saturating_add(1);
                    dirty_store_chunk.iter().for_each(|(slot, store)| {
                        if *slot < oldest_non_ancient_slot {
                            dirty_ancient_stores.fetch_add(1, Ordering::Relaxed);
                        }
                        oldest_dirty_slot = oldest_dirty_slot.min(*slot);

                        // Every account in a dirty store becomes a candidate.
                        store
                            .accounts
                            .scan_accounts_without_data(|_offset, account| {
                                let pubkey = *account.pubkey();
                                let is_zero_lamport = account.is_zero_lamport();
                                insert_candidate(pubkey, is_zero_lamport);
                            })
                            .expect("must scan accounts storage");
                    });
                    oldest_dirty_slot
                })
                .collect();
            timings.oldest_dirty_slot = *oldest_dirty_slots
                .iter()
                .min()
                .unwrap_or(&max_slot_inclusive.saturating_add(1));
        };

        if is_startup {
            // Runs on whichever rayon pool the caller is currently in.
            dirty_store_routine();
        } else {
            self.thread_pool_background.install(|| {
                dirty_store_routine();
            });
        }
        timings.dirty_pubkeys_count = Self::count_pubkeys(&candidates);
        trace!(
            "dirty_stores.len: {} pubkeys.len: {}",
            dirty_stores_len,
            timings.dirty_pubkeys_count,
        );
        dirty_store_processing_time.stop();
        timings.dirty_store_processing_us += dirty_store_processing_time.as_us();
        timings.dirty_ancient_stores = dirty_ancient_stores.load(Ordering::Relaxed);

        let mut collect_delta_keys = Measure::start("key_create");
        self.remove_uncleaned_slots_up_to_slot_and_move_pubkeys(max_slot_inclusive, &candidates);
        collect_delta_keys.stop();
        timings.collect_delta_keys_us += collect_delta_keys.as_us();

        // This is the running total of candidates so far, not only the ones just added.
        timings.delta_key_count = Self::count_pubkeys(&candidates);

        let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
        assert!(
            latest_full_snapshot_slot.is_some()
                || self
                    .zero_lamport_accounts_to_purge_after_full_snapshot
                    .is_empty(),
            "if snapshots are disabled, then zero_lamport_accounts_to_purge_later should always \
             be empty"
        );
        if let Some(latest_full_snapshot_slot) = latest_full_snapshot_slot {
            // Move the held-back zero-lamport accounts that are now eligible into the
            // candidates; the rest stay in the set.
            self.zero_lamport_accounts_to_purge_after_full_snapshot
                .retain(|(slot, pubkey)| {
                    let is_candidate_for_clean =
                        max_slot_inclusive >= *slot && latest_full_snapshot_slot >= *slot;
                    if is_candidate_for_clean {
                        insert_candidate(*pubkey, true);
                    }
                    !is_candidate_for_clean
                });
        }

        (candidates, min_dirty_slot)
    }
2060
    /// Test helper: runs `clean_accounts` with no explicit max clean root, `is_startup` set to
    /// false, and the default epoch schedule.
    pub fn clean_accounts_for_tests(&self) {
        self.clean_accounts(None, false, &EpochSchedule::default())
    }
2065
2066 fn exhaustively_verify_refcounts(&self, max_slot_inclusive: Option<Slot>) {
2071 let max_slot_inclusive =
2072 max_slot_inclusive.unwrap_or_else(|| self.accounts_index.max_root_inclusive());
2073 info!("exhaustively verifying refcounts as of slot: {max_slot_inclusive}");
2074 let pubkey_refcount = DashMap::<Pubkey, Vec<Slot>>::default();
2075 let mut storages = self.storage.all_storages();
2076 storages.retain(|s| s.slot() <= max_slot_inclusive);
2077 storages.par_iter().for_each_init(
2079 || Box::new(append_vec::new_scan_accounts_reader()),
2080 |reader, storage| {
2081 let slot = storage.slot();
2082 storage
2083 .accounts
2084 .scan_accounts(reader.as_mut(), |_offset, account| {
2085 let pk = account.pubkey();
2086 match pubkey_refcount.entry(*pk) {
2087 dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => {
2088 if !occupied_entry.get().iter().any(|s| s == &slot) {
2089 occupied_entry.get_mut().push(slot);
2090 }
2091 }
2092 dashmap::mapref::entry::Entry::Vacant(vacant_entry) => {
2093 vacant_entry.insert(vec![slot]);
2094 }
2095 }
2096 })
2097 .expect("must scan accounts storage")
2098 },
2099 );
2100 let total = pubkey_refcount.len();
2101 let failed = AtomicBool::default();
2102 let threads = quarter_thread_count();
2103 let per_batch = total / threads;
2104 (0..=threads).into_par_iter().for_each(|attempt| {
2105 pubkey_refcount
2106 .iter()
2107 .skip(attempt * per_batch)
2108 .take(per_batch)
2109 .for_each(|entry| {
2110 if failed.load(Ordering::Relaxed) {
2111 return;
2112 }
2113
2114 self.accounts_index
2115 .get_and_then(entry.key(), |index_entry| {
2116 if let Some(index_entry) = index_entry {
2117 match (index_entry.ref_count() as usize).cmp(&entry.value().len()) {
2118 std::cmp::Ordering::Equal => {
2119 }
2121 std::cmp::Ordering::Greater => {
2122 let slot_list = index_entry.slot_list.read().unwrap();
2123 let num_too_new = slot_list
2124 .iter()
2125 .filter(|(slot, _)| slot > &max_slot_inclusive)
2126 .count();
2127
2128 if ((index_entry.ref_count() as usize) - num_too_new)
2129 > entry.value().len()
2130 {
2131 failed.store(true, Ordering::Relaxed);
2132 error!(
2133 "exhaustively_verify_refcounts: {} refcount too \
2134 large: {}, should be: {}, {:?}, {:?}, too_new: \
2135 {num_too_new}",
2136 entry.key(),
2137 index_entry.ref_count(),
2138 entry.value().len(),
2139 *entry.value(),
2140 slot_list
2141 );
2142 }
2143 }
2144 std::cmp::Ordering::Less => {
2145 error!(
2146 "exhaustively_verify_refcounts: {} refcount too \
2147 small: {}, should be: {}, {:?}, {:?}",
2148 entry.key(),
2149 index_entry.ref_count(),
2150 entry.value().len(),
2151 *entry.value(),
2152 index_entry.slot_list.read().unwrap()
2153 );
2154 }
2155 }
2156 };
2157 (false, ())
2158 });
2159 });
2160 });
2161 if failed.load(Ordering::Relaxed) {
2162 panic!("exhaustively_verify_refcounts failed");
2163 }
2164 }
2165
2166 pub fn clean_accounts(
2171 &self,
2172 max_clean_root_inclusive: Option<Slot>,
2173 is_startup: bool,
2174 epoch_schedule: &EpochSchedule,
2175 ) {
2176 if self.exhaustively_verify_refcounts {
2177 if is_startup {
2179 self.exhaustively_verify_refcounts(max_clean_root_inclusive);
2180 } else {
2181 self.thread_pool_background
2183 .install(|| self.exhaustively_verify_refcounts(max_clean_root_inclusive));
2184 }
2185 }
2186
2187 let _guard = self.active_stats.activate(ActiveStatItem::Clean);
2188
2189 let ancient_account_cleans = AtomicU64::default();
2190 let purges_old_accounts_count = AtomicU64::default();
2191
2192 let mut measure_all = Measure::start("clean_accounts");
2193 let max_clean_root_inclusive = self.max_clean_root(max_clean_root_inclusive);
2194
2195 self.report_store_stats();
2196
2197 let active_guard = self
2198 .active_stats
2199 .activate(ActiveStatItem::CleanConstructCandidates);
2200 let mut measure_construct_candidates = Measure::start("construct_candidates");
2201 let mut key_timings = CleanKeyTimings::default();
2202 let (mut candidates, min_dirty_slot) = self.construct_candidate_clean_keys(
2203 max_clean_root_inclusive,
2204 is_startup,
2205 &mut key_timings,
2206 epoch_schedule,
2207 );
2208 measure_construct_candidates.stop();
2209 drop(active_guard);
2210
2211 let num_candidates = Self::count_pubkeys(&candidates);
2212 let found_not_zero_accum = AtomicU64::new(0);
2213 let not_found_on_fork_accum = AtomicU64::new(0);
2214 let missing_accum = AtomicU64::new(0);
2215 let useful_accum = AtomicU64::new(0);
2216 let reclaims: SlotList<AccountInfo> = Vec::with_capacity(num_candidates as usize);
2217 let reclaims = Mutex::new(reclaims);
2218 let pubkeys_removed_from_accounts_index: PubkeysRemovedFromAccountsIndex = HashSet::new();
2219 let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index);
2220 let do_clean_scan = || {
2222 candidates.par_iter().for_each(|candidates_bin| {
2223 let mut found_not_zero = 0;
2224 let mut not_found_on_fork = 0;
2225 let mut missing = 0;
2226 let mut useful = 0;
2227 let mut purges_old_accounts_local = 0;
2228 let mut candidates_bin = candidates_bin.write().unwrap();
2229 candidates_bin.retain(|candidate_pubkey, candidate_info| {
2234 let mut should_collect_reclaims = false;
2235 self.accounts_index.scan(
2236 iter::once(candidate_pubkey),
2237 |_candidate_pubkey, slot_list_and_ref_count, _entry| {
2238 let mut useless = true;
2239 if let Some((slot_list, ref_count)) = slot_list_and_ref_count {
2240 let index_in_slot_list = self.accounts_index.latest_slot(
2242 None,
2243 slot_list,
2244 max_clean_root_inclusive,
2245 );
2246
2247 match index_in_slot_list {
2248 Some(index_in_slot_list) => {
2249 let (slot, account_info) = &slot_list[index_in_slot_list];
2251 if account_info.is_zero_lamport() {
2252 useless = false;
2253 candidate_info.slot_list =
2257 self.accounts_index.get_rooted_entries(
2258 slot_list,
2259 max_clean_root_inclusive,
2260 );
2261 candidate_info.ref_count = ref_count;
2262 } else {
2263 found_not_zero += 1;
2264 }
2265
2266 if slot_list.len() > 1
2269 && *slot
2270 <= max_clean_root_inclusive.unwrap_or(Slot::MAX)
2271 {
2272 should_collect_reclaims = true;
2273 purges_old_accounts_local += 1;
2274 useless = false;
2275 }
2276 }
2277 None => {
2278 not_found_on_fork += 1;
2285 should_collect_reclaims = true;
2286 purges_old_accounts_local += 1;
2287 useless = false;
2288 }
2289 }
2290 } else {
2291 missing += 1;
2292 }
2293 if !useless {
2294 useful += 1;
2295 }
2296 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
2297 },
2298 None,
2299 false,
2300 if candidate_info.might_contain_zero_lamport_entry {
2301 ScanFilter::All
2302 } else {
2303 self.scan_filter_for_shrinking
2304 },
2305 );
2306 if should_collect_reclaims {
2307 let reclaims_new = self.collect_reclaims(
2308 candidate_pubkey,
2309 max_clean_root_inclusive,
2310 &ancient_account_cleans,
2311 epoch_schedule,
2312 &pubkeys_removed_from_accounts_index,
2313 );
2314 if !reclaims_new.is_empty() {
2315 reclaims.lock().unwrap().extend(reclaims_new);
2316 }
2317 }
2318 !candidate_info.slot_list.is_empty()
2319 });
2320 found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
2321 not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
2322 missing_accum.fetch_add(missing, Ordering::Relaxed);
2323 useful_accum.fetch_add(useful, Ordering::Relaxed);
2324 purges_old_accounts_count.fetch_add(purges_old_accounts_local, Ordering::Relaxed);
2325 });
2326 };
2327 let active_guard = self
2328 .active_stats
2329 .activate(ActiveStatItem::CleanScanCandidates);
2330 let mut accounts_scan = Measure::start("accounts_scan");
2331 if is_startup {
2332 do_clean_scan();
2333 } else {
2334 self.thread_pool_background.install(do_clean_scan);
2335 }
2336 accounts_scan.stop();
2337 drop(active_guard);
2338
2339 let mut candidates: Box<_> = candidates
2341 .iter_mut()
2342 .map(|candidates_bin| mem::take(candidates_bin.get_mut().unwrap()))
2343 .collect();
2344
2345 let retained_keys_count: usize = candidates.iter().map(HashMap::len).sum();
2346 let reclaims = reclaims.into_inner().unwrap();
2347 let mut pubkeys_removed_from_accounts_index =
2348 pubkeys_removed_from_accounts_index.into_inner().unwrap();
2349
2350 let active_guard = self.active_stats.activate(ActiveStatItem::CleanOldAccounts);
2351 let mut clean_old_rooted = Measure::start("clean_old_roots");
2352 let (purged_account_slots, removed_accounts) =
2353 self.clean_accounts_older_than_root(&reclaims, &pubkeys_removed_from_accounts_index);
2354 clean_old_rooted.stop();
2355 drop(active_guard);
2356
2357 let active_guard = self
2360 .active_stats
2361 .activate(ActiveStatItem::CleanCollectStoreCounts);
2362 let mut store_counts_time = Measure::start("store_counts");
2363 let mut store_counts: HashMap<Slot, (usize, HashSet<Pubkey>)> = HashMap::new();
2364 for candidates_bin in candidates.iter_mut() {
2365 for (pubkey, cleaning_info) in candidates_bin.iter_mut() {
2366 let slot_list = &mut cleaning_info.slot_list;
2367 let ref_count = &mut cleaning_info.ref_count;
2368 debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty");
2369 if purged_account_slots.contains_key(pubkey) {
2370 *ref_count = self.accounts_index.ref_count_from_storage(pubkey);
2371 }
2372 slot_list.retain(|(slot, account_info)| {
2373 let was_slot_purged = purged_account_slots
2374 .get(pubkey)
2375 .map(|slots_removed| slots_removed.contains(slot))
2376 .unwrap_or(false);
2377 if was_slot_purged {
2378 return false;
2381 }
2382 let was_reclaimed = removed_accounts
2385 .get(slot)
2386 .map(|store_removed| store_removed.contains(&account_info.offset()))
2387 .unwrap_or(false);
2388 if was_reclaimed {
2389 return false;
2390 }
2391 if let Some(store_count) = store_counts.get_mut(slot) {
2392 store_count.0 -= 1;
2393 store_count.1.insert(*pubkey);
2394 } else {
2395 let mut key_set = HashSet::new();
2396 key_set.insert(*pubkey);
2397 assert!(
2398 !account_info.is_cached(),
2399 "The Accounts Cache must be flushed first for this account info. \
2400 pubkey: {}, slot: {}",
2401 *pubkey,
2402 *slot
2403 );
2404 let count = self
2405 .storage
2406 .get_account_storage_entry(*slot, account_info.store_id())
2407 .map(|store| store.count())
2408 .unwrap()
2409 - 1;
2410 debug!(
2411 "store_counts, inserting slot: {}, store id: {}, count: {}",
2412 slot,
2413 account_info.store_id(),
2414 count
2415 );
2416 store_counts.insert(*slot, (count, key_set));
2417 }
2418 true
2419 });
2420 }
2421 }
2422 store_counts_time.stop();
2423 drop(active_guard);
2424
2425 let active_guard = self
2426 .active_stats
2427 .activate(ActiveStatItem::CleanCalcDeleteDeps);
2428 let mut calc_deps_time = Measure::start("calc_deps");
2429 self.calc_delete_dependencies(&candidates, &mut store_counts, min_dirty_slot);
2430 calc_deps_time.stop();
2431 drop(active_guard);
2432
2433 let active_guard = self
2434 .active_stats
2435 .activate(ActiveStatItem::CleanFilterZeroLamport);
2436 let mut purge_filter = Measure::start("purge_filter");
2437 self.filter_zero_lamport_clean_for_incremental_snapshots(
2438 max_clean_root_inclusive,
2439 &store_counts,
2440 &mut candidates,
2441 );
2442 purge_filter.stop();
2443 drop(active_guard);
2444
2445 let active_guard = self.active_stats.activate(ActiveStatItem::CleanReclaims);
2446 let mut reclaims_time = Measure::start("reclaims");
2447 let mut pubkey_to_slot_set = Vec::new();
2449 for candidates_bin in candidates.iter() {
2450 let mut bin_set = candidates_bin
2451 .iter()
2452 .filter_map(|(pubkey, cleaning_info)| {
2453 let slot_list = &cleaning_info.slot_list;
2454 (!slot_list.is_empty()).then_some((
2455 *pubkey,
2456 slot_list
2457 .iter()
2458 .map(|(slot, _)| *slot)
2459 .collect::<HashSet<Slot>>(),
2460 ))
2461 })
2462 .collect::<Vec<_>>();
2463 pubkey_to_slot_set.append(&mut bin_set);
2464 }
2465
2466 let (reclaims, pubkeys_removed_from_accounts_index2) =
2467 self.purge_keys_exact(pubkey_to_slot_set.iter());
2468 pubkeys_removed_from_accounts_index.extend(pubkeys_removed_from_accounts_index2);
2469
2470 self.handle_reclaims(
2471 (!reclaims.is_empty()).then(|| reclaims.iter()),
2472 None,
2473 &pubkeys_removed_from_accounts_index,
2474 HandleReclaims::ProcessDeadSlots(&self.clean_accounts_stats.purge_stats),
2475 MarkAccountsObsolete::No,
2476 );
2477
2478 reclaims_time.stop();
2479 drop(active_guard);
2480
2481 measure_all.stop();
2482
2483 self.clean_accounts_stats.report();
2484 datapoint_info!(
2485 "clean_accounts",
2486 ("max_clean_root", max_clean_root_inclusive, Option<i64>),
2487 ("total_us", measure_all.as_us(), i64),
2488 (
2489 "collect_delta_keys_us",
2490 key_timings.collect_delta_keys_us,
2491 i64
2492 ),
2493 ("oldest_dirty_slot", key_timings.oldest_dirty_slot, i64),
2494 (
2495 "pubkeys_removed_from_accounts_index",
2496 pubkeys_removed_from_accounts_index.len(),
2497 i64
2498 ),
2499 (
2500 "dirty_ancient_stores",
2501 key_timings.dirty_ancient_stores,
2502 i64
2503 ),
2504 (
2505 "dirty_store_processing_us",
2506 key_timings.dirty_store_processing_us,
2507 i64
2508 ),
2509 ("construct_candidates_us", measure_construct_candidates.as_us(), i64),
2510 ("accounts_scan", accounts_scan.as_us(), i64),
2511 ("clean_old_rooted", clean_old_rooted.as_us(), i64),
2512 ("store_counts", store_counts_time.as_us(), i64),
2513 ("purge_filter", purge_filter.as_us(), i64),
2514 ("calc_deps", calc_deps_time.as_us(), i64),
2515 ("reclaims", reclaims_time.as_us(), i64),
2516 ("delta_insert_us", key_timings.delta_insert_us, i64),
2517 ("delta_key_count", key_timings.delta_key_count, i64),
2518 ("dirty_pubkeys_count", key_timings.dirty_pubkeys_count, i64),
2519 ("useful_keys", useful_accum.load(Ordering::Relaxed), i64),
2520 ("total_keys_count", num_candidates, i64),
2521 ("retained_keys_count", retained_keys_count, i64),
2522 (
2523 "scan_found_not_zero",
2524 found_not_zero_accum.load(Ordering::Relaxed),
2525 i64
2526 ),
2527 (
2528 "scan_not_found_on_fork",
2529 not_found_on_fork_accum.load(Ordering::Relaxed),
2530 i64
2531 ),
2532 ("scan_missing", missing_accum.load(Ordering::Relaxed), i64),
2533 (
2534 "get_account_sizes_us",
2535 self.clean_accounts_stats
2536 .get_account_sizes_us
2537 .swap(0, Ordering::Relaxed),
2538 i64
2539 ),
2540 (
2541 "slots_cleaned",
2542 self.clean_accounts_stats
2543 .slots_cleaned
2544 .swap(0, Ordering::Relaxed),
2545 i64
2546 ),
2547 (
2548 "clean_old_root_us",
2549 self.clean_accounts_stats
2550 .clean_old_root_us
2551 .swap(0, Ordering::Relaxed),
2552 i64
2553 ),
2554 (
2555 "clean_old_root_reclaim_us",
2556 self.clean_accounts_stats
2557 .clean_old_root_reclaim_us
2558 .swap(0, Ordering::Relaxed),
2559 i64
2560 ),
2561 (
2562 "remove_dead_accounts_remove_us",
2563 self.clean_accounts_stats
2564 .remove_dead_accounts_remove_us
2565 .swap(0, Ordering::Relaxed),
2566 i64
2567 ),
2568 (
2569 "remove_dead_accounts_shrink_us",
2570 self.clean_accounts_stats
2571 .remove_dead_accounts_shrink_us
2572 .swap(0, Ordering::Relaxed),
2573 i64
2574 ),
2575 (
2576 "clean_stored_dead_slots_us",
2577 self.clean_accounts_stats
2578 .clean_stored_dead_slots_us
2579 .swap(0, Ordering::Relaxed),
2580 i64
2581 ),
2582 (
2583 "roots_added",
2584 self.accounts_index.roots_added.swap(0, Ordering::Relaxed),
2585 i64
2586 ),
2587 (
2588 "purge_older_root_entries_one_slot_list",
2589 self.accounts_index
2590 .purge_older_root_entries_one_slot_list
2591 .swap(0, Ordering::Relaxed),
2592 i64
2593 ),
2594 (
2595 "roots_removed",
2596 self.accounts_index.roots_removed.swap(0, Ordering::Relaxed),
2597 i64
2598 ),
2599 (
2600 "active_scans",
2601 self.accounts_index.active_scans.load(Ordering::Relaxed),
2602 i64
2603 ),
2604 (
2605 "max_distance_to_min_scan_slot",
2606 self.accounts_index
2607 .max_distance_to_min_scan_slot
2608 .swap(0, Ordering::Relaxed),
2609 i64
2610 ),
2611 (
2612 "ancient_account_cleans",
2613 ancient_account_cleans.load(Ordering::Relaxed),
2614 i64
2615 ),
2616 (
2617 "purges_old_accounts_count",
2618 purges_old_accounts_count.load(Ordering::Relaxed),
2619 i64
2620 ),
2621 ("next_store_id", self.next_id.load(Ordering::Relaxed), i64),
2622 );
2623 }
2624
2625 fn handle_reclaims<'a, I>(
2656 &'a self,
2657 reclaims: Option<I>,
2658 expected_single_dead_slot: Option<Slot>,
2659 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
2660 handle_reclaims: HandleReclaims<'a>,
2661 mark_accounts_obsolete: MarkAccountsObsolete,
2662 ) -> ReclaimResult
2663 where
2664 I: Iterator<Item = &'a (Slot, AccountInfo)>,
2665 {
2666 let mut reclaim_result = ReclaimResult::default();
2667 if let Some(reclaims) = reclaims {
2668 let (dead_slots, reclaimed_offsets) = self.remove_dead_accounts(
2669 reclaims,
2670 expected_single_dead_slot,
2671 mark_accounts_obsolete,
2672 );
2673 reclaim_result.1 = reclaimed_offsets;
2674 let HandleReclaims::ProcessDeadSlots(purge_stats) = handle_reclaims;
2675 if let Some(expected_single_dead_slot) = expected_single_dead_slot {
2676 assert!(dead_slots.len() <= 1);
2677 if dead_slots.len() == 1 {
2678 assert!(dead_slots.contains(&expected_single_dead_slot));
2679 }
2680 }
2681 let clean_stored_dead_slots =
2683 !matches!(mark_accounts_obsolete, MarkAccountsObsolete::Yes(_));
2684
2685 self.process_dead_slots(
2686 &dead_slots,
2687 Some(&mut reclaim_result.0),
2688 purge_stats,
2689 pubkeys_removed_from_accounts_index,
2690 clean_stored_dead_slots,
2691 );
2692 }
2693 reclaim_result
2694 }
2695
2696 fn filter_zero_lamport_clean_for_incremental_snapshots(
2720 &self,
2721 max_clean_root_inclusive: Option<Slot>,
2722 store_counts: &HashMap<Slot, (usize, HashSet<Pubkey>)>,
2723 candidates: &mut [HashMap<Pubkey, CleaningInfo>],
2724 ) {
2725 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
2726 let should_filter_for_incremental_snapshots = max_clean_root_inclusive.unwrap_or(Slot::MAX)
2727 > latest_full_snapshot_slot.unwrap_or(Slot::MAX);
2728 assert!(
2729 latest_full_snapshot_slot.is_some() || !should_filter_for_incremental_snapshots,
2730 "if filtering for incremental snapshots, then snapshots should be enabled",
2731 );
2732
2733 for bin in candidates {
2734 bin.retain(|pubkey, cleaning_info| {
2735 let slot_list = &cleaning_info.slot_list;
2736 debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty");
2737 for (slot, _account_info) in slot_list.iter() {
2740 if let Some(store_count) = store_counts.get(slot) {
2741 if store_count.0 != 0 {
2742 return false;
2744 }
2745 } else {
2746 return false;
2748 }
2749 }
2750
2751 if !should_filter_for_incremental_snapshots {
2753 return true;
2754 }
2755
2756 let (slot, account_info) = slot_list
2759 .iter()
2760 .max_by_key(|(slot, _account_info)| slot)
2761 .unwrap();
2762
2763 assert!(account_info.is_zero_lamport());
2769 let cannot_purge = *slot > latest_full_snapshot_slot.unwrap();
2770 if cannot_purge {
2771 self.zero_lamport_accounts_to_purge_after_full_snapshot
2772 .insert((*slot, *pubkey));
2773 }
2774 !cannot_purge
2775 });
2776 }
2777 }
2778
2779 fn process_dead_slots(
2788 &self,
2789 dead_slots: &IntSet<Slot>,
2790 purged_account_slots: Option<&mut AccountSlots>,
2791 purge_stats: &PurgeStats,
2792 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
2793 clean_stored_dead_slots: bool,
2794 ) {
2795 if dead_slots.is_empty() {
2796 return;
2797 }
2798 let mut clean_dead_slots = Measure::start("reclaims::clean_dead_slots");
2799
2800 if clean_stored_dead_slots {
2801 self.clean_stored_dead_slots(
2802 dead_slots,
2803 purged_account_slots,
2804 pubkeys_removed_from_accounts_index,
2805 );
2806 }
2807
2808 self.remove_dead_slots_metadata(dead_slots.iter());
2810
2811 clean_dead_slots.stop();
2812
2813 let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots");
2814 self.purge_dead_slots_from_storage(dead_slots.iter(), purge_stats);
2815 purge_removed_slots.stop();
2816
2817 {
2820 let mut list = self.shrink_candidate_slots.lock().unwrap();
2821 for slot in dead_slots {
2822 list.remove(slot);
2823 }
2824 }
2825
2826 debug!(
2827 "process_dead_slots({}): {} {} {:?}",
2828 dead_slots.len(),
2829 clean_dead_slots,
2830 purge_removed_slots,
2831 dead_slots,
2832 );
2833 }
2834
    /// Looks up each of `accounts` in the accounts index and classifies it for a shrink of
    /// `slot_to_shrink`.
    ///
    /// Each account ends up in exactly one of the returned collections:
    /// * `pubkeys_to_unref` - the index returned an entry whose slot list has no item at
    ///   `slot_to_shrink`
    /// * `zero_lamport_single_ref_pubkeys` - the stored account is zero lamport, the ref count
    ///   is 1, and the latest full snapshot slot is either unknown or `>= slot_to_shrink`;
    ///   these pubkeys are also passed to `add_uncleaned_pubkeys_after_shrink`
    /// * `alive_accounts` - everything else
    ///
    /// `all_are_zero_lamports` in the result is true only if every account added to
    /// `alive_accounts` is zero lamport. The scan counters and alive/dead totals are added to
    /// `stats`.
    fn load_accounts_index_for_shrink<'a, T: ShrinkCollectRefs<'a>>(
        &self,
        accounts: &'a [AccountFromStorage],
        stats: &ShrinkStats,
        slot_to_shrink: Slot,
    ) -> LoadAccountsIndexForShrink<'a, T> {
        let count = accounts.len();
        let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
        let mut pubkeys_to_unref = Vec::with_capacity(count);
        let mut zero_lamport_single_ref_pubkeys = Vec::with_capacity(count);

        let mut alive = 0;
        let mut dead = 0;
        // position in `accounts` of the account the scan callback is currently looking at
        let mut index = 0;
        let mut index_scan_returned_some_count = 0;
        let mut index_scan_returned_none_count = 0;
        let mut all_are_zero_lamports = true;
        let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
        self.accounts_index.scan(
            accounts.iter().map(|account| account.pubkey()),
            |pubkey, slots_refs, _entry| {
                // The callback pairs the scanned pubkey with `accounts[index]`, so it relies on
                // being invoked once per input pubkey, in input order.
                let stored_account = &accounts[index];
                // Shared handling for an account that is present at `slot_to_shrink`.
                let mut do_populate_accounts_for_shrink = |ref_count, slot_list| {
                    if stored_account.is_zero_lamport()
                        && ref_count == 1
                        && latest_full_snapshot_slot
                            .map(|latest_full_snapshot_slot| {
                                latest_full_snapshot_slot >= slot_to_shrink
                            })
                            .unwrap_or(true)
                    {
                        // zero lamport account with a single ref: set aside instead of keeping
                        // it alive, and record it as an uncleaned pubkey for this slot
                        zero_lamport_single_ref_pubkeys.push(pubkey);
                        self.add_uncleaned_pubkeys_after_shrink(
                            slot_to_shrink,
                            [*pubkey].into_iter(),
                        );
                    } else {
                        all_are_zero_lamports &= stored_account.is_zero_lamport();
                        alive_accounts.add(ref_count, stored_account, slot_list);
                        alive += 1;
                    }
                };
                if let Some((slot_list, ref_count)) = slots_refs {
                    index_scan_returned_some_count += 1;
                    // alive here means the index has an item for this pubkey at the slot being
                    // shrunk
                    let is_alive = slot_list.iter().any(|(slot, _acct_info)| {
                        *slot == slot_to_shrink
                    });

                    if !is_alive {
                        // the index has no item at this slot for the pubkey: report it so the
                        // caller can unref it
                        pubkeys_to_unref.push(pubkey);
                        dead += 1;
                    } else {
                        do_populate_accounts_for_shrink(ref_count, slot_list);
                    }
                } else {
                    index_scan_returned_none_count += 1;
                    // The scan supplied no slot list for this pubkey. The account is treated as
                    // alive at `slot_to_shrink` with a ref count of 1 and a placeholder
                    // account info.
                    // NOTE(review): presumably `None` is expected under
                    // `scan_filter_for_shrinking` for entries not held in memory - confirm.
                    let ref_count = 1;
                    let slot_list = [(slot_to_shrink, AccountInfo::default())];
                    do_populate_accounts_for_shrink(ref_count, &slot_list);
                }
                index += 1;
                AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
            },
            None,
            false,
            self.scan_filter_for_shrinking,
        );
        // every account must have been visited by the callback
        assert_eq!(index, std::cmp::min(accounts.len(), count));
        stats
            .index_scan_returned_some
            .fetch_add(index_scan_returned_some_count, Ordering::Relaxed);
        stats
            .index_scan_returned_none
            .fetch_add(index_scan_returned_none_count, Ordering::Relaxed);
        stats.alive_accounts.fetch_add(alive, Ordering::Relaxed);
        stats.dead_accounts.fetch_add(dead, Ordering::Relaxed);

        LoadAccountsIndexForShrink {
            alive_accounts,
            pubkeys_to_unref,
            zero_lamport_single_ref_pubkeys,
            all_are_zero_lamports,
        }
    }
2935
2936 pub fn get_unique_accounts_from_storage(
2939 &self,
2940 store: &AccountStorageEntry,
2941 ) -> GetUniqueAccountsResult {
2942 let capacity = store.capacity();
2943 let mut stored_accounts = Vec::with_capacity(store.count());
2944 store
2945 .accounts
2946 .scan_accounts_without_data(|offset, account| {
2947 let file_id = 0;
2949 stored_accounts.push(AccountFromStorage {
2950 index_info: AccountInfo::new(
2951 StorageLocation::AppendVec(file_id, offset),
2952 account.is_zero_lamport(),
2953 ),
2954 pubkey: *account.pubkey(),
2955 data_len: account.data_len as u64,
2956 });
2957 })
2958 .expect("must scan accounts storage");
2959
2960 let num_duplicated_accounts = Self::sort_and_remove_dups(&mut stored_accounts);
2962
2963 GetUniqueAccountsResult {
2964 stored_accounts,
2965 capacity,
2966 num_duplicated_accounts,
2967 }
2968 }
2969
    /// Replaces the `storage_access` setting stored on `self` with the given value.
    ///
    /// Only compiled when the `dev-context-only-utils` feature is enabled.
    #[cfg(feature = "dev-context-only-utils")]
    pub fn set_storage_access(&mut self, storage_access: StorageAccess) {
        self.storage_access = storage_access;
    }
2974
2975 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
2980 fn sort_and_remove_dups(accounts: &mut Vec<AccountFromStorage>) -> usize {
2981 accounts.sort_by(|a, b| a.pubkey().cmp(b.pubkey()));
2983 let len0 = accounts.len();
2984 if accounts.len() > 1 {
2985 let mut last = 0;
2986 let mut curr = 1;
2987
2988 while curr < accounts.len() {
2989 if accounts[curr].pubkey() != accounts[last].pubkey() {
2990 last += 1;
2991 }
2992 accounts[last] = accounts[curr];
2993 curr += 1;
2994 }
2995 accounts.truncate(last + 1);
2996 }
2997 len0 - accounts.len()
2998 }
2999
3000 pub(crate) fn get_unique_accounts_from_storage_for_shrink(
3001 &self,
3002 store: &AccountStorageEntry,
3003 stats: &ShrinkStats,
3004 ) -> GetUniqueAccountsResult {
3005 let (result, storage_read_elapsed_us) =
3006 measure_us!(self.get_unique_accounts_from_storage(store));
3007 stats
3008 .storage_read_elapsed
3009 .fetch_add(storage_read_elapsed_us, Ordering::Relaxed);
3010 stats
3011 .num_duplicated_accounts
3012 .fetch_add(result.num_duplicated_accounts as u64, Ordering::Relaxed);
3013 result
3014 }
3015
    /// Classifies the accounts in `unique_accounts` (read from `store`) in preparation for
    /// shrinking the store's slot and returns the outcome as a `ShrinkCollect`.
    ///
    /// Accounts whose offset is reported by `store.get_obsolete_accounts(None)` are removed
    /// from `unique_accounts.stored_accounts` first. The remaining accounts are split into
    /// chunks of `SHRINK_COLLECT_CHUNK_SIZE`, each chunk is classified by
    /// `load_accounts_index_for_shrink`, and the per-chunk results are merged.
    ///
    /// `stats` is updated with the number of accounts loaded and removed, the duplicate
    /// count, the number of obsolete accounts filtered, the index read time, and the bytes
    /// removed and written.
    pub(crate) fn shrink_collect<'a: 'b, 'b, T: ShrinkCollectRefs<'b>>(
        &self,
        store: &'a AccountStorageEntry,
        unique_accounts: &'b mut GetUniqueAccountsResult,
        stats: &ShrinkStats,
    ) -> ShrinkCollect<'b, T> {
        let slot = store.slot();

        let GetUniqueAccountsResult {
            stored_accounts,
            capacity,
            num_duplicated_accounts,
        } = unique_accounts;

        let mut index_read_elapsed = Measure::start("index_read_elapsed");

        // `len` is taken BEFORE obsolete accounts are filtered out below, so it is the starting
        // account count used for `accounts_loaded`, `accounts_removed` and
        // `total_starting_accounts`.
        let len = stored_accounts.len();
        // accumulators shared by the per-chunk workers below
        let alive_accounts_collect = Mutex::new(T::with_capacity(len, slot));
        let pubkeys_to_unref_collect = Mutex::new(Vec::with_capacity(len));
        let zero_lamport_single_ref_pubkeys_collect = Mutex::new(Vec::with_capacity(len));

        // offsets of all accounts in this store that are marked obsolete
        let obsolete_offsets: IntSet<_> = store
            .get_obsolete_accounts(None)
            .into_iter()
            .map(|(offset, _)| offset)
            .collect();

        // drop obsolete accounts so they are not looked up in the index at all
        let initial_len = stored_accounts.len();
        stored_accounts.retain(|account| !obsolete_offsets.contains(&account.index_info.offset()));
        let obsolete_accounts_filtered = initial_len - stored_accounts.len();

        stats
            .accounts_loaded
            .fetch_add(len as u64, Ordering::Relaxed);
        stats
            .num_duplicated_accounts
            .fetch_add(*num_duplicated_accounts as u64, Ordering::Relaxed);
        let all_are_zero_lamports_collect = Mutex::new(true);
        self.thread_pool_background.install(|| {
            stored_accounts
                .par_chunks(SHRINK_COLLECT_CHUNK_SIZE)
                .for_each(|stored_accounts| {
                    let LoadAccountsIndexForShrink {
                        alive_accounts,
                        mut pubkeys_to_unref,
                        all_are_zero_lamports,
                        mut zero_lamport_single_ref_pubkeys,
                    } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);

                    // merge this chunk's results into the shared accumulators
                    alive_accounts_collect
                        .lock()
                        .unwrap()
                        .collect(alive_accounts);
                    pubkeys_to_unref_collect
                        .lock()
                        .unwrap()
                        .append(&mut pubkeys_to_unref);
                    zero_lamport_single_ref_pubkeys_collect
                        .lock()
                        .unwrap()
                        .append(&mut zero_lamport_single_ref_pubkeys);
                    // the overall flag stays true only if every chunk reported true
                    if !all_are_zero_lamports {
                        *all_are_zero_lamports_collect.lock().unwrap() = false;
                    }
                });
        });

        let alive_accounts = alive_accounts_collect.into_inner().unwrap();
        let pubkeys_to_unref = pubkeys_to_unref_collect.into_inner().unwrap();
        let zero_lamport_single_ref_pubkeys = zero_lamport_single_ref_pubkeys_collect
            .into_inner()
            .unwrap();

        index_read_elapsed.stop();

        stats
            .obsolete_accounts_filtered
            .fetch_add(obsolete_accounts_filtered as u64, Ordering::Relaxed);

        stats
            .index_read_elapsed
            .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);

        let alive_total_bytes = alive_accounts.alive_bytes();

        // removed = starting count (including obsolete accounts) minus accounts kept alive
        stats
            .accounts_removed
            .fetch_add(len - alive_accounts.len(), Ordering::Relaxed);
        stats.bytes_removed.fetch_add(
            capacity.saturating_sub(alive_total_bytes as u64),
            Ordering::Relaxed,
        );
        stats
            .bytes_written
            .fetch_add(alive_total_bytes as u64, Ordering::Relaxed);

        ShrinkCollect {
            slot,
            capacity: *capacity,
            pubkeys_to_unref,
            zero_lamport_single_ref_pubkeys,
            alive_accounts,
            alive_total_bytes,
            total_starting_accounts: len,
            all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(),
        }
    }
3130
3131 fn remove_zero_lamport_single_ref_accounts_after_shrink(
3142 &self,
3143 zero_lamport_single_ref_pubkeys: &[&Pubkey],
3144 slot: Slot,
3145 stats: &ShrinkStats,
3146 do_assert: bool,
3147 ) {
3148 stats.purged_zero_lamports.fetch_add(
3149 zero_lamport_single_ref_pubkeys.len() as u64,
3150 Ordering::Relaxed,
3151 );
3152
3153 self.accounts_index.scan(
3157 zero_lamport_single_ref_pubkeys.iter().cloned(),
3158 |_pubkey, _slots_refs, _entry| AccountsIndexScanResult::Unref,
3159 if do_assert {
3160 Some(AccountsIndexScanResult::UnrefAssert0)
3161 } else {
3162 Some(AccountsIndexScanResult::UnrefLog0)
3163 },
3164 false,
3165 ScanFilter::All,
3166 );
3167
3168 zero_lamport_single_ref_pubkeys.iter().for_each(|k| {
3169 _ = self.purge_keys_exact([&(**k, slot)].into_iter());
3170 });
3171 }
3172
3173 pub(crate) fn remove_old_stores_shrink<'a, T: ShrinkCollectRefs<'a>>(
3176 &self,
3177 shrink_collect: &ShrinkCollect<'a, T>,
3178 stats: &ShrinkStats,
3179 shrink_in_progress: Option<ShrinkInProgress>,
3180 shrink_can_be_active: bool,
3181 ) {
3182 let mut time = Measure::start("remove_old_stores_shrink");
3183
3184 self.remove_zero_lamport_single_ref_accounts_after_shrink(
3188 &shrink_collect.zero_lamport_single_ref_pubkeys,
3189 shrink_collect.slot,
3190 stats,
3191 false,
3192 );
3193
3194 let dead_storages = self.mark_dirty_dead_stores(
3198 shrink_collect.slot,
3199 shrink_collect.all_are_zero_lamports,
3202 shrink_in_progress,
3203 shrink_can_be_active,
3204 );
3205 let dead_storages_len = dead_storages.len();
3206
3207 if !shrink_collect.all_are_zero_lamports {
3208 self.add_uncleaned_pubkeys_after_shrink(
3209 shrink_collect.slot,
3210 shrink_collect.pubkeys_to_unref.iter().cloned().cloned(),
3211 );
3212 }
3213
3214 let (_, drop_storage_entries_elapsed) = measure_us!(drop(dead_storages));
3215 time.stop();
3216
3217 self.stats
3218 .dropped_stores
3219 .fetch_add(dead_storages_len as u64, Ordering::Relaxed);
3220 stats
3221 .drop_storage_entries_elapsed
3222 .fetch_add(drop_storage_entries_elapsed, Ordering::Relaxed);
3223 stats
3224 .remove_old_stores_shrink_us
3225 .fetch_add(time.as_us(), Ordering::Relaxed);
3226 }
3227
3228 pub(crate) fn unref_shrunk_dead_accounts<'a>(
3229 &self,
3230 pubkeys: impl Iterator<Item = &'a Pubkey>,
3231 slot: Slot,
3232 ) {
3233 self.accounts_index.scan(
3234 pubkeys,
3235 |pubkey, slot_refs, _entry| {
3236 match slot_refs {
3237 Some((slot_list, ref_count)) => {
3238 if slot_list.len() == 1 && ref_count == 2 {
3240 if let Some((slot_alive, acct_info)) = slot_list.first() {
3241 if acct_info.is_zero_lamport() && !acct_info.is_cached() {
3242 self.zero_lamport_single_ref_found(
3243 *slot_alive,
3244 acct_info.offset(),
3245 );
3246 }
3247 }
3248 }
3249 }
3250 None => {
3251 warn!(
3255 "pubkey {pubkey} in slot {slot} was NOT found in accounts index \
3256 during shrink"
3257 );
3258 datapoint_warn!(
3259 "accounts_db-shink_pubkey_missing_from_index",
3260 ("store_slot", slot, i64),
3261 ("pubkey", pubkey.to_string(), String),
3262 );
3263 }
3264 }
3265 AccountsIndexScanResult::Unref
3266 },
3267 None,
3268 false,
3269 ScanFilter::All,
3270 );
3271 }
3272
3273 pub(crate) fn zero_lamport_single_ref_found(&self, slot: Slot, offset: Offset) {
3275 if let Some(store) = self
3288 .storage
3289 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3290 {
3291 if store.insert_zero_lamport_single_ref_account_offset(offset) {
3292 self.shrink_stats
3294 .num_zero_lamport_single_ref_accounts_found
3295 .fetch_add(1, Ordering::Relaxed);
3296
3297 if store.num_zero_lamport_single_ref_accounts() == store.count() {
3298 self.dirty_stores.entry(slot).or_insert(store);
3300 self.shrink_stats
3301 .num_dead_slots_added_to_clean
3302 .fetch_add(1, Ordering::Relaxed);
3303 } else if Self::is_shrinking_productive(&store)
3304 && self.is_candidate_for_shrink(&store)
3305 {
3306 let is_new = self.shrink_candidate_slots.lock().unwrap().insert(slot);
3308 if is_new {
3309 self.shrink_stats
3310 .num_slots_with_zero_lamport_accounts_added_to_shrink
3311 .fetch_add(1, Ordering::Relaxed);
3312 }
3313 } else {
3314 self.shrink_stats
3315 .marking_zero_dead_accounts_in_non_shrinkable_store
3316 .fetch_add(1, Ordering::Relaxed);
3317 }
3318 }
3319 }
3320 }
3321
    /// Shrinks `store` by rewriting only its alive accounts into a new storage for the same
    /// slot and retiring the old storage.
    ///
    /// Returns without shrinking when the slot is still present in the accounts cache, when
    /// `should_not_shrink` rejects the alive/capacity sizes, or when no alive bytes remain
    /// (in which case the store is added to `dirty_stores`).
    fn shrink_storage(&self, store: Arc<AccountStorageEntry>) {
        let slot = store.slot();
        // do not shrink a slot that is still in the accounts cache
        if self.accounts_cache.contains(slot) {
            return;
        }
        let mut unique_accounts =
            self.get_unique_accounts_from_storage_for_shrink(&store, &self.shrink_stats);
        debug!("do_shrink_slot_store: slot: {slot}");
        let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
            &store,
            &mut unique_accounts,
            &self.shrink_stats,
        );

        // skip the shrink if it is not worthwhile or if nothing is alive
        if Self::should_not_shrink(
            shrink_collect.alive_total_bytes as u64,
            shrink_collect.capacity,
        ) || shrink_collect.alive_total_bytes == 0
        {
            if shrink_collect.alive_total_bytes == 0 {
                // nothing alive remains: mark the store dirty instead of rewriting it
                self.dirty_stores.insert(slot, store.clone());
            }

            if !shrink_collect.all_are_zero_lamports {
                info!(
                    "Unexpected shrink for slot {} alive {} capacity {}, likely caused by a bug \
                     for calculating alive bytes.",
                    slot, shrink_collect.alive_total_bytes, shrink_collect.capacity
                );
            }

            self.shrink_stats
                .skipped_shrink
                .fetch_add(1, Ordering::Relaxed);
            return;
        }

        // unref the pubkeys that were found in the storage but have no index item at this slot
        self.unref_shrunk_dead_accounts(shrink_collect.pubkeys_to_unref.iter().cloned(), slot);

        let total_accounts_after_shrink = shrink_collect.alive_accounts.len();
        debug!(
            "shrinking: slot: {}, accounts: ({} => {}) bytes: {} original: {}",
            slot,
            shrink_collect.total_starting_accounts,
            total_accounts_after_shrink,
            shrink_collect.alive_total_bytes,
            shrink_collect.capacity,
        );

        let mut stats_sub = ShrinkStatsSub::default();
        let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
        // create the new storage, sized for the alive bytes only
        let (shrink_in_progress, time_us) =
            measure_us!(self.get_store_for_shrink(slot, shrink_collect.alive_total_bytes as u64));
        stats_sub.create_and_insert_store_elapsed_us = Saturating(time_us);

        // write the alive accounts into the new storage
        let accounts = [(slot, &shrink_collect.alive_accounts.alive_accounts()[..])];
        let storable_accounts = StorableAccountsBySlot::new(slot, &accounts, self);
        stats_sub.store_accounts_timing = self.store_accounts_frozen(
            storable_accounts,
            shrink_in_progress.new_storage(),
            UpdateIndexThreadSelection::PoolWithThreshold,
        );

        rewrite_elapsed.stop();
        stats_sub.rewrite_elapsed_us = Saturating(rewrite_elapsed.as_us());

        // this slot has just been shrunk, so it no longer needs to be a shrink candidate
        self.shrink_candidate_slots.lock().unwrap().remove(&slot);

        // retire the old storage; `shrink_in_progress` is consumed here
        self.remove_old_stores_shrink(
            &shrink_collect,
            &self.shrink_stats,
            Some(shrink_in_progress),
            false,
        );

        self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);

        Self::update_shrink_stats(&self.shrink_stats, stats_sub, true);
        self.shrink_stats.report();
    }
3426
3427 pub(crate) fn update_shrink_stats(
3428 shrink_stats: &ShrinkStats,
3429 stats_sub: ShrinkStatsSub,
3430 increment_count: bool,
3431 ) {
3432 if increment_count {
3433 shrink_stats
3434 .num_slots_shrunk
3435 .fetch_add(1, Ordering::Relaxed);
3436 }
3437 shrink_stats.create_and_insert_store_elapsed.fetch_add(
3438 stats_sub.create_and_insert_store_elapsed_us.0,
3439 Ordering::Relaxed,
3440 );
3441 shrink_stats.store_accounts_elapsed.fetch_add(
3442 stats_sub.store_accounts_timing.store_accounts_elapsed,
3443 Ordering::Relaxed,
3444 );
3445 shrink_stats.update_index_elapsed.fetch_add(
3446 stats_sub.store_accounts_timing.update_index_elapsed,
3447 Ordering::Relaxed,
3448 );
3449 shrink_stats.handle_reclaims_elapsed.fetch_add(
3450 stats_sub.store_accounts_timing.handle_reclaims_elapsed,
3451 Ordering::Relaxed,
3452 );
3453 shrink_stats
3454 .rewrite_elapsed
3455 .fetch_add(stats_sub.rewrite_elapsed_us.0, Ordering::Relaxed);
3456 shrink_stats
3457 .unpackable_slots_count
3458 .fetch_add(stats_sub.unpackable_slots_count.0 as u64, Ordering::Relaxed);
3459 shrink_stats.newest_alive_packed_count.fetch_add(
3460 stats_sub.newest_alive_packed_count.0 as u64,
3461 Ordering::Relaxed,
3462 );
3463 }
3464
3465 pub fn mark_dirty_dead_stores(
3470 &self,
3471 slot: Slot,
3472 add_dirty_stores: bool,
3473 shrink_in_progress: Option<ShrinkInProgress>,
3474 shrink_can_be_active: bool,
3475 ) -> Vec<Arc<AccountStorageEntry>> {
3476 let mut dead_storages = Vec::default();
3477
3478 let mut not_retaining_store = |store: &Arc<AccountStorageEntry>| {
3479 if add_dirty_stores {
3480 self.dirty_stores.insert(slot, store.clone());
3481 }
3482 dead_storages.push(store.clone());
3483 };
3484
3485 if let Some(shrink_in_progress) = shrink_in_progress {
3486 not_retaining_store(shrink_in_progress.old_storage());
3488 } else if let Some(store) = self.storage.remove(&slot, shrink_can_be_active) {
3490 not_retaining_store(&store);
3492 }
3493
3494 dead_storages
3495 }
3496
3497 pub(crate) fn reopen_storage_as_readonly_shrinking_in_progress_ok(&self, slot: Slot) {
3500 if let Some(storage) = self
3501 .storage
3502 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3503 {
3504 if let Some(new_storage) = storage.reopen_as_readonly(self.storage_access) {
3505 assert_eq!(storage.id(), new_storage.id());
3510 assert_eq!(storage.accounts.len(), new_storage.accounts.len());
3511 self.storage
3512 .replace_storage_with_equivalent(slot, Arc::new(new_storage));
3513 }
3514 }
3515 }
3516
3517 pub fn get_store_for_shrink(&self, slot: Slot, size: u64) -> ShrinkInProgress<'_> {
3519 let shrunken_store = self.create_store(slot, size, "shrink", self.shrink_paths.as_slice());
3520 self.storage.shrinking_in_progress(slot, shrunken_store)
3521 }
3522
3523 fn shrink_slot_forced(&self, slot: Slot) {
3526 debug!("shrink_slot_forced: slot: {slot}");
3527
3528 if let Some(store) = self
3529 .storage
3530 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3531 {
3532 if Self::is_shrinking_productive(&store) {
3533 self.shrink_storage(store)
3534 }
3535 }
3536 }
3537
    /// Returns the slots reported by `self.storage.all_slots()`.
    fn all_slots_in_storage(&self) -> Vec<Slot> {
        self.storage.all_slots()
    }
3541
3542 fn select_candidates_by_total_usage(
3550 &self,
3551 shrink_slots: &ShrinkCandidates,
3552 shrink_ratio: f64,
3553 ) -> (IntMap<Slot, Arc<AccountStorageEntry>>, ShrinkCandidates) {
3554 struct StoreUsageInfo {
3555 slot: Slot,
3556 alive_ratio: f64,
3557 store: Arc<AccountStorageEntry>,
3558 }
3559 let mut store_usage: Vec<StoreUsageInfo> = Vec::with_capacity(shrink_slots.len());
3560 let mut total_alive_bytes: u64 = 0;
3561 let mut total_bytes: u64 = 0;
3562 for slot in shrink_slots {
3563 let Some(store) = self.storage.get_slot_storage_entry(*slot) else {
3564 continue;
3565 };
3566 let alive_bytes = store.alive_bytes();
3567 total_alive_bytes += alive_bytes as u64;
3568 total_bytes += store.capacity();
3569 let alive_ratio = alive_bytes as f64 / store.capacity() as f64;
3570 store_usage.push(StoreUsageInfo {
3571 slot: *slot,
3572 alive_ratio,
3573 store: store.clone(),
3574 });
3575 }
3576 store_usage.sort_by(|a, b| {
3577 a.alive_ratio
3578 .partial_cmp(&b.alive_ratio)
3579 .unwrap_or(std::cmp::Ordering::Equal)
3580 });
3581
3582 let mut shrink_slots = IntMap::default();
3585 let mut shrink_slots_next_batch = ShrinkCandidates::default();
3586 for usage in &store_usage {
3587 let store = &usage.store;
3588 let alive_ratio = (total_alive_bytes as f64) / (total_bytes as f64);
3589 debug!(
3590 "alive_ratio: {:?} store_id: {:?}, store_ratio: {:?} requirement: {:?}, \
3591 total_bytes: {:?} total_alive_bytes: {:?}",
3592 alive_ratio,
3593 usage.store.id(),
3594 usage.alive_ratio,
3595 shrink_ratio,
3596 total_bytes,
3597 total_alive_bytes
3598 );
3599 if alive_ratio > shrink_ratio {
3600 debug!(
3602 "Shrinking goal can be achieved at slot {:?}, total_alive_bytes: {:?} \
3603 total_bytes: {:?}, alive_ratio: {:}, shrink_ratio: {:?}",
3604 usage.slot, total_alive_bytes, total_bytes, alive_ratio, shrink_ratio
3605 );
3606 if usage.alive_ratio < shrink_ratio {
3607 shrink_slots_next_batch.insert(usage.slot);
3608 } else {
3609 break;
3610 }
3611 } else {
3612 let current_store_size = store.capacity();
3613 let after_shrink_size = store.alive_bytes() as u64;
3614 let bytes_saved = current_store_size.saturating_sub(after_shrink_size);
3615 total_bytes -= bytes_saved;
3616 shrink_slots.insert(usage.slot, Arc::clone(store));
3617 }
3618 }
3619 (shrink_slots, shrink_slots_next_batch)
3620 }
3621
3622 fn get_roots_less_than(&self, slot: Slot) -> Vec<Slot> {
3623 self.accounts_index
3624 .roots_tracker
3625 .read()
3626 .unwrap()
3627 .alive_roots
3628 .get_all_less_than(slot)
3629 }
3630
3631 fn get_sorted_potential_ancient_slots(&self, oldest_non_ancient_slot: Slot) -> Vec<Slot> {
3635 let mut ancient_slots = self.get_roots_less_than(oldest_non_ancient_slot);
3636 ancient_slots.sort_unstable();
3637 ancient_slots
3638 }
3639
3640 pub fn shrink_ancient_slots(&self, epoch_schedule: &EpochSchedule) {
3643 if self.ancient_append_vec_offset.is_none() {
3644 return;
3645 }
3646
3647 let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
3648 let can_randomly_shrink = true;
3649 let sorted_slots = self.get_sorted_potential_ancient_slots(oldest_non_ancient_slot);
3650 self.combine_ancient_slots_packed(sorted_slots, can_randomly_shrink);
3651 }
3652
3653 pub(crate) fn handle_dropped_roots_for_ancient(
3656 &self,
3657 dropped_roots: impl Iterator<Item = Slot>,
3658 ) {
3659 dropped_roots.for_each(|slot| {
3660 self.accounts_index.clean_dead_slot(slot);
3661 assert!(self.storage.remove(&slot, false).is_none());
3663 debug_assert!(
3664 !self
3665 .accounts_index
3666 .roots_tracker
3667 .read()
3668 .unwrap()
3669 .alive_roots
3670 .contains(&slot),
3671 "slot: {slot}"
3672 );
3673 });
3674 }
3675
3676 fn add_uncleaned_pubkeys_after_shrink(
3679 &self,
3680 slot: Slot,
3681 pubkeys: impl Iterator<Item = Pubkey>,
3682 ) {
3683 let mut uncleaned_pubkeys = self.uncleaned_pubkeys.entry(slot).or_default();
3703 uncleaned_pubkeys.extend(pubkeys);
3704 }
3705
3706 pub fn shrink_candidate_slots(&self, epoch_schedule: &EpochSchedule) -> usize {
3707 let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
3708
3709 let shrink_candidates_slots =
3710 std::mem::take(&mut *self.shrink_candidate_slots.lock().unwrap());
3711 self.shrink_stats
3712 .initial_candidates_count
3713 .store(shrink_candidates_slots.len() as u64, Ordering::Relaxed);
3714
3715 let candidates_count = shrink_candidates_slots.len();
3716 let ((mut shrink_slots, shrink_slots_next_batch), select_time_us) = measure_us!({
3717 if let AccountShrinkThreshold::TotalSpace { shrink_ratio } = self.shrink_ratio {
3718 let (shrink_slots, shrink_slots_next_batch) =
3719 self.select_candidates_by_total_usage(&shrink_candidates_slots, shrink_ratio);
3720 (shrink_slots, Some(shrink_slots_next_batch))
3721 } else {
3722 (
3723 shrink_candidates_slots
3725 .into_iter()
3726 .filter_map(|slot| {
3727 self.storage
3728 .get_slot_storage_entry(slot)
3729 .map(|storage| (slot, storage))
3730 })
3731 .collect(),
3732 None,
3733 )
3734 }
3735 });
3736
3737 if shrink_slots.len() < SHRINK_INSERT_ANCIENT_THRESHOLD {
3740 let mut ancients = self.best_ancient_slots_to_shrink.write().unwrap();
3741 while let Some((slot, capacity)) = ancients.pop_front() {
3742 if let Some(store) = self.storage.get_slot_storage_entry(slot) {
3743 if !shrink_slots.contains(&slot)
3744 && capacity == store.capacity()
3745 && Self::is_candidate_for_shrink(self, &store)
3746 {
3747 let ancient_bytes_added_to_shrink = store.alive_bytes() as u64;
3748 shrink_slots.insert(slot, store);
3749 self.shrink_stats
3750 .ancient_bytes_added_to_shrink
3751 .fetch_add(ancient_bytes_added_to_shrink, Ordering::Relaxed);
3752 self.shrink_stats
3753 .ancient_slots_added_to_shrink
3754 .fetch_add(1, Ordering::Relaxed);
3755 break;
3756 }
3757 }
3758 }
3759 }
3760 if shrink_slots.is_empty()
3761 && shrink_slots_next_batch
3762 .as_ref()
3763 .map(|s| s.is_empty())
3764 .unwrap_or(true)
3765 {
3766 return 0;
3767 }
3768
3769 let _guard = (!shrink_slots.is_empty())
3770 .then_some(|| self.active_stats.activate(ActiveStatItem::Shrink));
3771
3772 let num_selected = shrink_slots.len();
3773 let (_, shrink_all_us) = measure_us!({
3774 self.thread_pool_background.install(|| {
3775 shrink_slots
3776 .into_par_iter()
3777 .for_each(|(slot, slot_shrink_candidate)| {
3778 if self.ancient_append_vec_offset.is_some()
3779 && slot < oldest_non_ancient_slot
3780 {
3781 self.shrink_stats
3782 .num_ancient_slots_shrunk
3783 .fetch_add(1, Ordering::Relaxed);
3784 }
3785 self.shrink_storage(slot_shrink_candidate);
3786 });
3787 })
3788 });
3789
3790 let mut pended_counts: usize = 0;
3791 if let Some(shrink_slots_next_batch) = shrink_slots_next_batch {
3792 let mut shrink_slots = self.shrink_candidate_slots.lock().unwrap();
3793 pended_counts = shrink_slots_next_batch.len();
3794 for slot in shrink_slots_next_batch {
3795 shrink_slots.insert(slot);
3796 }
3797 }
3798
3799 datapoint_info!(
3800 "shrink_candidate_slots",
3801 ("select_time_us", select_time_us, i64),
3802 ("shrink_all_us", shrink_all_us, i64),
3803 ("candidates_count", candidates_count, i64),
3804 ("selected_count", num_selected, i64),
3805 ("deferred_to_next_round_count", pended_counts, i64)
3806 );
3807
3808 num_selected
3809 }
3810
3811 pub fn shrink_all_slots(
3816 &self,
3817 is_startup: bool,
3818 epoch_schedule: &EpochSchedule,
3819 newest_slot_skip_shrink_inclusive: Option<Slot>,
3820 ) {
3821 let _guard = self.active_stats.activate(ActiveStatItem::Shrink);
3822 const DIRTY_STORES_CLEANING_THRESHOLD: usize = 10_000;
3823 const OUTER_CHUNK_SIZE: usize = 2000;
3824 let mut slots = self.all_slots_in_storage();
3825 if let Some(newest_slot_skip_shrink_inclusive) = newest_slot_skip_shrink_inclusive {
3826 slots.retain(|slot| slot < &newest_slot_skip_shrink_inclusive);
3829 }
3830
3831 let maybe_clean = || {
3840 if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
3841 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
3842 self.clean_accounts(latest_full_snapshot_slot, is_startup, epoch_schedule);
3843 }
3844 };
3845
3846 if is_startup {
3847 let threads = num_cpus::get();
3848 let inner_chunk_size = std::cmp::max(OUTER_CHUNK_SIZE / threads, 1);
3849 slots.chunks(OUTER_CHUNK_SIZE).for_each(|chunk| {
3850 chunk.par_chunks(inner_chunk_size).for_each(|slots| {
3851 for slot in slots {
3852 self.shrink_slot_forced(*slot);
3853 }
3854 });
3855 maybe_clean();
3856 });
3857 } else {
3858 for slot in slots {
3859 self.shrink_slot_forced(slot);
3860 maybe_clean();
3861 }
3862 }
3863 }
3864
3865 pub fn scan_accounts<F>(
3866 &self,
3867 ancestors: &Ancestors,
3868 bank_id: BankId,
3869 mut scan_func: F,
3870 config: &ScanConfig,
3871 ) -> ScanResult<()>
3872 where
3873 F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
3874 {
3875 self.accounts_index.scan_accounts(
3877 ancestors,
3878 bank_id,
3879 |pubkey, (account_info, slot)| {
3880 let mut account_accessor =
3881 self.get_account_accessor(slot, pubkey, &account_info.storage_location());
3882
3883 let account_slot = match account_accessor {
3884 LoadedAccountAccessor::Cached(None) => None,
3885 _ => account_accessor.get_loaded_account(|loaded_account| {
3886 (pubkey, loaded_account.take_account(), slot)
3887 }),
3888 };
3889 scan_func(account_slot)
3890 },
3891 config,
3892 )?;
3893
3894 Ok(())
3895 }
3896
/// Runs the index's `unchecked_scan_accounts` for `ancestors` and calls
/// `scan_func(pubkey, loaded_account, slot)` for every entry whose account
/// could be loaded. `metric_name` is forwarded to the index scan.
#[cfg(feature = "dev-context-only-utils")]
pub fn unchecked_scan_accounts<F>(
    &self,
    metric_name: &'static str,
    ancestors: &Ancestors,
    mut scan_func: F,
    config: &ScanConfig,
) where
    F: FnMut(&Pubkey, LoadedAccount, Slot),
{
    self.accounts_index.unchecked_scan_accounts(
        metric_name,
        ancestors,
        |pubkey, (account_info, slot)| {
            let mut accessor =
                self.get_account_accessor(slot, pubkey, &account_info.storage_location());
            accessor.get_loaded_account(|loaded_account| {
                scan_func(pubkey, loaded_account, slot);
            });
        },
        config,
    );
}
3919
3920 pub fn index_scan_accounts<F>(
3921 &self,
3922 ancestors: &Ancestors,
3923 bank_id: BankId,
3924 index_key: IndexKey,
3925 mut scan_func: F,
3926 config: &ScanConfig,
3927 ) -> ScanResult<bool>
3928 where
3929 F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
3930 {
3931 let key = match &index_key {
3932 IndexKey::ProgramId(key) => key,
3933 IndexKey::SplTokenMint(key) => key,
3934 IndexKey::SplTokenOwner(key) => key,
3935 };
3936 if !self.account_indexes.include_key(key) {
3937 let used_index = false;
3939 self.scan_accounts(ancestors, bank_id, scan_func, config)?;
3940 return Ok(used_index);
3941 }
3942
3943 self.accounts_index.index_scan_accounts(
3944 ancestors,
3945 bank_id,
3946 index_key,
3947 |pubkey, (account_info, slot)| {
3948 let account_slot = self
3949 .get_account_accessor(slot, pubkey, &account_info.storage_location())
3950 .get_loaded_account(|loaded_account| {
3951 (pubkey, loaded_account.take_account(), slot)
3952 });
3953 scan_func(account_slot)
3954 },
3955 config,
3956 )?;
3957 let used_index = true;
3958 Ok(used_index)
3959 }
3960
3961 pub(crate) fn scan_account_storage<R, B>(
3963 &self,
3964 slot: Slot,
3965 cache_map_func: impl Fn(&LoadedAccount) -> Option<R> + Sync,
3966 storage_scan_func: impl for<'a, 'b, 'storage> Fn(
3967 &'b mut B,
3968 &'a StoredAccountInfoWithoutData<'storage>,
3969 Option<&'storage [u8]>, ) + Sync,
3971 scan_account_storage_data: ScanAccountStorageData,
3972 ) -> ScanStorageResult<R, B>
3973 where
3974 R: Send,
3975 B: Send + Default + Sync,
3976 {
3977 self.scan_cache_storage_fallback(slot, cache_map_func, |retval, storage| {
3978 match scan_account_storage_data {
3979 ScanAccountStorageData::NoData => {
3980 storage.scan_accounts_without_data(|_offset, account_without_data| {
3981 storage_scan_func(retval, &account_without_data, None);
3982 })
3983 }
3984 ScanAccountStorageData::DataRefForStorage => {
3985 let mut reader = append_vec::new_scan_accounts_reader();
3986 storage.scan_accounts(&mut reader, |_offset, account| {
3987 let account_without_data = StoredAccountInfoWithoutData::new_from(&account);
3988 storage_scan_func(retval, &account_without_data, Some(account.data));
3989 })
3990 }
3991 }
3992 .expect("must scan accounts storage");
3993 })
3994 }
3995
3996 pub fn scan_cache_storage_fallback<R, B>(
3998 &self,
3999 slot: Slot,
4000 cache_map_func: impl Fn(&LoadedAccount) -> Option<R> + Sync,
4001 storage_fallback_func: impl Fn(&mut B, &AccountsFile) + Sync,
4002 ) -> ScanStorageResult<R, B>
4003 where
4004 R: Send,
4005 B: Send + Default + Sync,
4006 {
4007 if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
4008 if slot_cache.len() > SCAN_SLOT_PAR_ITER_THRESHOLD {
4011 ScanStorageResult::Cached(self.thread_pool_foreground.install(|| {
4012 slot_cache
4013 .par_iter()
4014 .filter_map(|cached_account| {
4015 cache_map_func(&LoadedAccount::Cached(Cow::Borrowed(
4016 cached_account.value(),
4017 )))
4018 })
4019 .collect()
4020 }))
4021 } else {
4022 ScanStorageResult::Cached(
4023 slot_cache
4024 .iter()
4025 .filter_map(|cached_account| {
4026 cache_map_func(&LoadedAccount::Cached(Cow::Borrowed(
4027 cached_account.value(),
4028 )))
4029 })
4030 .collect(),
4031 )
4032 }
4033 } else {
4034 let mut retval = B::default();
4035 if let Some(storage) = self
4048 .storage
4049 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
4050 {
4051 storage_fallback_func(&mut retval, &storage.accounts);
4052 }
4053
4054 ScanStorageResult::Stored(retval)
4055 }
4056 }
4057
4058 pub fn load(
4059 &self,
4060 ancestors: &Ancestors,
4061 pubkey: &Pubkey,
4062 load_hint: LoadHint,
4063 ) -> Option<(AccountSharedData, Slot)> {
4064 self.do_load(ancestors, pubkey, None, load_hint, LoadZeroLamports::None)
4065 }
4066
4067 pub fn load_account_into_read_cache(&self, ancestors: &Ancestors, pubkey: &Pubkey) {
4070 self.do_load_with_populate_read_cache(
4071 ancestors,
4072 pubkey,
4073 None,
4074 LoadHint::Unspecified,
4075 true,
4076 LoadZeroLamports::None,
4078 );
4079 }
4080
4081 pub fn load_with_fixed_root(
4083 &self,
4084 ancestors: &Ancestors,
4085 pubkey: &Pubkey,
4086 ) -> Option<(AccountSharedData, Slot)> {
4087 self.load(ancestors, pubkey, LoadHint::FixedMaxRoot)
4088 }
4089
4090 fn read_index_for_accessor_or_load_slow<'a>(
4091 &'a self,
4092 ancestors: &Ancestors,
4093 pubkey: &'a Pubkey,
4094 max_root: Option<Slot>,
4095 clone_in_lock: bool,
4096 ) -> Option<(Slot, StorageLocation, Option<LoadedAccountAccessor<'a>>)> {
4097 self.accounts_index.get_with_and_then(
4098 pubkey,
4099 Some(ancestors),
4100 max_root,
4101 true,
4102 |(slot, account_info)| {
4103 let storage_location = account_info.storage_location();
4104 let account_accessor = clone_in_lock
4105 .then(|| self.get_account_accessor(slot, pubkey, &storage_location));
4106 (slot, storage_location, account_accessor)
4107 },
4108 )
4109 }
4110
/// Builds an account accessor for `pubkey`, re-reading the accounts index and
/// retrying whenever the location the index reported no longer holds the
/// account.
///
/// `slot` and `storage_location` are the caller's initial index read; they
/// are replaced by fresh index reads on each retry. Returns the accessor
/// together with the slot it refers to, or `None` if a retry finds no index
/// entry for `pubkey`.
///
/// # Panics
///
/// * With a fixed-max-root `load_hint`, if the write-cache lookup misses more
///   than once.
/// * If a retry returns the same slot and store id as the failed attempt
///   ("Bad index entry detected").
fn retry_to_get_account_accessor<'a>(
    &'a self,
    mut slot: Slot,
    mut storage_location: StorageLocation,
    ancestors: &'a Ancestors,
    pubkey: &'a Pubkey,
    max_root: Option<Slot>,
    load_hint: LoadHint,
) -> Option<(LoadedAccountAccessor<'a>, Slot)> {
    // Test builds only: delay by `self.load_delay` milliseconds before loading.
    #[cfg(test)]
    {
        sleep(Duration::from_millis(self.load_delay));
    }

    // Counts the failures that are compared against `load_limit` below.
    let mut num_acceptable_failed_iterations = 0;
    loop {
        let account_accessor = self.get_account_accessor(slot, pubkey, &storage_location);
        match account_accessor {
            LoadedAccountAccessor::Cached(Some(_)) | LoadedAccountAccessor::Stored(Some(_)) => {
                // The location from the index was still valid: done.
                return Some((account_accessor, slot));
            }
            LoadedAccountAccessor::Cached(None) => {
                // The index pointed at the write cache but the account was not
                // there. NOTE(review): presumably flushed concurrently — confirm.
                num_acceptable_failed_iterations += 1;
                match load_hint {
                    LoadHint::FixedMaxRootDoNotPopulateReadCache | LoadHint::FixedMaxRoot => {
                        // With a fixed max root at most one such miss is tolerated.
                        assert!(num_acceptable_failed_iterations <= 1);
                    }
                    LoadHint::Unspecified => {
                        // Any number of misses is tolerated here, subject to
                        // the `load_limit` check below.
                    }
                }
            }
            LoadedAccountAccessor::Stored(None) => {
                // The index pointed at a storage entry that no longer exists.
                match load_hint {
                    LoadHint::FixedMaxRootDoNotPopulateReadCache | LoadHint::FixedMaxRoot => {
                        // Retried without counting towards `load_limit`.
                    }
                    LoadHint::Unspecified => {
                        num_acceptable_failed_iterations += 1;
                    }
                }
            }
        }
        #[cfg(not(test))]
        let load_limit = ABSURD_CONSECUTIVE_FAILED_ITERATIONS;

        // Test builds read the limit from `self.load_limit` instead.
        #[cfg(test)]
        let load_limit = self.load_limit.load(Ordering::Relaxed);

        // Once the limit is reached, emit a warning datapoint and ask the
        // index read below to build the accessor inside its callback.
        let fallback_to_slow_path = if num_acceptable_failed_iterations >= load_limit {
            let message = format!(
                "do_load() failed to get key: {pubkey} from storage, latest attempt was for \
                 slot: {slot}, storage_location: {storage_location:?}, load_hint: \
                 {load_hint:?}",
            );
            datapoint_warn!("accounts_db-do_load_warn", ("warn", message, String));
            true
        } else {
            false
        };

        // Re-read the index. The `?` returns `None` when the entry is gone.
        let (new_slot, new_storage_location, maybe_account_accessor) = self
            .read_index_for_accessor_or_load_slow(
                ancestors,
                pubkey,
                max_root,
                fallback_to_slow_path,
            )?;
        if new_slot == slot && new_storage_location.is_store_id_equal(&storage_location) {
            // The index still reports the location that just failed to load.
            // Every path through this branch panics; the asserts only choose
            // which condition is reported first.
            let message = format!(
                "Bad index entry detected ({}, {}, {:?}, {:?}, {:?}, {:?})",
                pubkey,
                slot,
                storage_location,
                load_hint,
                new_storage_location,
                self.accounts_index.get_cloned(pubkey)
            );
            assert!(
                new_storage_location.is_offset_equal(&storage_location),
                "{message}"
            );

            assert!(!new_storage_location.is_cached(), "{message}");

            assert_eq!(load_hint, LoadHint::Unspecified, "{message}");

            panic!("{message}");
        } else if fallback_to_slow_path {
            // The accessor was built during the index read (clone_in_lock=true).
            return Some((
                maybe_account_accessor.expect("must be some if clone_in_lock=true"),
                new_slot,
            ));
        }

        // Retry with the freshly read location.
        slot = new_slot;
        storage_location = new_storage_location;
    }
}
4390
4391 fn do_load(
4392 &self,
4393 ancestors: &Ancestors,
4394 pubkey: &Pubkey,
4395 max_root: Option<Slot>,
4396 load_hint: LoadHint,
4397 load_zero_lamports: LoadZeroLamports,
4398 ) -> Option<(AccountSharedData, Slot)> {
4399 self.do_load_with_populate_read_cache(
4400 ancestors,
4401 pubkey,
4402 max_root,
4403 load_hint,
4404 false,
4405 load_zero_lamports,
4406 )
4407 }
4408
4409 pub fn load_account_with(
4414 &self,
4415 ancestors: &Ancestors,
4416 pubkey: &Pubkey,
4417 should_put_in_read_cache: bool,
4418 ) -> Option<(AccountSharedData, Slot)> {
4419 let (slot, storage_location, _maybe_account_accessor) =
4420 self.read_index_for_accessor_or_load_slow(ancestors, pubkey, None, false)?;
4421 let in_write_cache = storage_location.is_cached();
4424 if !in_write_cache {
4425 let result = self.read_only_accounts_cache.load(*pubkey, slot);
4426 if let Some(account) = result {
4427 if account.is_zero_lamport() {
4428 return None;
4429 }
4430 return Some((account, slot));
4431 }
4432 }
4433
4434 let (mut account_accessor, slot) = self.retry_to_get_account_accessor(
4435 slot,
4436 storage_location,
4437 ancestors,
4438 pubkey,
4439 None,
4440 LoadHint::Unspecified,
4441 )?;
4442
4443 let in_write_cache = matches!(account_accessor, LoadedAccountAccessor::Cached(_));
4446 let account = account_accessor.check_and_get_loaded_account_shared_data();
4447 if account.is_zero_lamport() {
4448 return None;
4449 }
4450
4451 if !in_write_cache && should_put_in_read_cache {
4452 self.read_only_accounts_cache
4465 .store(*pubkey, slot, account.clone());
4466 }
4467 Some((account, slot))
4468 }
4469
4470 fn do_load_with_populate_read_cache(
4473 &self,
4474 ancestors: &Ancestors,
4475 pubkey: &Pubkey,
4476 max_root: Option<Slot>,
4477 load_hint: LoadHint,
4478 load_into_read_cache_only: bool,
4479 load_zero_lamports: LoadZeroLamports,
4480 ) -> Option<(AccountSharedData, Slot)> {
4481 #[cfg(not(test))]
4482 assert!(max_root.is_none());
4483
4484 let starting_max_root = self.accounts_index.max_root_inclusive();
4485
4486 let (slot, storage_location, _maybe_account_accessor) =
4487 self.read_index_for_accessor_or_load_slow(ancestors, pubkey, max_root, false)?;
4488 let in_write_cache = storage_location.is_cached();
4491 if !load_into_read_cache_only {
4492 if !in_write_cache {
4493 let result = self.read_only_accounts_cache.load(*pubkey, slot);
4494 if let Some(account) = result {
4495 if load_zero_lamports == LoadZeroLamports::None && account.is_zero_lamport() {
4496 return None;
4497 }
4498 return Some((account, slot));
4499 }
4500 }
4501 } else {
4502 if in_write_cache {
4504 return None;
4506 }
4507 if self.read_only_accounts_cache.in_cache(pubkey, slot) {
4508 return None;
4510 }
4511 }
4512
4513 let (mut account_accessor, slot) = self.retry_to_get_account_accessor(
4514 slot,
4515 storage_location,
4516 ancestors,
4517 pubkey,
4518 max_root,
4519 load_hint,
4520 )?;
4521 let in_write_cache = matches!(account_accessor, LoadedAccountAccessor::Cached(_));
4524 let account = account_accessor.check_and_get_loaded_account_shared_data();
4525 if load_zero_lamports == LoadZeroLamports::None && account.is_zero_lamport() {
4526 return None;
4527 }
4528
4529 if !in_write_cache && load_hint != LoadHint::FixedMaxRootDoNotPopulateReadCache {
4530 self.read_only_accounts_cache
4543 .store(*pubkey, slot, account.clone());
4544 }
4545 if load_hint == LoadHint::FixedMaxRoot
4546 || load_hint == LoadHint::FixedMaxRootDoNotPopulateReadCache
4547 {
4548 let ending_max_root = self.accounts_index.max_root_inclusive();
4550 if starting_max_root != ending_max_root {
4551 warn!(
4552 "do_load_with_populate_read_cache() scanning pubkey {pubkey} called with \
4553 fixed max root, but max root changed from {starting_max_root} to \
4554 {ending_max_root} during function call"
4555 );
4556 }
4557 }
4558 Some((account, slot))
4559 }
4560
4561 fn get_account_accessor<'a>(
4562 &'a self,
4563 slot: Slot,
4564 pubkey: &'a Pubkey,
4565 storage_location: &StorageLocation,
4566 ) -> LoadedAccountAccessor<'a> {
4567 match storage_location {
4568 StorageLocation::Cached => {
4569 let maybe_cached_account = self.accounts_cache.load(slot, pubkey).map(Cow::Owned);
4570 LoadedAccountAccessor::Cached(maybe_cached_account)
4571 }
4572 StorageLocation::AppendVec(store_id, offset) => {
4573 let maybe_storage_entry = self
4574 .storage
4575 .get_account_storage_entry(slot, *store_id)
4576 .map(|account_storage_entry| (account_storage_entry, *offset));
4577 LoadedAccountAccessor::Stored(maybe_storage_entry)
4578 }
4579 }
4580 }
4581
4582 fn has_space_available(&self, slot: Slot, size: u64) -> bool {
4583 let store = self.storage.get_slot_storage_entry(slot).unwrap();
4584 if store.status() == AccountStorageStatus::Available
4585 && store.accounts.remaining_bytes() >= size
4586 {
4587 return true;
4588 }
4589 false
4590 }
4591
4592 fn create_store(
4593 &self,
4594 slot: Slot,
4595 size: u64,
4596 from: &str,
4597 paths: &[PathBuf],
4598 ) -> Arc<AccountStorageEntry> {
4599 self.stats
4600 .create_store_count
4601 .fetch_add(1, Ordering::Relaxed);
4602 let path_index = thread_rng().gen_range(0..paths.len());
4603 let store = Arc::new(self.new_storage_entry(slot, Path::new(&paths[path_index]), size));
4604
4605 debug!(
4606 "creating store: {} slot: {} len: {} size: {} from: {} path: {}",
4607 store.id(),
4608 slot,
4609 store.accounts.len(),
4610 store.accounts.capacity(),
4611 from,
4612 store.accounts.path().display(),
4613 );
4614
4615 store
4616 }
4617
4618 fn create_and_insert_store(
4619 &self,
4620 slot: Slot,
4621 size: u64,
4622 from: &str,
4623 ) -> Arc<AccountStorageEntry> {
4624 self.create_and_insert_store_with_paths(slot, size, from, &self.paths)
4625 }
4626
4627 fn create_and_insert_store_with_paths(
4628 &self,
4629 slot: Slot,
4630 size: u64,
4631 from: &str,
4632 paths: &[PathBuf],
4633 ) -> Arc<AccountStorageEntry> {
4634 let store = self.create_store(slot, size, from, paths);
4635 let store_for_index = store.clone();
4636
4637 self.insert_store(slot, store_for_index);
4638 store
4639 }
4640
4641 fn insert_store(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
4642 self.storage.insert(slot, store)
4643 }
4644
4645 pub fn enable_bank_drop_callback(&self) {
4646 self.is_bank_drop_callback_enabled
4647 .store(true, Ordering::Release);
4648 }
4649
4650 pub fn purge_slot(&self, slot: Slot, bank_id: BankId, is_serialized_with_abs: bool) {
4656 if self.is_bank_drop_callback_enabled.load(Ordering::Acquire) && !is_serialized_with_abs {
4657 panic!(
4658 "bad drop callpath detected; Bank::drop() must run serially with other logic in \
4659 ABS like clean_accounts()"
4660 )
4661 }
4662
4663 if self
4668 .accounts_index
4669 .removed_bank_ids
4670 .lock()
4671 .unwrap()
4672 .remove(&bank_id)
4673 {
4674 return;
4676 }
4677
4678 self.purge_slots(std::iter::once(&slot));
4679 }
4680
4681 pub fn purge_slots_from_cache_and_store<'a>(
4684 &self,
4685 removed_slots: impl Iterator<Item = &'a Slot> + Clone,
4686 purge_stats: &PurgeStats,
4687 ) {
4688 let mut remove_cache_elapsed_across_slots = 0;
4689 let mut num_cached_slots_removed = 0;
4690 let mut total_removed_cached_bytes = 0;
4691 for remove_slot in removed_slots {
4692 let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed");
4695 if let Some(slot_cache) = self.accounts_cache.slot_cache(*remove_slot) {
4700 num_cached_slots_removed += 1;
4703 total_removed_cached_bytes += slot_cache.total_bytes();
4704 self.purge_slot_cache(*remove_slot, &slot_cache);
4705 remove_cache_elapsed.stop();
4706 remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us();
4707 assert!(self.accounts_cache.remove_slot(*remove_slot).is_some());
4709 } else {
4710 self.purge_slot_storage(*remove_slot, purge_stats);
4711 }
4712 }
4716
4717 purge_stats
4718 .remove_cache_elapsed
4719 .fetch_add(remove_cache_elapsed_across_slots, Ordering::Relaxed);
4720 purge_stats
4721 .num_cached_slots_removed
4722 .fetch_add(num_cached_slots_removed, Ordering::Relaxed);
4723 purge_stats
4724 .total_removed_cached_bytes
4725 .fetch_add(total_removed_cached_bytes, Ordering::Relaxed);
4726 }
4727
4728 fn purge_dead_slots_from_storage<'a>(
4731 &'a self,
4732 removed_slots: impl Iterator<Item = &'a Slot> + Clone,
4733 purge_stats: &PurgeStats,
4734 ) {
4735 let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
4742 assert!(self
4743 .accounts_index
4744 .get_rooted_from_list(removed_slots.clone())
4745 .is_empty());
4746 safety_checks_elapsed.stop();
4747 purge_stats
4748 .safety_checks_elapsed
4749 .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
4750
4751 let mut total_removed_stored_bytes = 0;
4752 let mut all_removed_slot_storages = vec![];
4753
4754 let mut remove_storage_entries_elapsed = Measure::start("remove_storage_entries_elapsed");
4755 for remove_slot in removed_slots {
4756 if let Some(store) = self.storage.remove(remove_slot, false) {
4758 total_removed_stored_bytes += store.accounts.capacity();
4759 all_removed_slot_storages.push(store);
4760 }
4761 }
4762 remove_storage_entries_elapsed.stop();
4763 let num_stored_slots_removed = all_removed_slot_storages.len();
4764
4765 let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
4768 drop(all_removed_slot_storages);
4769 drop_storage_entries_elapsed.stop();
4770
4771 purge_stats
4772 .remove_storage_entries_elapsed
4773 .fetch_add(remove_storage_entries_elapsed.as_us(), Ordering::Relaxed);
4774 purge_stats
4775 .drop_storage_entries_elapsed
4776 .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
4777 purge_stats
4778 .num_stored_slots_removed
4779 .fetch_add(num_stored_slots_removed, Ordering::Relaxed);
4780 purge_stats
4781 .total_removed_storage_entries
4782 .fetch_add(num_stored_slots_removed, Ordering::Relaxed);
4783 purge_stats
4784 .total_removed_stored_bytes
4785 .fetch_add(total_removed_stored_bytes, Ordering::Relaxed);
4786 self.stats
4787 .dropped_stores
4788 .fetch_add(num_stored_slots_removed as u64, Ordering::Relaxed);
4789 }
4790
4791 fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: &SlotCache) {
4792 let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache
4793 .iter()
4794 .map(|account| (*account.key(), purged_slot))
4795 .collect();
4796 self.purge_slot_cache_pubkeys(purged_slot, pubkey_to_slot_set, true);
4797 }
4798
4799 fn purge_slot_cache_pubkeys(
4800 &self,
4801 purged_slot: Slot,
4802 pubkey_to_slot_set: Vec<(Pubkey, Slot)>,
4803 is_dead: bool,
4804 ) {
4805 assert!(self
4807 .storage
4808 .get_slot_storage_entry_shrinking_in_progress_ok(purged_slot)
4809 .is_none());
4810 let num_purged_keys = pubkey_to_slot_set.len();
4811 let (reclaims, _) = self.purge_keys_exact(pubkey_to_slot_set.iter());
4812 assert_eq!(reclaims.len(), num_purged_keys);
4813 if is_dead {
4814 self.remove_dead_slots_metadata(std::iter::once(&purged_slot));
4815 }
4816 }
4817
4818 fn purge_slot_storage(&self, remove_slot: Slot, purge_stats: &PurgeStats) {
4819 let mut scan_storages_elapsed = Measure::start("scan_storages_elapsed");
4826 let mut stored_keys = HashSet::new();
4827 if let Some(storage) = self
4828 .storage
4829 .get_slot_storage_entry_shrinking_in_progress_ok(remove_slot)
4830 {
4831 storage
4832 .accounts
4833 .scan_pubkeys(|pk| {
4834 stored_keys.insert((*pk, remove_slot));
4835 })
4836 .expect("must scan accounts storage");
4837 }
4838 scan_storages_elapsed.stop();
4839 purge_stats
4840 .scan_storages_elapsed
4841 .fetch_add(scan_storages_elapsed.as_us(), Ordering::Relaxed);
4842
4843 let mut purge_accounts_index_elapsed = Measure::start("purge_accounts_index_elapsed");
4844 let (reclaims, pubkeys_removed_from_accounts_index) =
4846 self.purge_keys_exact(stored_keys.iter());
4847 purge_accounts_index_elapsed.stop();
4848 purge_stats
4849 .purge_accounts_index_elapsed
4850 .fetch_add(purge_accounts_index_elapsed.as_us(), Ordering::Relaxed);
4851
4852 let mut handle_reclaims_elapsed = Measure::start("handle_reclaims_elapsed");
4855 let expected_dead_slot = Some(remove_slot);
4858 self.handle_reclaims(
4859 (!reclaims.is_empty()).then(|| reclaims.iter()),
4860 expected_dead_slot,
4861 &pubkeys_removed_from_accounts_index,
4862 HandleReclaims::ProcessDeadSlots(purge_stats),
4863 MarkAccountsObsolete::No,
4864 );
4865 handle_reclaims_elapsed.stop();
4866 purge_stats
4867 .handle_reclaims_elapsed
4868 .fetch_add(handle_reclaims_elapsed.as_us(), Ordering::Relaxed);
4869 assert!(
4872 self.storage.get_slot_storage_entry(remove_slot).is_none(),
4873 "slot {remove_slot} is not none"
4874 );
4875 }
4876
4877 fn purge_slots<'a>(&self, slots: impl Iterator<Item = &'a Slot> + Clone) {
4878 let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
4880 let non_roots = slots
4881 .filter(|slot| !self.accounts_index.is_alive_root(**slot));
4891 safety_checks_elapsed.stop();
4892 self.external_purge_slots_stats
4893 .safety_checks_elapsed
4894 .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
4895 self.purge_slots_from_cache_and_store(non_roots, &self.external_purge_slots_stats);
4896 self.external_purge_slots_stats
4897 .report("external_purge_slots_stats", Some(1000));
4898 }
4899
/// Removes the accounts of the unrooted slots in `remove_slots` from the
/// write cache and storage.
///
/// The work happens in four steps:
/// 1. reserve every slot in `slots_under_contention`, waiting on `signal`
///    for any slot that was already in the set;
/// 2. record every bank id in the index's `removed_bank_ids`;
/// 3. purge the slots via `purge_slots_from_cache_and_store`;
/// 4. release the reservations taken in step 1.
///
/// # Panics
///
/// Panics if any slot in `remove_slots` is rooted, or if a reservation is
/// missing when it is released.
pub fn remove_unrooted_slots(&self, remove_slots: &[(Slot, BankId)]) {
    let rooted_slots = self
        .accounts_index
        .get_rooted_from_list(remove_slots.iter().map(|(slot, _)| slot));
    assert!(
        rooted_slots.is_empty(),
        "Trying to remove accounts for rooted slots {rooted_slots:?}"
    );

    let RemoveUnrootedSlotsSynchronization {
        slots_under_contention,
        signal,
    } = &self.remove_unrooted_slots_synchronization;

    {
        let mut currently_contended_slots = slots_under_contention.lock().unwrap();

        // Try to reserve each slot. `insert` returning false means the slot
        // was already in the set; the code treats that as "being flushed"
        // and collects those slots to wait on.
        let mut remaining_contended_flush_slots: Vec<Slot> = remove_slots
            .iter()
            .filter_map(|(remove_slot, _)| {
                let is_being_flushed = !currently_contended_slots.insert(*remove_slot);
                is_being_flushed.then_some(remove_slot)
            })
            .cloned()
            .collect();

        loop {
            if !remaining_contended_flush_slots.is_empty() {
                // Block until signalled. NOTE(review): presumably `signal` is
                // a condvar notified when a flush finishes — confirm.
                currently_contended_slots = signal.wait(currently_contended_slots).unwrap();
            } else {
                // Every slot is reserved by this call; nothing left to wait for.
                break;
            }

            // Retry the reservation for the slots still outstanding; keep
            // only those that are still held by someone else.
            remaining_contended_flush_slots.retain(|flush_slot| {
                !currently_contended_slots.insert(*flush_slot)
            });
        }
    }

    {
        // Record the bank ids; `purge_slot` skips bank ids found in this set.
        let mut locked_removed_bank_ids = self.accounts_index.removed_bank_ids.lock().unwrap();
        for (_slot, remove_bank_id) in remove_slots.iter() {
            locked_removed_bank_ids.insert(*remove_bank_id);
        }
    }

    let remove_unrooted_purge_stats = PurgeStats::default();
    self.purge_slots_from_cache_and_store(
        remove_slots.iter().map(|(slot, _)| slot),
        &remove_unrooted_purge_stats,
    );
    remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", None);

    // Release the reservations; each slot must still be present in the set.
    let mut currently_contended_slots = slots_under_contention.lock().unwrap();
    for (remove_slot, _) in remove_slots {
        assert!(currently_contended_slots.remove(remove_slot));
    }
}
4985
4986 pub fn lt_hash_account(account: &impl ReadableAccount, pubkey: &Pubkey) -> AccountLtHash {
4988 if account.lamports() == 0 {
4989 return ZERO_LAMPORT_ACCOUNT_LT_HASH;
4990 }
4991
4992 let hasher = Self::hash_account_helper(account, pubkey);
4993 let lt_hash = LtHash::with(&hasher);
4994 AccountLtHash(lt_hash)
4995 }
4996
4997 fn hash_account_helper(account: &impl ReadableAccount, pubkey: &Pubkey) -> blake3::Hasher {
4999 let mut hasher = blake3::Hasher::new();
5000
5001 const META_SIZE: usize = 8 + 1 + 32 + 32 ;
5004 const DATA_SIZE: usize = 200; const BUFFER_SIZE: usize = META_SIZE + DATA_SIZE;
5006 let mut buffer = SmallVec::<[u8; BUFFER_SIZE]>::new();
5007
5008 buffer.extend_from_slice(&account.lamports().to_le_bytes());
5010
5011 let data = account.data();
5012 if data.len() > DATA_SIZE {
5013 hasher.update(&buffer);
5015 buffer.clear();
5016
5017 hasher.update(data);
5019 } else {
5020 buffer.extend_from_slice(data);
5022 }
5023
5024 buffer.push(account.executable().into());
5026 buffer.extend_from_slice(account.owner().as_ref());
5027 buffer.extend_from_slice(pubkey.as_ref());
5028 hasher.update(&buffer);
5029
5030 hasher
5031 }
5032
5033 pub fn mark_slot_frozen(&self, slot: Slot) {
5034 if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
5035 slot_cache.mark_slot_frozen();
5036 slot_cache.report_slot_store_metrics();
5037 }
5038 self.accounts_cache.report_size();
5039 }
5040
/// Test-only entry point that flushes the write cache for `slot` via
/// `flush_slot_cache`, discarding the result.
#[cfg(feature = "dev-context-only-utils")]
pub fn flush_accounts_cache_slot_for_tests(&self, slot: Slot) {
    let _ = self.flush_slot_cache(slot);
}
5046
5047 fn should_aggressively_flush_cache(&self) -> bool {
5049 self.write_cache_limit_bytes
5050 .unwrap_or(WRITE_CACHE_LIMIT_BYTES_DEFAULT)
5051 < self.accounts_cache.size()
5052 }
5053
5054 pub fn flush_accounts_cache(&self, force_flush: bool, requested_flush_root: Option<Slot>) {
5058 #[cfg(not(test))]
5059 assert!(requested_flush_root.is_some());
5060
5061 if !force_flush && !self.should_aggressively_flush_cache() {
5062 return;
5063 }
5064
5065 let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed");
5068
5069 let _guard = self.active_stats.activate(ActiveStatItem::Flush);
5070
5071 let (total_new_cleaned_roots, num_cleaned_roots_flushed, mut flush_stats) = self
5075 .flush_rooted_accounts_cache(
5076 requested_flush_root,
5077 true, );
5079 flush_roots_elapsed.stop();
5080
5081 let (total_new_excess_roots, num_excess_roots_flushed, flush_stats_aggressively) =
5087 if self.should_aggressively_flush_cache() {
5088 self.flush_rooted_accounts_cache(None, false)
5094 } else {
5095 (0, 0, FlushStats::default())
5096 };
5097 flush_stats.accumulate(&flush_stats_aggressively);
5098
5099 let mut excess_slot_count = 0;
5100 let mut unflushable_unrooted_slot_count = 0;
5101 let max_flushed_root = self.accounts_cache.fetch_max_flush_root();
5102 if self.should_aggressively_flush_cache() {
5103 let mut old_slots = self.accounts_cache.cached_frozen_slots();
5104 old_slots.sort_unstable();
5105 excess_slot_count = old_slots.len();
5106 let mut flush_stats = FlushStats::default();
5107 old_slots.into_iter().for_each(|old_slot| {
5108 if old_slot > max_flushed_root {
5110 if self.should_aggressively_flush_cache() {
5111 if let Some(stats) = self.flush_slot_cache(old_slot) {
5112 flush_stats.accumulate(&stats);
5113 }
5114 }
5115 } else {
5116 unflushable_unrooted_slot_count += 1;
5117 }
5118 });
5119 datapoint_info!(
5120 "accounts_db-flush_accounts_cache_aggressively",
5121 (
5122 "num_accounts_flushed",
5123 flush_stats.num_accounts_flushed.0,
5124 i64
5125 ),
5126 ("num_accounts_saved", flush_stats.num_accounts_purged.0, i64),
5127 (
5128 "account_bytes_flushed",
5129 flush_stats.num_bytes_flushed.0,
5130 i64
5131 ),
5132 ("account_bytes_saved", flush_stats.num_bytes_purged.0, i64),
5133 ("total_cache_size", self.accounts_cache.size(), i64),
5134 ("total_frozen_slots", excess_slot_count, i64),
5135 ("total_slots", self.accounts_cache.num_slots(), i64),
5136 );
5137 }
5138
5139 datapoint_info!(
5140 "accounts_db-flush_accounts_cache",
5141 ("total_new_cleaned_roots", total_new_cleaned_roots, i64),
5142 ("num_cleaned_roots_flushed", num_cleaned_roots_flushed, i64),
5143 ("total_new_excess_roots", total_new_excess_roots, i64),
5144 ("num_excess_roots_flushed", num_excess_roots_flushed, i64),
5145 ("excess_slot_count", excess_slot_count, i64),
5146 (
5147 "unflushable_unrooted_slot_count",
5148 unflushable_unrooted_slot_count,
5149 i64
5150 ),
5151 ("flush_roots_elapsed", flush_roots_elapsed.as_us(), i64),
5152 (
5153 "account_bytes_flushed",
5154 flush_stats.num_bytes_flushed.0,
5155 i64
5156 ),
5157 (
5158 "num_accounts_flushed",
5159 flush_stats.num_accounts_flushed.0,
5160 i64
5161 ),
5162 ("account_bytes_saved", flush_stats.num_bytes_purged.0, i64),
5163 ("num_accounts_saved", flush_stats.num_accounts_purged.0, i64),
5164 (
5165 "store_accounts_total_us",
5166 flush_stats.store_accounts_total_us.0,
5167 i64
5168 ),
5169 (
5170 "update_index_us",
5171 flush_stats.store_accounts_timing.update_index_elapsed,
5172 i64
5173 ),
5174 (
5175 "store_accounts_elapsed_us",
5176 flush_stats.store_accounts_timing.store_accounts_elapsed,
5177 i64
5178 ),
5179 (
5180 "handle_reclaims_elapsed_us",
5181 flush_stats.store_accounts_timing.handle_reclaims_elapsed,
5182 i64
5183 ),
5184 );
5185 }
5186
5187 fn flush_rooted_accounts_cache(
5188 &self,
5189 requested_flush_root: Option<Slot>,
5190 should_clean: bool,
5191 ) -> (usize, usize, FlushStats) {
5192 let max_clean_root = should_clean
5193 .then(|| {
5194 self.max_clean_root(requested_flush_root)
5197 })
5198 .flatten();
5199
5200 let mut written_accounts = HashSet::new();
5201
5202 let mut should_flush_f = should_clean
5205 .then(|| {
5206 Some(move |&pubkey: &Pubkey| {
5207 written_accounts.insert(pubkey)
5209 })
5210 })
5211 .flatten();
5212
5213 let flushed_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(requested_flush_root);
5215
5216 let mut num_roots_flushed = 0;
5219 let mut flush_stats = FlushStats::default();
5220 for &root in flushed_roots.iter().rev() {
5221 if let Some(stats) =
5222 self.flush_slot_cache_with_clean(root, should_flush_f.as_mut(), max_clean_root)
5223 {
5224 num_roots_flushed += 1;
5225 flush_stats.accumulate(&stats);
5226 }
5227 }
5228
5229 if let Some(&root) = flushed_roots.last() {
5238 self.accounts_cache.set_max_flush_root(root);
5239 }
5240 let num_new_roots = flushed_roots.len();
5241 (num_new_roots, num_roots_flushed, flush_stats)
5242 }
5243
    /// Flushes the accounts of one cached `slot` to a newly created store and removes the
    /// slot from the accounts cache.
    ///
    /// * `slot_cache` - the cache contents for `slot`.
    /// * `should_flush_f` - optional per-pubkey filter; accounts for which it returns
    ///   `false` are not written to storage and are purged instead. With no filter, every
    ///   account is flushed.
    /// * `max_clean_root` - when `slot` is greater than this root, the filter is ignored.
    ///
    /// Returns the stats (accounts/bytes flushed and purged, store timings) for this slot.
    ///
    /// Panics if the slot has no storage entry after the store, or if the slot is no longer
    /// in the accounts cache when it is removed.
    fn do_flush_slot_cache(
        &self,
        slot: Slot,
        slot_cache: &SlotCache,
        mut should_flush_f: Option<&mut impl FnMut(&Pubkey) -> bool>,
        max_clean_root: Option<Slot>,
    ) -> FlushStats {
        let mut flush_stats = FlushStats::default();
        let iter_items: Vec<_> = slot_cache.iter().collect();
        // Entries that are skipped (not flushed); they are purged from the slot cache below.
        let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
        if should_flush_f.is_some() {
            if let Some(max_clean_root) = max_clean_root {
                if slot > max_clean_root {
                    // Slots beyond `max_clean_root` are flushed in full: dropping the filter
                    // makes every account take the "flush" branch below.
                    should_flush_f = None;
                }
            }
        }

        // Partition the cached accounts: flushed ones are collected into `accounts`,
        // skipped ones are recorded in `pubkey_to_slot_set`. Stats are updated either way.
        let accounts: Vec<(&Pubkey, &AccountSharedData)> = iter_items
            .iter()
            .filter_map(|iter_item| {
                let key = iter_item.key();
                let account = &iter_item.value().account;
                // No filter means "flush everything".
                let should_flush = should_flush_f
                    .as_mut()
                    .map(|should_flush_f| should_flush_f(key))
                    .unwrap_or(true);
                if should_flush {
                    flush_stats.num_bytes_flushed +=
                        aligned_stored_size(account.data().len()) as u64;
                    flush_stats.num_accounts_flushed += 1;
                    Some((key, account))
                } else {
                    pubkey_to_slot_set.push((*key, slot));
                    flush_stats.num_bytes_purged +=
                        aligned_stored_size(account.data().len()) as u64;
                    flush_stats.num_accounts_purged += 1;
                    None
                }
            })
            .collect();

        // A slot with nothing left to flush is treated as dead.
        let is_dead_slot = accounts.is_empty();
        self.purge_slot_cache_pubkeys(slot, pubkey_to_slot_set, is_dead_slot);

        if !is_dead_slot {
            // The store is sized from the aligned stored size of the accounts being flushed.
            let flushed_store = self.create_and_insert_store(
                slot,
                flush_stats.num_bytes_flushed.0,
                "flush_slot_cache",
            );

            // Old slots are reclaimed only when obsolete-account marking is enabled AND the
            // filter is still active (i.e. it was provided and not dropped above).
            let reclaim_method = if self.mark_obsolete_accounts && should_flush_f.is_some() {
                UpsertReclaim::ReclaimOldSlots
            } else {
                UpsertReclaim::IgnoreReclaims
            };

            let (store_accounts_timing_inner, store_accounts_total_inner_us) = measure_us!(self
                ._store_accounts_frozen(
                    (slot, &accounts[..]),
                    &flushed_store,
                    reclaim_method,
                    UpdateIndexThreadSelection::PoolWithThreshold,
                ));
            flush_stats.store_accounts_timing = store_accounts_timing_inner;
            flush_stats.store_accounts_total_us = Saturating(store_accounts_total_inner_us);

            assert!(self.storage.get_slot_storage_entry(slot).is_some());
            self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);
        }

        // The slot is dropped from the cache only after its accounts were stored above.
        assert!(self.accounts_cache.remove_slot(slot).is_some());

        // Record the flushed pubkeys under this slot in `uncleaned_pubkeys`.
        self.uncleaned_pubkeys
            .entry(slot)
            .or_default()
            .extend(accounts.iter().map(|(pubkey, _account)| **pubkey));

        flush_stats
    }
5348
5349 fn flush_slot_cache(&self, slot: Slot) -> Option<FlushStats> {
5351 self.flush_slot_cache_with_clean(slot, None::<&mut fn(&_) -> bool>, None)
5352 }
5353
5354 fn flush_slot_cache_with_clean(
5358 &self,
5359 slot: Slot,
5360 should_flush_f: Option<&mut impl FnMut(&Pubkey) -> bool>,
5361 max_clean_root: Option<Slot>,
5362 ) -> Option<FlushStats> {
5363 if self
5364 .remove_unrooted_slots_synchronization
5365 .slots_under_contention
5366 .lock()
5367 .unwrap()
5368 .insert(slot)
5369 {
5370 let flush_stats = self.accounts_cache.slot_cache(slot).map(|slot_cache| {
5372 #[cfg(test)]
5373 {
5374 sleep(Duration::from_millis(self.load_delay));
5376 }
5377 self.do_flush_slot_cache(slot, &slot_cache, should_flush_f, max_clean_root)
5382 });
5383
5384 assert!(self
5387 .remove_unrooted_slots_synchronization
5388 .slots_under_contention
5389 .lock()
5390 .unwrap()
5391 .remove(&slot));
5392
5393 self.remove_unrooted_slots_synchronization
5396 .signal
5397 .notify_all();
5398 flush_stats
5399 } else {
5400 None
5402 }
5403 }
5404
5405 fn report_store_stats(&self) {
5406 let mut total_count = 0;
5407 let mut newest_slot = 0;
5408 let mut oldest_slot = u64::MAX;
5409 let mut total_bytes = 0;
5410 let mut total_alive_bytes = 0;
5411 for (slot, store) in self.storage.iter() {
5412 total_count += 1;
5413 newest_slot = std::cmp::max(newest_slot, slot);
5414
5415 oldest_slot = std::cmp::min(oldest_slot, slot);
5416
5417 total_alive_bytes += store.alive_bytes();
5418 total_bytes += store.capacity();
5419 }
5420 info!(
5421 "total_stores: {total_count}, newest_slot: {newest_slot}, oldest_slot: {oldest_slot}"
5422 );
5423
5424 let total_alive_ratio = if total_bytes > 0 {
5425 total_alive_bytes as f64 / total_bytes as f64
5426 } else {
5427 0.
5428 };
5429
5430 datapoint_info!(
5431 "accounts_db-stores",
5432 ("total_count", total_count, i64),
5433 ("total_bytes", total_bytes, i64),
5434 ("total_alive_bytes", total_alive_bytes, i64),
5435 ("total_alive_ratio", total_alive_ratio, f64),
5436 );
5437 }
5438
5439 pub fn calculate_accounts_lt_hash_at_startup_from_index(
5444 &self,
5445 ancestors: &Ancestors,
5446 startup_slot: Slot,
5447 ) -> AccountsLtHash {
5448 let lt_hash = self
5456 .accounts_index
5457 .account_maps
5458 .par_iter()
5459 .fold(
5460 LtHash::identity,
5461 |mut accumulator_lt_hash, accounts_index_bin| {
5462 for pubkey in accounts_index_bin.keys() {
5463 let account_lt_hash = self
5464 .accounts_index
5465 .get_with_and_then(
5466 &pubkey,
5467 Some(ancestors),
5468 Some(startup_slot),
5469 false,
5470 |(slot, account_info)| {
5471 (!account_info.is_zero_lamport()).then(|| {
5472 self.get_account_accessor(
5473 slot,
5474 &pubkey,
5475 &account_info.storage_location(),
5476 )
5477 .get_loaded_account(|loaded_account| {
5478 Self::lt_hash_account(&loaded_account, &pubkey)
5479 })
5480 .unwrap()
5483 })
5484 },
5485 )
5486 .flatten();
5487 if let Some(account_lt_hash) = account_lt_hash {
5488 accumulator_lt_hash.mix_in(&account_lt_hash.0);
5489 }
5490 }
5491 accumulator_lt_hash
5492 },
5493 )
5494 .reduce(LtHash::identity, |mut accum, elem| {
5495 accum.mix_in(&elem);
5496 accum
5497 });
5498
5499 AccountsLtHash(lt_hash)
5500 }
5501
5502 pub fn calculate_accounts_lt_hash_at_startup_from_storages(
5510 &self,
5511 storages: &[Arc<AccountStorageEntry>],
5512 duplicates_lt_hash: &DuplicatesLtHash,
5513 startup_slot: Slot,
5514 num_threads: NonZeroUsize,
5515 ) -> AccountsLtHash {
5516 let storages =
5520 AccountStoragesOrderer::with_random_order(storages).into_concurrent_consumer();
5521 let mut lt_hash = thread::scope(|s| {
5522 let handles = (0..num_threads.get())
5523 .map(|i| {
5524 thread::Builder::new()
5525 .name(format!("solAcctLtHash{i:02}"))
5526 .spawn_scoped(s, || {
5527 let mut thread_lt_hash = LtHash::identity();
5528 let mut reader = append_vec::new_scan_accounts_reader();
5529
5530 while let Some(storage) = storages.next() {
5531 let obsolete_accounts =
5535 storage.get_obsolete_accounts(Some(startup_slot));
5536 storage
5537 .accounts
5538 .scan_accounts(&mut reader, |offset, account| {
5539 if !obsolete_accounts
5541 .contains(&(offset, account.data.len()))
5542 {
5543 let account_lt_hash =
5544 Self::lt_hash_account(&account, account.pubkey());
5545 thread_lt_hash.mix_in(&account_lt_hash.0);
5546 }
5547 })
5548 .expect("must scan accounts storage");
5549 }
5550 thread_lt_hash
5551 })
5552 })
5553 .collect::<Result<Vec<_>, _>>()
5554 .expect("threads should spawn successfully");
5555 handles
5556 .into_iter()
5557 .map(|handle| handle.join().expect("thread should join successfully"))
5558 .fold(LtHash::identity(), |mut accum, elem| {
5559 accum.mix_in(&elem);
5560 accum
5561 })
5562 });
5563
5564 if self.mark_obsolete_accounts {
5565 assert_eq!(*duplicates_lt_hash, DuplicatesLtHash::default());
5569 }
5570 lt_hash.mix_out(&duplicates_lt_hash.0);
5571
5572 AccountsLtHash(lt_hash)
5573 }
5574
5575 pub fn calculate_capitalization_at_startup_from_index(
5584 &self,
5585 ancestors: &Ancestors,
5586 startup_slot: Slot,
5587 ) -> u64 {
5588 self.accounts_index
5589 .account_maps
5590 .par_iter()
5591 .map(|accounts_index_bin| {
5592 accounts_index_bin
5593 .keys()
5594 .into_iter()
5595 .map(|pubkey| {
5596 self.accounts_index
5597 .get_with_and_then(
5598 &pubkey,
5599 Some(ancestors),
5600 Some(startup_slot),
5601 false,
5602 |(slot, account_info)| {
5603 (!account_info.is_zero_lamport()).then(|| {
5604 self.get_account_accessor(
5605 slot,
5606 &pubkey,
5607 &account_info.storage_location(),
5608 )
5609 .get_loaded_account(|loaded_account| {
5610 loaded_account.lamports()
5611 })
5612 .unwrap()
5615 })
5616 },
5617 )
5618 .flatten()
5619 .unwrap_or(0)
5620 })
5621 .try_fold(0, u64::checked_add)
5622 })
5623 .try_reduce(|| 0, u64::checked_add)
5624 .expect("capitalization cannot overflow")
5625 }
5626
5627 fn apply_offset_to_slot(slot: Slot, offset: i64) -> Slot {
5629 if offset > 0 {
5630 slot.saturating_add(offset as u64)
5631 } else {
5632 slot.saturating_sub(offset.unsigned_abs())
5633 }
5634 }
5635
5636 pub fn get_pubkeys_for_slot(&self, slot: Slot) -> Vec<Pubkey> {
5638 let scan_result = self.scan_cache_storage_fallback(
5639 slot,
5640 |loaded_account| Some(*loaded_account.pubkey()),
5641 |accum: &mut HashSet<Pubkey>, storage| {
5642 storage
5643 .scan_pubkeys(|pubkey| {
5644 accum.insert(*pubkey);
5645 })
5646 .expect("must scan accounts storage");
5647 },
5648 );
5649 match scan_result {
5650 ScanStorageResult::Cached(cached_result) => cached_result,
5651 ScanStorageResult::Stored(stored_result) => stored_result.into_iter().collect(),
5652 }
5653 }
5654
5655 pub fn get_pubkey_account_for_slot(&self, slot: Slot) -> Vec<(Pubkey, AccountSharedData)> {
5657 let scan_result = self.scan_account_storage(
5658 slot,
5659 |loaded_account| {
5660 Some((*loaded_account.pubkey(), loaded_account.take_account()))
5662 },
5663 |accum: &mut HashMap<_, _>, stored_account, data| {
5664 let data = data.unwrap();
5667 let loaded_account =
5668 LoadedAccount::Stored(StoredAccountInfo::new_from(stored_account, data));
5669 accum.insert(*loaded_account.pubkey(), loaded_account.take_account());
5671 },
5672 ScanAccountStorageData::DataRefForStorage,
5673 );
5674
5675 match scan_result {
5676 ScanStorageResult::Cached(cached_result) => cached_result,
5677 ScanStorageResult::Stored(stored_result) => stored_result.into_iter().collect(),
5678 }
5679 }
5680
5681 fn update_index<'a>(
5682 &self,
5683 infos: Vec<AccountInfo>,
5684 accounts: &impl StorableAccounts<'a>,
5685 reclaim: UpsertReclaim,
5686 update_index_thread_selection: UpdateIndexThreadSelection,
5687 thread_pool: &ThreadPool,
5688 ) -> SlotList<AccountInfo> {
5689 let target_slot = accounts.target_slot();
5690 let len = std::cmp::min(accounts.len(), infos.len());
5691
5692 if reclaim == UpsertReclaim::ReclaimOldSlots {
5697 assert!(target_slot <= self.accounts_index.max_root_inclusive());
5698 }
5699
5700 let update = |start, end| {
5701 let mut reclaims = Vec::with_capacity((end - start) / 2);
5702
5703 (start..end).for_each(|i| {
5704 let info = infos[i];
5705 accounts.account(i, |account| {
5706 let old_slot = accounts.slot(i);
5707 self.accounts_index.upsert(
5708 target_slot,
5709 old_slot,
5710 account.pubkey(),
5711 &account,
5712 &self.account_indexes,
5713 info,
5714 &mut reclaims,
5715 reclaim,
5716 );
5717 });
5718 });
5719 reclaims
5720 };
5721
5722 let threshold = 1;
5723 if matches!(
5724 update_index_thread_selection,
5725 UpdateIndexThreadSelection::PoolWithThreshold,
5726 ) && len > threshold
5727 {
5728 let chunk_size = std::cmp::max(1, len / quarter_thread_count()); let batches = 1 + len / chunk_size;
5730 thread_pool.install(|| {
5731 (0..batches)
5732 .into_par_iter()
5733 .map(|batch| {
5734 let start = batch * chunk_size;
5735 let end = std::cmp::min(start + chunk_size, len);
5736 update(start, end)
5737 })
5738 .flatten()
5739 .collect::<Vec<_>>()
5740 })
5741 } else {
5742 update(0, len)
5743 }
5744 }
5745
5746 fn should_not_shrink(alive_bytes: u64, total_bytes: u64) -> bool {
5747 alive_bytes >= total_bytes
5748 }
5749
5750 fn is_shrinking_productive(store: &AccountStorageEntry) -> bool {
5751 let alive_count = store.count();
5752 let total_bytes = store.capacity();
5753 let alive_bytes = store.alive_bytes_exclude_zero_lamport_single_ref_accounts() as u64;
5754 if Self::should_not_shrink(alive_bytes, total_bytes) {
5755 trace!(
5756 "shrink_slot_forced ({}): not able to shrink at all: num alive: {}, bytes alive: \
5757 {}, bytes total: {}, bytes saved: {}",
5758 store.slot(),
5759 alive_count,
5760 alive_bytes,
5761 total_bytes,
5762 total_bytes.saturating_sub(alive_bytes),
5763 );
5764 return false;
5765 }
5766
5767 true
5768 }
5769
5770 pub(crate) fn is_candidate_for_shrink(&self, store: &AccountStorageEntry) -> bool {
5773 let total_bytes = store.capacity();
5776
5777 let alive_bytes = store.alive_bytes_exclude_zero_lamport_single_ref_accounts() as u64;
5778 match self.shrink_ratio {
5779 AccountShrinkThreshold::TotalSpace { shrink_ratio: _ } => alive_bytes < total_bytes,
5780 AccountShrinkThreshold::IndividualStore { shrink_ratio } => {
5781 (alive_bytes as f64 / total_bytes as f64) < shrink_ratio
5782 }
5783 }
5784 }
5785
    /// Removes the reclaimed accounts from their storages' alive accounting and works out
    /// which slots became dead and which became shrink candidates.
    ///
    /// * `reclaims` - `(slot, AccountInfo)` pairs to remove; none of them may be cached.
    /// * `expected_slot` - when `Some`, all reclaims must belong to exactly this slot.
    /// * `mark_accounts_obsolete` - when `Yes(slot)`, accounts removed from a storage that
    ///   still has other accounts are also marked obsolete in that storage.
    ///
    /// Returns the set of slots whose storage holds no accounts anymore, together with the
    /// reclaimed offsets grouped by slot.
    ///
    /// Panics if a shrink is in progress, if a reclaim is cached, if `expected_slot` does
    /// not match the reclaims, or if a storage's slot differs from the slot it was looked
    /// up under.
    fn remove_dead_accounts<'a, I>(
        &'a self,
        reclaims: I,
        expected_slot: Option<Slot>,
        mark_accounts_obsolete: MarkAccountsObsolete,
    ) -> (IntSet<Slot>, SlotOffsets)
    where
        I: Iterator<Item = &'a (Slot, AccountInfo)>,
    {
        let mut reclaimed_offsets = SlotOffsets::default();

        assert!(self.storage.no_shrink_in_progress());

        let mut dead_slots = IntSet::default();
        let mut new_shrink_candidates = ShrinkCandidates::default();
        let mut measure = Measure::start("remove");
        // Group the reclaimed offsets by slot.
        for (slot, account_info) in reclaims {
            assert!(!account_info.is_cached());
            reclaimed_offsets
                .entry(*slot)
                .or_default()
                .insert(account_info.offset());
        }
        if let Some(expected_slot) = expected_slot {
            assert_eq!(reclaimed_offsets.len(), 1);
            assert!(reclaimed_offsets.contains_key(&expected_slot));
        }

        self.clean_accounts_stats
            .slots_cleaned
            .fetch_add(reclaimed_offsets.len() as u64, Ordering::Relaxed);

        // Slots without a storage entry are skipped.
        reclaimed_offsets.iter().for_each(|(slot, offsets)| {
            if let Some(store) = self.storage.get_slot_storage_entry(*slot) {
                assert_eq!(
                    *slot,
                    store.slot(),
                    "AccountsDB::accounts_index corrupted. Storage pointed to: {}, expected: {}, \
                     should only point to one slot",
                    store.slot(),
                    *slot
                );

                let remaining_accounts = if offsets.len() == store.count() {
                    // Every account counted by the store is being removed, so all of its
                    // alive bytes go away; no per-account sizes are needed.
                    store.remove_accounts(store.alive_bytes(), offsets.len())
                } else {
                    // Partial removal: look up the data length of each removed account to
                    // compute how many bytes become dead.
                    let (remaining_accounts, us) = measure_us!({
                        let mut offsets = offsets.iter().cloned().collect::<Vec<_>>();
                        offsets.sort_unstable();
                        let data_lens = store.accounts.get_account_data_lens(&offsets);
                        let dead_bytes = data_lens
                            .iter()
                            .map(|len| store.accounts.calculate_stored_size(*len))
                            .sum();
                        let remaining_accounts = store.remove_accounts(dead_bytes, offsets.len());

                        if let MarkAccountsObsolete::Yes(slot_marked_obsolete) =
                            mark_accounts_obsolete
                        {
                            // Obsolete accounts are recorded as (offset, data length) pairs.
                            store.mark_accounts_obsolete(
                                offsets.into_iter().zip(data_lens),
                                slot_marked_obsolete,
                            );
                        }
                        remaining_accounts
                    });
                    self.clean_accounts_stats
                        .get_account_sizes_us
                        .fetch_add(us, Ordering::Relaxed);
                    remaining_accounts
                };

                if remaining_accounts == 0 {
                    // Nothing left in the store: record it as dirty and the slot as dead.
                    self.dirty_stores.insert(*slot, store);
                    dead_slots.insert(*slot);
                } else if Self::is_shrinking_productive(&store)
                    && self.is_candidate_for_shrink(&store)
                {
                    new_shrink_candidates.insert(*slot);
                };
            }
        });
        measure.stop();
        self.clean_accounts_stats
            .remove_dead_accounts_remove_us
            .fetch_add(measure.as_us(), Ordering::Relaxed);

        // Publish the new shrink candidates; the lock is held only while inserting.
        let mut measure = Measure::start("shrink");
        let mut shrink_candidate_slots = self.shrink_candidate_slots.lock().unwrap();
        for slot in new_shrink_candidates {
            shrink_candidate_slots.insert(slot);
        }
        drop(shrink_candidate_slots);
        measure.stop();
        self.clean_accounts_stats
            .remove_dead_accounts_shrink_us
            .fetch_add(measure.as_us(), Ordering::Relaxed);

        // Re-check each dead slot: drop it from the result if its storage still exists and
        // reports a non-zero account count.
        dead_slots.retain(|slot| {
            if let Some(slot_store) = self.storage.get_slot_storage_entry(*slot) {
                if slot_store.count() != 0 {
                    return false;
                }
            }
            true
        });

        (dead_slots, reclaimed_offsets)
    }
5907
5908 fn remove_dead_slots_metadata<'a>(&'a self, dead_slots_iter: impl Iterator<Item = &'a Slot>) {
5909 let mut measure = Measure::start("remove_dead_slots_metadata-ms");
5910 self.clean_dead_slots_from_accounts_index(dead_slots_iter);
5911 measure.stop();
5912 inc_new_counter_info!("remove_dead_slots_metadata-ms", measure.as_ms() as usize);
5913 }
5914
5915 fn unref_pubkeys<'a>(
5918 &'a self,
5919 pubkeys: impl Iterator<Item = &'a Pubkey> + Clone + Send + Sync,
5920 num_pubkeys: usize,
5921 pubkeys_removed_from_accounts_index: &'a PubkeysRemovedFromAccountsIndex,
5922 ) {
5923 let batches = 1 + (num_pubkeys / UNREF_ACCOUNTS_BATCH_SIZE);
5924 self.thread_pool_background.install(|| {
5925 (0..batches).into_par_iter().for_each(|batch| {
5926 let skip = batch * UNREF_ACCOUNTS_BATCH_SIZE;
5927 self.accounts_index.scan(
5928 pubkeys
5929 .clone()
5930 .skip(skip)
5931 .take(UNREF_ACCOUNTS_BATCH_SIZE)
5932 .filter(|pubkey| {
5933 let already_removed =
5935 pubkeys_removed_from_accounts_index.contains(pubkey);
5936 !already_removed
5937 }),
5938 |_pubkey, slots_refs, _entry| {
5939 if let Some((slot_list, ref_count)) = slots_refs {
5940 if slot_list.len() == 1 && ref_count == 2 {
5942 if let Some((slot_alive, acct_info)) = slot_list.first() {
5943 if acct_info.is_zero_lamport() && !acct_info.is_cached() {
5944 self.zero_lamport_single_ref_found(
5945 *slot_alive,
5946 acct_info.offset(),
5947 );
5948 }
5949 }
5950 }
5951 }
5952 AccountsIndexScanResult::Unref
5953 },
5954 None,
5955 false,
5956 ScanFilter::All,
5957 )
5958 });
5959 });
5960 }
5961
5962 fn unref_accounts(
5967 &self,
5968 purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
5969 purged_stored_account_slots: &mut AccountSlots,
5970 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
5971 ) {
5972 self.unref_pubkeys(
5973 purged_slot_pubkeys.iter().map(|(_slot, pubkey)| pubkey),
5974 purged_slot_pubkeys.len(),
5975 pubkeys_removed_from_accounts_index,
5976 );
5977 for (slot, pubkey) in purged_slot_pubkeys {
5978 purged_stored_account_slots
5979 .entry(pubkey)
5980 .or_default()
5981 .insert(slot);
5982 }
5983 }
5984
5985 fn clean_dead_slots_from_accounts_index<'a>(
5986 &'a self,
5987 dead_slots_iter: impl Iterator<Item = &'a Slot>,
5988 ) {
5989 let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
5990 let mut measure = Measure::start("clean_dead_slot");
5991 let mut rooted_cleaned_count = 0;
5992 let mut unrooted_cleaned_count = 0;
5993 let dead_slots: Vec<_> = dead_slots_iter
5994 .map(|slot| {
5995 if self.accounts_index.clean_dead_slot(*slot) {
5996 rooted_cleaned_count += 1;
5997 } else {
5998 unrooted_cleaned_count += 1;
5999 }
6000 *slot
6001 })
6002 .collect();
6003 measure.stop();
6004 accounts_index_root_stats.clean_dead_slot_us += measure.as_us();
6005 if self.log_dead_slots.load(Ordering::Relaxed) {
6006 info!(
6007 "remove_dead_slots_metadata: {} dead slots",
6008 dead_slots.len()
6009 );
6010 trace!("remove_dead_slots_metadata: dead_slots: {dead_slots:?}");
6011 }
6012 self.accounts_index
6013 .update_roots_stats(&mut accounts_index_root_stats);
6014 accounts_index_root_stats.rooted_cleaned_count += rooted_cleaned_count;
6015 accounts_index_root_stats.unrooted_cleaned_count += unrooted_cleaned_count;
6016
6017 self.clean_accounts_stats
6018 .latest_accounts_index_roots_stats
6019 .update(&accounts_index_root_stats);
6020 }
6021
6022 fn clean_stored_dead_slots(
6025 &self,
6026 dead_slots: &IntSet<Slot>,
6027 purged_account_slots: Option<&mut AccountSlots>,
6028 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
6029 ) {
6030 let mut measure = Measure::start("clean_stored_dead_slots-ms");
6031 let mut stores = vec![];
6032 for slot in dead_slots.iter() {
6034 if let Some(slot_storage) = self.storage.get_slot_storage_entry(*slot) {
6035 stores.push(slot_storage);
6036 }
6037 }
6038 let purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = {
6040 self.thread_pool_background.install(|| {
6041 stores
6042 .into_par_iter()
6043 .map(|store| {
6044 let slot = store.slot();
6045 let mut pubkeys = Vec::with_capacity(store.count());
6046 let obsolete_accounts = store.get_obsolete_accounts(None);
6049 store
6050 .accounts
6051 .scan_accounts_without_data(|offset, account| {
6052 if !obsolete_accounts.contains(&(offset, account.data_len)) {
6053 pubkeys.push((slot, *account.pubkey));
6054 }
6055 })
6056 .expect("must scan accounts storage");
6057 pubkeys
6058 })
6059 .flatten()
6060 .collect::<HashSet<_>>()
6061 })
6062 };
6063
6064 let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
6066 let mut measure_unref = Measure::start("unref_from_storage");
6067
6068 if let Some(purged_account_slots) = purged_account_slots {
6069 self.unref_accounts(
6070 purged_slot_pubkeys,
6071 purged_account_slots,
6072 pubkeys_removed_from_accounts_index,
6073 );
6074 }
6075 measure_unref.stop();
6076 accounts_index_root_stats.clean_unref_from_storage_us += measure_unref.as_us();
6077
6078 self.clean_accounts_stats
6079 .latest_accounts_index_roots_stats
6080 .update(&accounts_index_root_stats);
6081
6082 measure.stop();
6083 self.clean_accounts_stats
6084 .clean_stored_dead_slots_us
6085 .fetch_add(measure.as_us(), Ordering::Relaxed);
6086 }
6087
6088 pub(crate) fn store_accounts_unfrozen<'a>(
6091 &self,
6092 accounts: impl StorableAccounts<'a>,
6093 transactions: Option<&'a [&'a SanitizedTransaction]>,
6094 update_index_thread_selection: UpdateIndexThreadSelection,
6095 ) {
6096 if accounts.is_empty() {
6099 return;
6100 }
6101
6102 let mut total_data = 0;
6103 (0..accounts.len()).for_each(|index| {
6104 total_data += accounts.data_len(index);
6105 });
6106
6107 self.stats
6108 .store_total_data
6109 .fetch_add(total_data as u64, Ordering::Relaxed);
6110
6111 let mut store_accounts_time = Measure::start("store_accounts");
6113 let infos = self.write_accounts_to_cache(accounts.target_slot(), &accounts, transactions);
6114 store_accounts_time.stop();
6115 self.stats
6116 .store_accounts
6117 .fetch_add(store_accounts_time.as_us(), Ordering::Relaxed);
6118
6119 let mut update_index_time = Measure::start("update_index");
6121
6122 self.update_index(
6123 infos,
6124 &accounts,
6125 UpsertReclaim::PreviousSlotEntryWasCached,
6126 update_index_thread_selection,
6127 &self.thread_pool_foreground,
6128 );
6129
6130 update_index_time.stop();
6131 self.stats
6132 .store_update_index
6133 .fetch_add(update_index_time.as_us(), Ordering::Relaxed);
6134 self.stats
6135 .store_num_accounts
6136 .fetch_add(accounts.len() as u64, Ordering::Relaxed);
6137 self.report_store_timings();
6138 }
6139
6140 pub fn store_accounts_frozen<'a>(
6146 &self,
6147 accounts: impl StorableAccounts<'a>,
6148 storage: &Arc<AccountStorageEntry>,
6149 update_index_thread_selection: UpdateIndexThreadSelection,
6150 ) -> StoreAccountsTiming {
6151 self._store_accounts_frozen(
6152 accounts,
6153 storage,
6154 UpsertReclaim::IgnoreReclaims,
6155 update_index_thread_selection,
6156 )
6157 }
6158
6159 fn _store_accounts_frozen<'a>(
6163 &self,
6164 accounts: impl StorableAccounts<'a>,
6165 storage: &Arc<AccountStorageEntry>,
6166 reclaim_handling: UpsertReclaim,
6167 update_index_thread_selection: UpdateIndexThreadSelection,
6168 ) -> StoreAccountsTiming {
6169 let slot = accounts.target_slot();
6170 let mut store_accounts_time = Measure::start("store_accounts");
6171
6172 if self.read_only_accounts_cache.can_slot_be_in_cache(slot) {
6174 (0..accounts.len()).for_each(|index| {
6175 self.read_only_accounts_cache
6178 .remove_assume_not_present(accounts.pubkey(index));
6179 });
6180 }
6181
6182 let infos = self.write_accounts_to_storage(slot, storage, &accounts);
6184 store_accounts_time.stop();
6185 self.stats
6186 .store_accounts
6187 .fetch_add(store_accounts_time.as_us(), Ordering::Relaxed);
6188
6189 self.mark_zero_lamport_single_ref_accounts(&infos, storage, reclaim_handling);
6190
6191 let mut update_index_time = Measure::start("update_index");
6192
6193 let reclaims = self.update_index(
6198 infos,
6199 &accounts,
6200 reclaim_handling,
6201 update_index_thread_selection,
6202 &self.thread_pool_background,
6203 );
6204
6205 update_index_time.stop();
6206 self.stats
6207 .store_update_index
6208 .fetch_add(update_index_time.as_us(), Ordering::Relaxed);
6209 self.stats
6210 .store_num_accounts
6211 .fetch_add(accounts.len() as u64, Ordering::Relaxed);
6212
6213 let mut handle_reclaims_elapsed = 0;
6216 if !reclaims.is_empty() {
6217 let purge_stats = PurgeStats::default();
6218 let mut handle_reclaims_time = Measure::start("handle_reclaims");
6219 self.handle_reclaims(
6220 (!reclaims.is_empty()).then(|| reclaims.iter()),
6221 None,
6222 &HashSet::default(),
6223 HandleReclaims::ProcessDeadSlots(&purge_stats),
6224 MarkAccountsObsolete::Yes(slot),
6225 );
6226 handle_reclaims_time.stop();
6227 handle_reclaims_elapsed = handle_reclaims_time.as_us();
6228 self.stats.num_obsolete_slots_removed.fetch_add(
6229 purge_stats.num_stored_slots_removed.load(Ordering::Relaxed),
6230 Ordering::Relaxed,
6231 );
6232 self.stats.num_obsolete_bytes_removed.fetch_add(
6233 purge_stats
6234 .total_removed_stored_bytes
6235 .load(Ordering::Relaxed),
6236 Ordering::Relaxed,
6237 );
6238 self.stats
6239 .store_handle_reclaims
6240 .fetch_add(handle_reclaims_elapsed, Ordering::Relaxed);
6241 }
6242
6243 StoreAccountsTiming {
6244 store_accounts_elapsed: store_accounts_time.as_us(),
6245 update_index_elapsed: update_index_time.as_us(),
6246 handle_reclaims_elapsed,
6247 }
6248 }
6249
6250 fn write_accounts_to_cache<'a, 'b>(
6251 &self,
6252 slot: Slot,
6253 accounts_and_meta_to_store: &impl StorableAccounts<'b>,
6254 txs: Option<&[&SanitizedTransaction]>,
6255 ) -> Vec<AccountInfo> {
6256 let mut current_write_version = if self.accounts_update_notifier.is_some() {
6257 self.write_version
6258 .fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel)
6259 } else {
6260 0
6261 };
6262
6263 (0..accounts_and_meta_to_store.len())
6264 .map(|index| {
6265 let txn = txs.map(|txs| *txs.get(index).expect("txs must be present if provided"));
6266 accounts_and_meta_to_store.account_default_if_zero_lamport(index, |account| {
6267 let account_shared_data = account.to_account_shared_data();
6268 let pubkey = account.pubkey();
6269 let account_info =
6270 AccountInfo::new(StorageLocation::Cached, account.is_zero_lamport());
6271
6272 self.notify_account_at_accounts_update(
6273 slot,
6274 &account_shared_data,
6275 &txn,
6276 pubkey,
6277 current_write_version,
6278 );
6279 current_write_version = current_write_version.saturating_add(1);
6280
6281 self.accounts_cache.store(slot, pubkey, account_shared_data);
6282 account_info
6283 })
6284 })
6285 .collect()
6286 }
6287
6288 fn write_accounts_to_storage<'a>(
6289 &self,
6290 slot: Slot,
6291 storage: &AccountStorageEntry,
6292 accounts_and_meta_to_store: &impl StorableAccounts<'a>,
6293 ) -> Vec<AccountInfo> {
6294 let mut infos: Vec<AccountInfo> = Vec::with_capacity(accounts_and_meta_to_store.len());
6295 let mut total_append_accounts_us = 0;
6296 while infos.len() < accounts_and_meta_to_store.len() {
6297 let mut append_accounts = Measure::start("append_accounts");
6298 let stored_accounts_info = storage
6299 .accounts
6300 .write_accounts(accounts_and_meta_to_store, infos.len());
6301 append_accounts.stop();
6302 total_append_accounts_us += append_accounts.as_us();
6303 let Some(stored_accounts_info) = stored_accounts_info else {
6304 storage.set_status(AccountStorageStatus::Full);
6305
6306 let data_len = accounts_and_meta_to_store.data_len(infos.len());
6308 let data_len = (data_len + STORE_META_OVERHEAD) as u64;
6309 if !self.has_space_available(slot, data_len) {
6310 info!(
6311 "write_accounts_to_storage, no space: {}, {}, {}, {}, {}",
6312 storage.accounts.capacity(),
6313 storage.accounts.remaining_bytes(),
6314 data_len,
6315 infos.len(),
6316 accounts_and_meta_to_store.len()
6317 );
6318 let special_store_size = std::cmp::max(data_len * 2, self.file_size);
6319 self.create_and_insert_store(slot, special_store_size, "large create");
6320 }
6321 continue;
6322 };
6323
6324 let store_id = storage.id();
6325 for (i, offset) in stored_accounts_info.offsets.iter().enumerate() {
6326 infos.push(AccountInfo::new(
6327 StorageLocation::AppendVec(store_id, *offset),
6328 accounts_and_meta_to_store.is_zero_lamport(i),
6329 ));
6330 }
6331 storage.add_accounts(
6332 stored_accounts_info.offsets.len(),
6333 stored_accounts_info.size,
6334 );
6335
6336 storage.set_status(AccountStorageStatus::Available);
6338 }
6339
6340 self.stats
6341 .store_append_accounts
6342 .fetch_add(total_append_accounts_us, Ordering::Relaxed);
6343
6344 infos
6345 }
6346
6347 fn mark_zero_lamport_single_ref_accounts(
6349 &self,
6350 account_infos: &[AccountInfo],
6351 storage: &AccountStorageEntry,
6352 reclaim_handling: UpsertReclaim,
6353 ) {
6354 if reclaim_handling == UpsertReclaim::ReclaimOldSlots {
6360 let mut add_zero_lamport_accounts = Measure::start("add_zero_lamport_accounts");
6361 let mut num_zero_lamport_accounts_added = 0;
6362
6363 for account_info in account_infos {
6364 if account_info.is_zero_lamport() {
6365 storage.insert_zero_lamport_single_ref_account_offset(account_info.offset());
6366 num_zero_lamport_accounts_added += 1;
6367 }
6368 }
6369
6370 if num_zero_lamport_accounts_added > 0
6372 && self.is_candidate_for_shrink(storage)
6373 && Self::is_shrinking_productive(storage)
6374 {
6375 self.shrink_candidate_slots
6376 .lock()
6377 .unwrap()
6378 .insert(storage.slot);
6379 }
6380
6381 add_zero_lamport_accounts.stop();
6382 self.stats
6383 .add_zero_lamport_accounts_us
6384 .fetch_add(add_zero_lamport_accounts.as_us(), Ordering::Relaxed);
6385 self.stats
6386 .num_zero_lamport_accounts_added
6387 .fetch_add(num_zero_lamport_accounts_added, Ordering::Relaxed);
6388 }
6389 }
6390
6391 fn report_store_timings(&self) {
6392 if self.stats.last_store_report.should_update(1000) {
6393 let read_cache_stats = self.read_only_accounts_cache.get_and_reset_stats();
6394 datapoint_info!(
6395 "accounts_db_store_timings",
6396 (
6397 "hash_accounts",
6398 self.stats.store_hash_accounts.swap(0, Ordering::Relaxed),
6399 i64
6400 ),
6401 (
6402 "store_accounts",
6403 self.stats.store_accounts.swap(0, Ordering::Relaxed),
6404 i64
6405 ),
6406 (
6407 "update_index",
6408 self.stats.store_update_index.swap(0, Ordering::Relaxed),
6409 i64
6410 ),
6411 (
6412 "handle_reclaims",
6413 self.stats.store_handle_reclaims.swap(0, Ordering::Relaxed),
6414 i64
6415 ),
6416 (
6417 "append_accounts",
6418 self.stats.store_append_accounts.swap(0, Ordering::Relaxed),
6419 i64
6420 ),
6421 (
6422 "stakes_cache_check_and_store_us",
6423 self.stats
6424 .stakes_cache_check_and_store_us
6425 .swap(0, Ordering::Relaxed),
6426 i64
6427 ),
6428 (
6429 "num_accounts",
6430 self.stats.store_num_accounts.swap(0, Ordering::Relaxed),
6431 i64
6432 ),
6433 (
6434 "total_data",
6435 self.stats.store_total_data.swap(0, Ordering::Relaxed),
6436 i64
6437 ),
6438 (
6439 "read_only_accounts_cache_entries",
6440 self.read_only_accounts_cache.cache_len(),
6441 i64
6442 ),
6443 (
6444 "read_only_accounts_cache_data_size",
6445 self.read_only_accounts_cache.data_size(),
6446 i64
6447 ),
6448 ("read_only_accounts_cache_hits", read_cache_stats.hits, i64),
6449 (
6450 "read_only_accounts_cache_misses",
6451 read_cache_stats.misses,
6452 i64
6453 ),
6454 (
6455 "read_only_accounts_cache_evicts",
6456 read_cache_stats.evicts,
6457 i64
6458 ),
6459 (
6460 "read_only_accounts_cache_load_us",
6461 read_cache_stats.load_us,
6462 i64
6463 ),
6464 (
6465 "read_only_accounts_cache_store_us",
6466 read_cache_stats.store_us,
6467 i64
6468 ),
6469 (
6470 "read_only_accounts_cache_evict_us",
6471 read_cache_stats.evict_us,
6472 i64
6473 ),
6474 (
6475 "read_only_accounts_cache_evictor_wakeup_count_all",
6476 read_cache_stats.evictor_wakeup_count_all,
6477 i64
6478 ),
6479 (
6480 "read_only_accounts_cache_evictor_wakeup_count_productive",
6481 read_cache_stats.evictor_wakeup_count_productive,
6482 i64
6483 ),
6484 (
6485 "handle_dead_keys_us",
6486 self.stats.handle_dead_keys_us.swap(0, Ordering::Relaxed),
6487 i64
6488 ),
6489 (
6490 "purge_exact_us",
6491 self.stats.purge_exact_us.swap(0, Ordering::Relaxed),
6492 i64
6493 ),
6494 (
6495 "purge_exact_count",
6496 self.stats.purge_exact_count.swap(0, Ordering::Relaxed),
6497 i64
6498 ),
6499 (
6500 "num_obsolete_slots_removed",
6501 self.stats
6502 .num_obsolete_slots_removed
6503 .swap(0, Ordering::Relaxed),
6504 i64
6505 ),
6506 (
6507 "num_obsolete_bytes_removed",
6508 self.stats
6509 .num_obsolete_bytes_removed
6510 .swap(0, Ordering::Relaxed),
6511 i64
6512 ),
6513 (
6514 "add_zero_lamport_accounts_us",
6515 self.stats
6516 .add_zero_lamport_accounts_us
6517 .swap(0, Ordering::Relaxed),
6518 i64
6519 ),
6520 (
6521 "num_zero_lamport_accounts_added",
6522 self.stats
6523 .num_zero_lamport_accounts_added
6524 .swap(0, Ordering::Relaxed),
6525 i64
6526 ),
6527 );
6528
6529 datapoint_info!(
6530 "accounts_db_store_timings2",
6531 (
6532 "create_store_count",
6533 self.stats.create_store_count.swap(0, Ordering::Relaxed),
6534 i64
6535 ),
6536 (
6537 "store_get_slot_store",
6538 self.stats.store_get_slot_store.swap(0, Ordering::Relaxed),
6539 i64
6540 ),
6541 (
6542 "store_find_existing",
6543 self.stats.store_find_existing.swap(0, Ordering::Relaxed),
6544 i64
6545 ),
6546 (
6547 "dropped_stores",
6548 self.stats.dropped_stores.swap(0, Ordering::Relaxed),
6549 i64
6550 ),
6551 );
6552 }
6553 }
6554
6555 pub fn add_root(&self, slot: Slot) -> AccountsAddRootTiming {
6556 let mut index_time = Measure::start("index_add_root");
6557 self.accounts_index.add_root(slot);
6558 index_time.stop();
6559 let mut cache_time = Measure::start("cache_add_root");
6560 self.accounts_cache.add_root(slot);
6561 cache_time.stop();
6562 let mut store_time = Measure::start("store_add_root");
6563 if let Some(store) = self
6567 .storage
6568 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
6569 {
6570 self.dirty_stores.insert(slot, store);
6571 }
6572 store_time.stop();
6573
6574 AccountsAddRootTiming {
6575 index_us: index_time.as_us(),
6576 cache_us: cache_time.as_us(),
6577 store_us: store_time.as_us(),
6578 }
6579 }
6580
6581 pub fn get_storages(
6583 &self,
6584 requested_slots: impl RangeBounds<Slot> + Sync,
6585 ) -> (Vec<Arc<AccountStorageEntry>>, Vec<Slot>) {
6586 let start = Instant::now();
6587 let (slots, storages) = self
6588 .storage
6589 .get_if(|slot, storage| requested_slots.contains(slot) && storage.has_accounts())
6590 .into_vec()
6591 .into_iter()
6592 .unzip();
6593 let duration = start.elapsed();
6594 debug!("get_snapshot_storages: {duration:?}");
6595 (storages, slots)
6596 }
6597
6598 pub fn latest_full_snapshot_slot(&self) -> Option<Slot> {
6600 self.latest_full_snapshot_slot.read()
6601 }
6602
6603 pub fn set_latest_full_snapshot_slot(&self, slot: Slot) {
6605 *self.latest_full_snapshot_slot.lock_write() = Some(slot);
6606 }
6607
6608 fn generate_index_for_slot<'a>(
6609 &self,
6610 reader: &mut impl RequiredLenBufFileRead<'a>,
6611 storage: &'a AccountStorageEntry,
6612 slot: Slot,
6613 store_id: AccountsFileId,
6614 storage_info: &StorageSizeAndCountMap,
6615 ) -> SlotIndexGenerationInfo {
6616 if storage.accounts.get_account_data_lens(&[0]).is_empty() {
6617 return SlotIndexGenerationInfo::default();
6618 }
6619 let secondary = !self.account_indexes.is_empty();
6620
6621 let mut accounts_data_len = 0;
6622 let mut stored_size_alive = 0;
6623 let mut zero_lamport_pubkeys = vec![];
6624 let mut all_accounts_are_zero_lamports = true;
6625
6626 let (insert_time_us, generate_index_results) = {
6627 let mut keyed_account_infos = vec![];
6628 let mut itemizer = |info: IndexInfo| {
6630 stored_size_alive += info.stored_size_aligned;
6631 if info.index_info.lamports > 0 {
6632 accounts_data_len += info.index_info.data_len;
6633 all_accounts_are_zero_lamports = false;
6634 } else {
6635 zero_lamport_pubkeys.push(info.index_info.pubkey);
6637 }
6638 keyed_account_infos.push((
6639 info.index_info.pubkey,
6640 AccountInfo::new(
6641 StorageLocation::AppendVec(store_id, info.index_info.offset), info.index_info.is_zero_lamport(),
6643 ),
6644 ));
6645 };
6646
6647 if secondary {
6648 storage.accounts.scan_accounts(reader, |offset, account| {
6650 let data_len = account.data.len() as u64;
6651 let stored_size_aligned =
6652 storage.accounts.calculate_stored_size(data_len as usize);
6653 let info = IndexInfo {
6654 stored_size_aligned,
6655 index_info: IndexInfoInner {
6656 offset,
6657 pubkey: *account.pubkey,
6658 lamports: account.lamports,
6659 data_len,
6660 },
6661 };
6662 itemizer(info);
6663 self.accounts_index.update_secondary_indexes(
6664 account.pubkey,
6665 &account,
6666 &self.account_indexes,
6667 );
6668 })
6669 } else {
6670 storage
6672 .accounts
6673 .scan_accounts_without_data(|offset, account| {
6674 let data_len = account.data_len as u64;
6675 let stored_size_aligned =
6676 storage.accounts.calculate_stored_size(data_len as usize);
6677 let info = IndexInfo {
6678 stored_size_aligned,
6679 index_info: IndexInfoInner {
6680 offset,
6681 pubkey: *account.pubkey,
6682 lamports: account.lamports,
6683 data_len,
6684 },
6685 };
6686 itemizer(info);
6687 })
6688 }
6689 .expect("must scan accounts storage");
6690 self.accounts_index
6691 .insert_new_if_missing_into_primary_index(slot, keyed_account_infos)
6692 };
6693
6694 {
6695 let mut info = storage_info.entry(store_id).or_default();
6697 info.stored_size += stored_size_alive;
6698 info.count += generate_index_results.count;
6699
6700 assert!(
6703 info.stored_size <= u64_align!(storage.accounts.len()),
6704 "Stored size ({}) is larger than the size of the accounts file ({}) for store_id: \
6705 {}",
6706 info.stored_size,
6707 storage.accounts.len(),
6708 store_id
6709 );
6710 }
6711 if !zero_lamport_pubkeys.is_empty() {
6715 let old = self
6716 .uncleaned_pubkeys
6717 .insert(slot, zero_lamport_pubkeys.clone());
6718 assert!(old.is_none());
6719 }
6720 SlotIndexGenerationInfo {
6721 insert_time_us,
6722 num_accounts: generate_index_results.count as u64,
6723 accounts_data_len,
6724 zero_lamport_pubkeys,
6725 all_accounts_are_zero_lamports,
6726 num_did_not_exist: generate_index_results.num_did_not_exist,
6727 num_existed_in_mem: generate_index_results.num_existed_in_mem,
6728 num_existed_on_disk: generate_index_results.num_existed_on_disk,
6729 }
6730 }
6731
6732 pub fn generate_index(
6733 &self,
6734 limit_load_slot_count_from_snapshot: Option<usize>,
6735 verify: bool,
6736 ) -> IndexGenerationInfo {
6737 let mut total_time = Measure::start("generate_index");
6738
6739 let mut storages = self.storage.all_storages();
6740 storages.sort_unstable_by_key(|storage| storage.slot);
6741 if let Some(limit) = limit_load_slot_count_from_snapshot {
6742 storages.truncate(limit); }
6744 let accounts_data_len = AtomicU64::new(0);
6745
6746 let zero_lamport_pubkeys = Mutex::new(HashSet::new());
6747 let mut outer_duplicates_lt_hash = None;
6748
6749 let passes = if verify { 2 } else { 1 };
6753 for pass in 0..passes {
6754 if pass == 0 {
6755 self.accounts_index
6756 .set_startup(Startup::StartupWithExtraThreads);
6757 }
6758 let storage_info = StorageSizeAndCountMap::default();
6759 let total_processed_slots_across_all_threads = AtomicU64::new(0);
6760 let outer_slots_len = storages.len();
6761 let threads = num_cpus::get();
6762 let chunk_size = (outer_slots_len / (std::cmp::max(1, threads.saturating_sub(1)))) + 1; let mut index_time = Measure::start("index");
6764 let insertion_time_us = AtomicU64::new(0);
6765 let total_including_duplicates = AtomicU64::new(0);
6766 let all_accounts_are_zero_lamports_slots = AtomicU64::new(0);
6767 let mut all_zeros_slots = Mutex::new(Vec::<(Slot, Arc<AccountStorageEntry>)>::new());
6768 let scan_time: u64 = storages
6769 .par_chunks(chunk_size)
6770 .map(|storages| {
6771 let mut reader = append_vec::new_scan_accounts_reader();
6772 let mut log_status = MultiThreadProgress::new(
6773 &total_processed_slots_across_all_threads,
6774 2,
6775 outer_slots_len as u64,
6776 );
6777 let mut scan_time_sum = 0;
6778 let mut all_accounts_are_zero_lamports_slots_inner = 0;
6779 let mut all_zeros_slots_inner = vec![];
6780 let mut local_zero_lamport_pubkeys = Vec::new();
6781 let mut insert_time_sum = 0;
6782 let mut total_including_duplicates_sum = 0;
6783 let mut accounts_data_len_sum = 0;
6784 let mut local_num_did_not_exist = 0;
6785 let mut local_num_existed_in_mem = 0;
6786 let mut local_num_existed_on_disk = 0;
6787 for (index, storage) in storages.iter().enumerate() {
6788 let mut scan_time = Measure::start("scan");
6789 log_status.report(index as u64);
6790 let store_id = storage.id();
6791 let slot = storage.slot();
6792
6793 scan_time.stop();
6794 scan_time_sum += scan_time.as_us();
6795
6796 let insert_us = if pass == 0 {
6797 self.maybe_throttle_index_generation();
6799 let SlotIndexGenerationInfo {
6800 insert_time_us: insert_us,
6801 num_accounts: total_this_slot,
6802 accounts_data_len: accounts_data_len_this_slot,
6803 zero_lamport_pubkeys: mut zero_lamport_pubkeys_this_slot,
6804 all_accounts_are_zero_lamports,
6805 num_did_not_exist,
6806 num_existed_in_mem,
6807 num_existed_on_disk,
6808 } = self.generate_index_for_slot(
6809 &mut reader,
6810 storage,
6811 slot,
6812 store_id,
6813 &storage_info,
6814 );
6815
6816 local_num_did_not_exist += num_did_not_exist;
6817 local_num_existed_in_mem += num_existed_in_mem;
6818 local_num_existed_on_disk += num_existed_on_disk;
6819 total_including_duplicates_sum += total_this_slot;
6820 accounts_data_len_sum += accounts_data_len_this_slot;
6821 if all_accounts_are_zero_lamports {
6822 all_accounts_are_zero_lamports_slots_inner += 1;
6823 all_zeros_slots_inner.push((slot, Arc::clone(storage)));
6824 }
6825 local_zero_lamport_pubkeys.append(&mut zero_lamport_pubkeys_this_slot);
6826
6827 insert_us
6828 } else {
6829 assert!(verify);
6831 let mut lookup_time = Measure::start("lookup_time");
6832 storage
6833 .accounts
6834 .scan_accounts_without_data(|offset, account| {
6835 let key = account.pubkey();
6836 let index_entry = self.accounts_index.get_cloned(key).unwrap();
6837 let slot_list = index_entry.slot_list.read().unwrap();
6838 let mut count = 0;
6839 for (slot2, account_info2) in slot_list.iter() {
6840 if *slot2 == slot {
6841 count += 1;
6842 let ai = AccountInfo::new(
6843 StorageLocation::AppendVec(store_id, offset), account.is_zero_lamport(),
6845 );
6846 assert_eq!(&ai, account_info2);
6847 }
6848 }
6849 assert_eq!(1, count);
6850 })
6851 .expect("must scan accounts storage");
6852 lookup_time.stop();
6853 lookup_time.as_us()
6854 };
6855 insert_time_sum += insert_us;
6856 }
6857
6858 if pass == 0 {
6859 let mut zero_lamport_pubkeys_lock = zero_lamport_pubkeys.lock().unwrap();
6860 zero_lamport_pubkeys_lock.reserve(local_zero_lamport_pubkeys.len());
6861 zero_lamport_pubkeys_lock.extend(local_zero_lamport_pubkeys.into_iter());
6862 drop(zero_lamport_pubkeys_lock);
6863
6864 let index_stats = self.accounts_index.bucket_map_holder_stats();
6867
6868 index_stats.inc_insert_count(local_num_did_not_exist);
6870 index_stats.add_mem_count(local_num_did_not_exist as usize);
6871
6872 index_stats
6874 .entries_from_mem
6875 .fetch_add(local_num_existed_in_mem, Ordering::Relaxed);
6876 index_stats
6877 .updates_in_mem
6878 .fetch_add(local_num_existed_in_mem, Ordering::Relaxed);
6879
6880 index_stats.add_mem_count(local_num_existed_on_disk as usize);
6882 index_stats
6883 .entries_missing
6884 .fetch_add(local_num_existed_on_disk, Ordering::Relaxed);
6885 index_stats
6886 .updates_in_mem
6887 .fetch_add(local_num_existed_on_disk, Ordering::Relaxed);
6888 }
6889
6890 all_accounts_are_zero_lamports_slots.fetch_add(
6891 all_accounts_are_zero_lamports_slots_inner,
6892 Ordering::Relaxed,
6893 );
6894 all_zeros_slots
6895 .lock()
6896 .unwrap()
6897 .append(&mut all_zeros_slots_inner);
6898 insertion_time_us.fetch_add(insert_time_sum, Ordering::Relaxed);
6899 total_including_duplicates
6900 .fetch_add(total_including_duplicates_sum, Ordering::Relaxed);
6901 accounts_data_len.fetch_add(accounts_data_len_sum, Ordering::Relaxed);
6902 scan_time_sum
6903 })
6904 .sum();
6905 index_time.stop();
6906
6907 let mut index_flush_us = 0;
6908 let total_duplicate_slot_keys = AtomicU64::default();
6909 let mut populate_duplicate_keys_us = 0;
6910 let total_num_unique_duplicate_keys = AtomicU64::default();
6911
6912 let unique_pubkeys_by_bin = Mutex::new(Vec::<Vec<Pubkey>>::default());
6915 if pass == 0 {
6916 let mut m = Measure::start("accounts_index_idle_us");
6918 self.accounts_index.set_startup(Startup::Normal);
6919 m.stop();
6920 index_flush_us = m.as_us();
6921
6922 populate_duplicate_keys_us = measure_us!({
6923 self.accounts_index
6926 .populate_and_retrieve_duplicate_keys_from_startup(|slot_keys| {
6927 total_duplicate_slot_keys
6928 .fetch_add(slot_keys.len() as u64, Ordering::Relaxed);
6929 let unique_keys =
6930 HashSet::<Pubkey>::from_iter(slot_keys.iter().map(|(_, key)| *key));
6931 for (slot, key) in slot_keys {
6932 self.uncleaned_pubkeys.entry(slot).or_default().push(key);
6933 }
6934 let unique_pubkeys_by_bin_inner =
6935 unique_keys.into_iter().collect::<Vec<_>>();
6936 total_num_unique_duplicate_keys.fetch_add(
6937 unique_pubkeys_by_bin_inner.len() as u64,
6938 Ordering::Relaxed,
6939 );
6940 unique_pubkeys_by_bin
6942 .lock()
6943 .unwrap()
6944 .push(unique_pubkeys_by_bin_inner);
6945 });
6946 })
6947 .1;
6948 }
6949 let unique_pubkeys_by_bin = unique_pubkeys_by_bin.into_inner().unwrap();
6950
6951 let mut timings = GenerateIndexTimings {
6952 index_flush_us,
6953 scan_time,
6954 index_time: index_time.as_us(),
6955 insertion_time_us: insertion_time_us.load(Ordering::Relaxed),
6956 total_duplicate_slot_keys: total_duplicate_slot_keys.load(Ordering::Relaxed),
6957 total_num_unique_duplicate_keys: total_num_unique_duplicate_keys
6958 .load(Ordering::Relaxed),
6959 populate_duplicate_keys_us,
6960 total_including_duplicates: total_including_duplicates.load(Ordering::Relaxed),
6961 total_slots: storages.len() as u64,
6962 all_accounts_are_zero_lamports_slots: all_accounts_are_zero_lamports_slots
6963 .load(Ordering::Relaxed),
6964 ..GenerateIndexTimings::default()
6965 };
6966
6967 if pass == 0 {
6968 #[derive(Debug, Default)]
6969 struct DuplicatePubkeysVisitedInfo {
6970 accounts_data_len_from_duplicates: u64,
6971 num_duplicate_accounts: u64,
6972 duplicates_lt_hash: Option<Box<DuplicatesLtHash>>,
6973 }
6974 impl DuplicatePubkeysVisitedInfo {
6975 fn reduce(mut self, other: Self) -> Self {
6976 self.accounts_data_len_from_duplicates +=
6977 other.accounts_data_len_from_duplicates;
6978 self.num_duplicate_accounts += other.num_duplicate_accounts;
6979
6980 match (
6981 self.duplicates_lt_hash.is_some(),
6982 other.duplicates_lt_hash.is_some(),
6983 ) {
6984 (true, true) => {
6985 self.duplicates_lt_hash
6987 .as_mut()
6988 .unwrap()
6989 .0
6990 .mix_in(&other.duplicates_lt_hash.as_ref().unwrap().0);
6991 }
6992 (true, false) => {
6993 }
6995 (false, true) => {
6996 self.duplicates_lt_hash = other.duplicates_lt_hash;
6998 }
6999 (false, false) => {
7000 }
7002 }
7003 self
7004 }
7005 }
7006
7007 let zero_lamport_pubkeys_to_visit =
7008 std::mem::take(&mut *zero_lamport_pubkeys.lock().unwrap());
7009 let (num_zero_lamport_single_refs, visit_zero_lamports_us) =
7010 measure_us!(self
7011 .visit_zero_lamport_pubkeys_during_startup(&zero_lamport_pubkeys_to_visit));
7012 timings.visit_zero_lamports_us = visit_zero_lamports_us;
7013 timings.num_zero_lamport_single_refs = num_zero_lamport_single_refs;
7014
7015 let mut accounts_data_len_dedup_timer =
7017 Measure::start("handle accounts data len duplicates");
7018 let DuplicatePubkeysVisitedInfo {
7019 accounts_data_len_from_duplicates,
7020 num_duplicate_accounts,
7021 duplicates_lt_hash,
7022 } = unique_pubkeys_by_bin
7023 .par_iter()
7024 .fold(
7025 DuplicatePubkeysVisitedInfo::default,
7026 |accum, pubkeys_by_bin| {
7027 let intermediate = pubkeys_by_bin
7028 .par_chunks(4096)
7029 .fold(DuplicatePubkeysVisitedInfo::default, |accum, pubkeys| {
7030 let (
7031 accounts_data_len_from_duplicates,
7032 accounts_duplicates_num,
7033 duplicates_lt_hash,
7034 ) = self
7035 .visit_duplicate_pubkeys_during_startup(pubkeys, &timings);
7036 let intermediate = DuplicatePubkeysVisitedInfo {
7037 accounts_data_len_from_duplicates,
7038 num_duplicate_accounts: accounts_duplicates_num,
7039 duplicates_lt_hash,
7040 };
7041 DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
7042 })
7043 .reduce(
7044 DuplicatePubkeysVisitedInfo::default,
7045 DuplicatePubkeysVisitedInfo::reduce,
7046 );
7047 DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
7048 },
7049 )
7050 .reduce(
7051 DuplicatePubkeysVisitedInfo::default,
7052 DuplicatePubkeysVisitedInfo::reduce,
7053 );
7054 accounts_data_len_dedup_timer.stop();
7055 timings.accounts_data_len_dedup_time_us = accounts_data_len_dedup_timer.as_us();
7056 timings.num_duplicate_accounts = num_duplicate_accounts;
7057
7058 accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed);
7059 if let Some(duplicates_lt_hash) = duplicates_lt_hash {
7060 let old_val = outer_duplicates_lt_hash.replace(duplicates_lt_hash);
7061 assert!(old_val.is_none());
7062 }
7063 info!(
7064 "accounts data len: {}",
7065 accounts_data_len.load(Ordering::Relaxed)
7066 );
7067
7068 let all_zero_slots_to_clean = std::mem::take(all_zeros_slots.get_mut().unwrap());
7070 info!(
7071 "insert all zero slots to clean at startup {}",
7072 all_zero_slots_to_clean.len()
7073 );
7074 for (slot, storage) in all_zero_slots_to_clean {
7075 self.dirty_stores.insert(slot, storage);
7076 }
7077 }
7078
7079 if pass == 0 {
7080 for storage in &storages {
7082 self.accounts_index.add_root(storage.slot());
7083 }
7084
7085 self.set_storage_count_and_alive_bytes(storage_info, &mut timings);
7086
7087 if self.mark_obsolete_accounts {
7088 let mut mark_obsolete_accounts_time =
7089 Measure::start("mark_obsolete_accounts_time");
7090 let slot_marked_obsolete = storages.last().unwrap().slot();
7096 let obsolete_account_stats = self.mark_obsolete_accounts_at_startup(
7097 slot_marked_obsolete,
7098 unique_pubkeys_by_bin,
7099 );
7100
7101 mark_obsolete_accounts_time.stop();
7102 timings.mark_obsolete_accounts_us = mark_obsolete_accounts_time.as_us();
7103 timings.num_obsolete_accounts_marked =
7104 obsolete_account_stats.accounts_marked_obsolete;
7105 timings.num_slots_removed_as_obsolete = obsolete_account_stats.slots_removed;
7106 }
7107 }
7108 total_time.stop();
7109 timings.total_time_us = total_time.as_us();
7110 timings.report(self.accounts_index.get_startup_stats());
7111 }
7112
7113 self.accounts_index.log_secondary_indexes();
7114
7115 if outer_duplicates_lt_hash.is_none() {
7121 outer_duplicates_lt_hash = Some(Box::new(DuplicatesLtHash::default()));
7122 }
7123
7124 IndexGenerationInfo {
7125 accounts_data_len: accounts_data_len.load(Ordering::Relaxed),
7126 duplicates_lt_hash: outer_duplicates_lt_hash,
7127 }
7128 }
7129
7130 fn mark_obsolete_accounts_at_startup(
7133 &self,
7134 slot_marked_obsolete: Slot,
7135 pubkeys_with_duplicates_by_bin: Vec<Vec<Pubkey>>,
7136 ) -> ObsoleteAccountsStats {
7137 let stats: ObsoleteAccountsStats = pubkeys_with_duplicates_by_bin
7138 .par_iter()
7139 .map(|pubkeys_by_bin| {
7140 let reclaims = self.accounts_index.clean_and_unref_rooted_entries_by_bin(
7141 pubkeys_by_bin,
7142 |slot, account_info| {
7143 if account_info.is_zero_lamport() {
7146 self.zero_lamport_single_ref_found(slot, account_info.offset());
7147 }
7148 },
7149 );
7150 let stats = PurgeStats::default();
7151
7152 self.handle_reclaims(
7154 (!reclaims.is_empty()).then(|| reclaims.iter()),
7155 None,
7156 &HashSet::new(),
7157 HandleReclaims::ProcessDeadSlots(&stats),
7158 MarkAccountsObsolete::Yes(slot_marked_obsolete),
7159 );
7160 ObsoleteAccountsStats {
7161 accounts_marked_obsolete: reclaims.len() as u64,
7162 slots_removed: stats.total_removed_storage_entries.load(Ordering::Relaxed)
7163 as u64,
7164 }
7165 })
7166 .sum();
7167 stats
7168 }
7169
7170 fn maybe_throttle_index_generation(&self) {
7173 if !self.accounts_index.is_disk_index_enabled() {
7175 return;
7176 }
7177 const LIMIT: usize = 10_000_000;
7185 while self
7186 .accounts_index
7187 .get_startup_remaining_items_to_flush_estimate()
7188 > LIMIT
7189 {
7190 sleep(Duration::from_millis(10));
7193 }
7194 }
7195
7196 fn visit_zero_lamport_pubkeys_during_startup(&self, pubkeys: &HashSet<Pubkey>) -> u64 {
7200 let mut count = 0;
7201 self.accounts_index.scan(
7202 pubkeys.iter(),
7203 |_pubkey, slots_refs, _entry| {
7204 let (slot_list, ref_count) = slots_refs.unwrap();
7205 if ref_count == 1 {
7206 assert_eq!(slot_list.len(), 1);
7207 let (slot_alive, account_info) = slot_list.first().unwrap();
7208 assert!(!account_info.is_cached());
7209 if account_info.is_zero_lamport() {
7210 count += 1;
7211 self.zero_lamport_single_ref_found(*slot_alive, account_info.offset());
7212 }
7213 }
7214 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
7215 },
7216 None,
7217 false,
7218 ScanFilter::All,
7219 );
7220 count
7221 }
7222
7223 fn visit_duplicate_pubkeys_during_startup(
7235 &self,
7236 pubkeys: &[Pubkey],
7237 timings: &GenerateIndexTimings,
7238 ) -> (u64, u64, Option<Box<DuplicatesLtHash>>) {
7239 let mut accounts_data_len_from_duplicates = 0;
7240 let mut num_duplicate_accounts = 0_u64;
7241 let mut duplicates_lt_hash =
7244 (!self.mark_obsolete_accounts).then(|| Box::new(DuplicatesLtHash::default()));
7245 let mut lt_hash_time = Duration::default();
7246 self.accounts_index.scan(
7247 pubkeys.iter(),
7248 |pubkey, slots_refs, _entry| {
7249 if let Some((slot_list, _ref_count)) = slots_refs {
7250 if slot_list.len() > 1 {
7251 let max = slot_list.iter().map(|(slot, _)| slot).max().unwrap();
7257 slot_list.iter().for_each(|(slot, account_info)| {
7258 if slot == max {
7259 return;
7261 }
7262 let maybe_storage_entry = self
7263 .storage
7264 .get_account_storage_entry(*slot, account_info.store_id());
7265 let mut accessor = LoadedAccountAccessor::Stored(
7266 maybe_storage_entry.map(|entry| (entry, account_info.offset())),
7267 );
7268 accessor.check_and_get_loaded_account(|loaded_account| {
7269 let data_len = loaded_account.data_len();
7270 if loaded_account.lamports() > 0 {
7271 accounts_data_len_from_duplicates += data_len;
7272 }
7273 num_duplicate_accounts += 1;
7274 if let Some(duplicates_lt_hash) = duplicates_lt_hash.as_mut() {
7275 let (_, duration) = meas_dur!({
7276 let account_lt_hash =
7277 Self::lt_hash_account(&loaded_account, pubkey);
7278 duplicates_lt_hash.0.mix_in(&account_lt_hash.0);
7279 });
7280 lt_hash_time += duration;
7281 }
7282 });
7283 });
7284 }
7285 }
7286 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
7287 },
7288 None,
7289 false,
7290 ScanFilter::All,
7291 );
7292 timings
7293 .par_duplicates_lt_hash_us
7294 .fetch_add(lt_hash_time.as_micros() as u64, Ordering::Relaxed);
7295 (
7296 accounts_data_len_from_duplicates as u64,
7297 num_duplicate_accounts,
7298 duplicates_lt_hash,
7299 )
7300 }
7301
7302 fn set_storage_count_and_alive_bytes(
7303 &self,
7304 stored_sizes_and_counts: StorageSizeAndCountMap,
7305 timings: &mut GenerateIndexTimings,
7306 ) {
7307 let mut storage_size_storages_time = Measure::start("storage_size_storages");
7309 for (_slot, store) in self.storage.iter() {
7310 let id = store.id();
7311 assert_eq!(store.alive_bytes(), 0);
7313 if let Some(entry) = stored_sizes_and_counts.get(&id) {
7314 trace!(
7315 "id: {} setting count: {} cur: {}",
7316 id,
7317 entry.count,
7318 store.count(),
7319 );
7320 {
7321 let mut count_and_status = store.count_and_status.lock_write();
7322 assert_eq!(count_and_status.0, 0);
7323 count_and_status.0 = entry.count;
7324 }
7325 store
7326 .alive_bytes
7327 .store(entry.stored_size, Ordering::Release);
7328 } else {
7329 trace!("id: {id} clearing count");
7330 store.count_and_status.lock_write().0 = 0;
7331 }
7332 }
7333 storage_size_storages_time.stop();
7334 timings.storage_size_storages_us = storage_size_storages_time.as_us();
7335 }
7336
7337 pub fn print_accounts_stats(&self, label: &str) {
7338 self.print_index(label);
7339 self.print_count_and_status(label);
7340 }
7341
7342 fn print_index(&self, label: &str) {
7343 let mut alive_roots: Vec<_> = self.accounts_index.all_alive_roots();
7344 #[allow(clippy::stable_sort_primitive)]
7345 alive_roots.sort();
7346 info!("{label}: accounts_index alive_roots: {alive_roots:?}");
7347 self.accounts_index.account_maps.iter().for_each(|map| {
7348 for pubkey in map.keys() {
7349 self.accounts_index.get_and_then(&pubkey, |account_entry| {
7350 if let Some(account_entry) = account_entry {
7351 let list_r = &account_entry.slot_list.read().unwrap();
7352 info!(" key: {} ref_count: {}", pubkey, account_entry.ref_count(),);
7353 info!(" slots: {list_r:?}");
7354 }
7355 let add_to_in_mem_cache = false;
7356 (add_to_in_mem_cache, ())
7357 });
7358 }
7359 });
7360 }
7361
7362 pub fn print_count_and_status(&self, label: &str) {
7363 let mut slots: Vec<_> = self.storage.all_slots();
7364 #[allow(clippy::stable_sort_primitive)]
7365 slots.sort();
7366 info!("{}: count_and status for {} slots:", label, slots.len());
7367 for slot in &slots {
7368 let entry = self.storage.get_slot_storage_entry(*slot).unwrap();
7369 info!(
7370 " slot: {} id: {} count_and_status: {:?} len: {} capacity: {}",
7371 slot,
7372 entry.id(),
7373 entry.count_and_status.read(),
7374 entry.accounts.len(),
7375 entry.accounts.capacity(),
7376 );
7377 }
7378 }
7379}
7380
7381#[derive(Debug, Copy, Clone)]
7382enum HandleReclaims<'a> {
7383 ProcessDeadSlots(&'a PurgeStats),
7384}
7385
7386#[derive(Debug, Copy, Clone, PartialEq, Eq)]
7390enum MarkAccountsObsolete {
7391 Yes(Slot),
7392 No,
7393}
7394
/// Selects which threads perform index updates when storing accounts.
pub enum UpdateIndexThreadSelection {
    /// Update the index inline (presumably on the calling thread).
    Inline,
    /// Use a thread pool (presumably only once the work exceeds a threshold).
    PoolWithThreshold,
}
7401
#[cfg(feature = "dev-context-only-utils")]
impl AccountStorageEntry {
    /// Returns the number of accounts in this storage, counted by scanning every pubkey.
    ///
    /// # Panics
    /// Panics if the storage scan fails.
    fn accounts_count(&self) -> usize {
        let mut num_accounts = 0;
        let scan_result = self.accounts.scan_pubkeys(|_pubkey| num_accounts += 1);
        scan_result.expect("must scan accounts storage");
        num_accounts
    }
}
7415
// Development/test helpers on `AccountsDb`. Everything in this impl is only
// compiled when the `dev-context-only-utils` feature is enabled.
#[cfg(feature = "dev-context-only-utils")]
impl AccountsDb {
    /// Returns the number of slots that currently have an entry in the
    /// `uncleaned_pubkeys` map.
    pub fn get_len_of_slots_with_uncleaned_pubkeys(&self) -> usize {
        self.uncleaned_pubkeys.len()
    }

    /// Calls `add_root(slot)` and then `flush_root_write_cache(slot)`.
    ///
    /// # Panics
    ///
    /// Panics if `flush_root_write_cache` does not find `slot` among the alive
    /// roots after it has been added.
    pub fn add_root_and_flush_write_cache(&self, slot: Slot) {
        self.add_root(slot);
        self.flush_root_write_cache(slot);
    }

    /// Loads `pubkey` as seen through `ancestors`.
    ///
    /// Delegates to `do_load` with `None` as its third argument,
    /// `LoadHint::Unspecified`, and
    /// `LoadZeroLamports::SomeWithZeroLamportAccountForTests`.
    ///
    /// Returns the account together with the slot reported by `do_load`, or
    /// `None` if `do_load` finds nothing.
    pub fn load_without_fixed_root(
        &self,
        ancestors: &Ancestors,
        pubkey: &Pubkey,
    ) -> Option<(AccountSharedData, Slot)> {
        self.do_load(
            ancestors,
            pubkey,
            None,
            LoadHint::Unspecified,
            // NOTE(review): by its name this variant makes zero-lamport accounts
            // load as `Some(..)`; confirm against `LoadZeroLamports`.
            LoadZeroLamports::SomeWithZeroLamportAccountForTests,
        )
    }

    /// Asserts that `pubkey` can be loaded with an ancestor set containing only
    /// `slot`, and that the loaded account holds `expected_lamports`.
    ///
    /// Only the lamports are verified. The slot in which the account was found
    /// is not checked: historically the returned slot shadowed the `slot`
    /// parameter and was compared against itself, which could never fail. That
    /// vacuous comparison has been removed so the assertion states exactly
    /// what it checks.
    ///
    /// # Panics
    ///
    /// Panics if the account cannot be loaded or its lamports differ from
    /// `expected_lamports`.
    pub fn assert_load_account(&self, slot: Slot, pubkey: Pubkey, expected_lamports: u64) {
        let ancestors = vec![(slot, 0)].into_iter().collect();
        let (account, _found_slot) = self
            .load_without_fixed_root(&ancestors, &pubkey)
            .unwrap();
        assert_eq!(account.lamports(), expected_lamports);
    }

    /// Asserts that `pubkey` can NOT be loaded with an ancestor set containing
    /// only `slot`.
    ///
    /// # Panics
    ///
    /// Panics (printing the unexpected load result) if the load returns `Some`.
    pub fn assert_not_load_account(&self, slot: Slot, pubkey: Pubkey) {
        let ancestors = vec![(slot, 0)].into_iter().collect();
        let load = self.load_without_fixed_root(&ancestors, &pubkey);
        assert!(load.is_none(), "{load:?}");
    }

    /// Spot-checks `num` randomly chosen entries among the first `num` pubkeys.
    ///
    /// For each randomly drawn index `idx` in `0..num`, asserts that loading
    /// `pubkeys[idx]` (with an ancestor set containing only `slot`) yields an
    /// account with `idx + count` lamports, zero data space and the default
    /// owner, reported at exactly `slot`.
    ///
    /// # Panics
    ///
    /// Panics if any check fails, or if a drawn index is out of bounds for
    /// `pubkeys` (possible when `num > pubkeys.len()`).
    pub fn check_accounts(&self, pubkeys: &[Pubkey], slot: Slot, num: usize, count: usize) {
        let ancestors = vec![(slot, 0)].into_iter().collect();
        // Obtain the thread-local RNG handle once instead of on every iteration.
        let mut rng = thread_rng();
        for _ in 0..num {
            let idx = rng.gen_range(0..num);
            let loaded = self.load_without_fixed_root(&ancestors, &pubkeys[idx]);
            let expected = Some((
                AccountSharedData::new(
                    (idx + count) as u64,
                    0,
                    AccountSharedData::default().owner(),
                ),
                slot,
            ));
            assert_eq!(loaded, expected);
        }
    }

    /// Scans every account in each of `storages`, in order, invoking
    /// `callback` with the offset and account info handed out by the scan.
    ///
    /// A single scan reader is created up front and reused for all storages.
    ///
    /// # Panics
    ///
    /// Panics with "must scan accounts storage" if scanning any storage fails.
    pub fn scan_accounts_from_storages(
        storages: &[Arc<AccountStorageEntry>],
        mut callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>),
    ) {
        let mut reader = append_vec::new_scan_accounts_reader();
        for storage in storages {
            storage
                .accounts
                .scan_accounts(&mut reader, &mut callback)
                .expect("must scan accounts storage");
        }
    }

    /// Stores `accounts` through `store_accounts_unfrozen`, passing `None` as
    /// its second argument and `UpdateIndexThreadSelection::PoolWithThreshold`
    /// as the index-update threading strategy.
    pub fn store_for_tests<'a>(&self, accounts: impl StorableAccounts<'a>) {
        self.store_accounts_unfrozen(
            accounts,
            None,
            UpdateIndexThreadSelection::PoolWithThreshold,
        );
    }

    /// Overwrites the first `num` pubkeys at `slot`: the pubkey at index `idx`
    /// is stored with `idx + count` lamports, zero data space and the default
    /// owner.
    ///
    /// # Panics
    ///
    /// Panics if `num > pubkeys.len()`; the bounds check happens up front, so
    /// nothing is stored in that case.
    pub fn modify_accounts(&self, pubkeys: &[Pubkey], slot: Slot, num: usize, count: usize) {
        for (idx, pubkey) in pubkeys[..num].iter().enumerate() {
            let account = AccountSharedData::new(
                (idx + count) as u64,
                0,
                AccountSharedData::default().owner(),
            );
            self.store_for_tests((slot, [(pubkey, &account)].as_slice()));
        }
    }

    /// Asserts the state of the storage entry for `slot`:
    /// its status is `AccountStorageStatus::Available`, its `count()` equals
    /// `alive_count`, and a full scan of its accounts file finds `total_count`
    /// accounts.
    ///
    /// # Panics
    ///
    /// Panics if `slot` has no storage entry or any of the assertions fails.
    pub fn check_storage(&self, slot: Slot, alive_count: usize, total_count: usize) {
        let store = self.storage.get_slot_storage_entry(slot).unwrap();
        assert_eq!(store.status(), AccountStorageStatus::Available);
        assert_eq!(store.count(), alive_count);
        assert_eq!(store.accounts_count(), total_count);
    }

    /// Creates and stores `num + num_vote` brand-new accounts at `slot`,
    /// appending each generated pubkey to `pubkeys` in creation order.
    ///
    /// * The first `num` accounts use the default owner and hold `1..=num`
    ///   lamports.
    /// * The following `num_vote` accounts are owned by the vote program and
    ///   hold `num + 1..=num + num_vote` lamports.
    ///
    /// Every account is created with `space` bytes of data.
    ///
    /// # Panics
    ///
    /// Panics if a freshly generated pubkey is already loadable at `slot`.
    pub fn create_account(
        &self,
        pubkeys: &mut Vec<Pubkey>,
        slot: Slot,
        num: usize,
        space: usize,
        num_vote: usize,
    ) {
        // The ancestor set is the same for every account, so build it once.
        let ancestors: Ancestors = vec![(slot, 0)].into_iter().collect();

        // Shared by both loops: generate a key, record it, check that it is
        // not yet present, then store the new account.
        let mut store_new_account = |lamports: u64, owner: &Pubkey| {
            let pubkey = solana_pubkey::new_rand();
            let account = AccountSharedData::new(lamports, space, owner);
            pubkeys.push(pubkey);
            assert!(self.load_without_fixed_root(&ancestors, &pubkey).is_none());
            self.store_for_tests((slot, [(&pubkey, &account)].as_slice()));
        };

        for t in 0..num {
            store_new_account((t + 1) as u64, AccountSharedData::default().owner());
        }
        for t in 0..num_vote {
            // Lamports continue counting from where the first loop stopped.
            store_new_account((num + t + 1) as u64, &solana_vote_program::id());
        }
    }

    /// Returns the `stored_size()` of every account found when scanning the
    /// storage entry for `slot`, in scan order.
    ///
    /// Returns an empty vector when `slot` has no storage entry.
    ///
    /// # Panics
    ///
    /// Panics with "must scan accounts storage" if the scan fails.
    pub fn sizes_of_accounts_in_storage_for_tests(&self, slot: Slot) -> Vec<usize> {
        let mut sizes = Vec::default();
        if let Some(storage) = self.storage.get_slot_storage_entry(slot) {
            storage
                .accounts
                .scan_accounts_stored_meta(|account| {
                    sizes.push(account.stored_size());
                })
                .expect("must scan accounts storage");
        }
        sizes
    }

    /// Returns the reference count the accounts index reports for `pubkey`
    /// via `ref_count_from_storage`.
    pub fn ref_count_for_pubkey(&self, pubkey: &Pubkey) -> RefCount {
        self.accounts_index.ref_count_from_storage(pubkey)
    }

    /// Returns the storage entry's `count()` for `slot` plus the number of
    /// entries in the accounts cache for `slot`.
    ///
    /// A missing storage entry or a missing cache entry contributes zero; the
    /// sum saturates instead of overflowing.
    pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize {
        let stored = self
            .storage
            .get_slot_storage_entry(slot)
            .map_or(0, |storage| storage.count());
        let cached = self
            .accounts_cache
            .slot_cache(slot)
            .map_or(0, |slot_cache| slot_cache.len());
        stored.saturating_add(cached)
    }

    /// Flushes the accounts cache for `root` by calling
    /// `flush_accounts_cache(true, Some(root))`.
    ///
    /// # Panics
    ///
    /// Panics with "slot: {root}" if `root` is not among the accounts index's
    /// alive roots, or if the roots tracker lock is poisoned.
    pub fn flush_root_write_cache(&self, root: Slot) {
        // The read guard is a temporary of this statement, so the lock is
        // released before the flush below runs.
        let is_alive_root = self
            .accounts_index
            .roots_tracker
            .read()
            .unwrap()
            .alive_roots
            .contains(&root);
        assert!(is_alive_root, "slot: {root}");
        // NOTE(review): the first argument is presumably a "force flush" flag;
        // confirm against `flush_accounts_cache`.
        self.flush_accounts_cache(true, Some(root));
    }

    /// Returns the number of accounts found by scanning the accounts file of
    /// the storage entry for `slot`, or zero if `slot` has no storage entry.
    pub fn all_account_count_in_accounts_file(&self, slot: Slot) -> usize {
        self.storage
            .get_slot_storage_entry(slot)
            .map_or(0, |store| store.accounts_count())
    }

    /// Borrows the map from slot to the pubkeys recorded as uncleaned for that
    /// slot.
    pub fn uncleaned_pubkeys(&self) -> &DashMap<Slot, Vec<Pubkey>, BuildNoHashHasher<Slot>> {
        &self.uncleaned_pubkeys
    }
}