1mod accounts_db_config;
22mod geyser_plugin_utils;
23pub mod stats;
24pub mod tests;
25
26pub use accounts_db_config::{
27 AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS, ACCOUNTS_DB_CONFIG_FOR_TESTING,
28};
29#[cfg(feature = "dev-context-only-utils")]
30use qualifier_attr::qualifiers;
31use {
32 crate::{
33 account_info::{AccountInfo, Offset, StorageLocation},
34 account_storage::{
35 stored_account_info::{StoredAccountInfo, StoredAccountInfoWithoutData},
36 AccountStorage, AccountStoragesOrderer, ShrinkInProgress,
37 },
38 accounts_cache::{AccountsCache, CachedAccount, SlotCache},
39 accounts_db::stats::{
40 AccountsStats, CleanAccountsStats, FlushStats, ObsoleteAccountsStats, PurgeStats,
41 ShrinkAncientStats, ShrinkStats, ShrinkStatsSub, StoreAccountsTiming,
42 },
43 accounts_file::{AccountsFile, AccountsFileError, AccountsFileProvider, StorageAccess},
44 accounts_hash::{AccountLtHash, AccountsLtHash, ZERO_LAMPORT_ACCOUNT_LT_HASH},
45 accounts_index::{
46 in_mem_accounts_index::StartupStats, AccountSecondaryIndexes, AccountsIndex,
47 AccountsIndexRootsStats, AccountsIndexScanResult, IndexKey, IsCached, ReclaimsSlotList,
48 RefCount, ScanConfig, ScanFilter, ScanResult, SlotList, Startup, UpsertReclaim,
49 },
50 accounts_update_notifier_interface::{AccountForGeyser, AccountsUpdateNotifier},
51 active_stats::{ActiveStatItem, ActiveStats},
52 ancestors::Ancestors,
53 append_vec::{self, aligned_stored_size, STORE_META_OVERHEAD},
54 contains::Contains,
55 is_zero_lamport::IsZeroLamport,
56 obsolete_accounts::ObsoleteAccounts,
57 partitioned_rewards::PartitionedEpochRewardsConfig,
58 read_only_accounts_cache::ReadOnlyAccountsCache,
59 storable_accounts::{StorableAccounts, StorableAccountsBySlot},
60 u64_align,
61 utils::{self, create_account_shared_data},
62 },
63 agave_fs::buffered_reader::RequiredLenBufFileRead,
64 dashmap::{DashMap, DashSet},
65 log::*,
66 rand::{thread_rng, Rng},
67 rayon::{prelude::*, ThreadPool},
68 seqlock::SeqLock,
69 smallvec::SmallVec,
70 solana_account::{Account, AccountSharedData, ReadableAccount},
71 solana_clock::{BankId, Epoch, Slot},
72 solana_epoch_schedule::EpochSchedule,
73 solana_lattice_hash::lt_hash::LtHash,
74 solana_measure::{measure::Measure, measure_us},
75 solana_nohash_hasher::{BuildNoHashHasher, IntMap, IntSet},
76 solana_pubkey::Pubkey,
77 solana_rayon_threadlimit::get_thread_count,
78 solana_transaction::sanitized::SanitizedTransaction,
79 std::{
80 borrow::Cow,
81 boxed::Box,
82 collections::{BTreeSet, HashMap, HashSet, VecDeque},
83 io, iter, mem,
84 num::Saturating,
85 ops::RangeBounds,
86 path::{Path, PathBuf},
87 sync::{
88 atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
89 Arc, Condvar, Mutex, RwLock, RwLockReadGuard,
90 },
91 thread::{self, sleep},
92 time::{Duration, Instant},
93 },
94 tempfile::TempDir,
95};
96
/// Default write cache size limit, in bytes.
/// NOTE(review): presumably the fallback used when `write_cache_limit_bytes` is `None`;
/// the use site is not visible in this part of the file — confirm.
const WRITE_CACHE_LIMIT_BYTES_DEFAULT: u64 = 15_000_000_000;
/// NOTE(review): by its name, the item count above which scanning a slot switches to a
/// parallel iterator — confirm at the use site.
const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000;

/// NOTE(review): by its name, the batch size used when unref'ing accounts — confirm at the
/// use site.
const UNREF_ACCOUNTS_BATCH_SIZE: usize = 10_000;

/// Initial value of `AccountsDb::file_size` (4 MiB), assigned in `new_with_config()`.
const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
/// Number of temporary account directories created by `new_with_config()` when the caller
/// passes no paths.
const DEFAULT_NUM_DIRS: u32 = 4;

/// Default memlock budget, in bytes.
/// NOTE(review): the consumer of this budget is not visible here — confirm.
pub const DEFAULT_MEMLOCK_BUDGET_SIZE: usize = 2_000_000_000;
/// Smaller memlock budget; by its name intended for tests — confirm at the use site.
const MEMLOCK_BUDGET_SIZE_FOR_TESTS: usize = 4_000_000;

/// NOTE(review): by its name, the chunk size used when collecting accounts for shrink —
/// confirm at the use site.
const SHRINK_COLLECT_CHUNK_SIZE: usize = 50;

/// NOTE(review): by its name, a threshold controlling when shrink inserts ancient storages —
/// units and exact meaning are not visible here; confirm at the use site.
const SHRINK_INSERT_ANCIENT_THRESHOLD: usize = 10;
122
/// Selects how much account data a storage scan hands to its callback.
/// NOTE(review): exact semantics of each variant are defined by the scan code, which is not
/// visible here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ScanAccountStorageData {
    /// Scan without account data. Only constructed in tests, hence the `dead_code` allowance
    /// for non-test builds.
    #[cfg_attr(not(test), allow(dead_code))]
    NoData,
    /// Scan with a reference to the account data held by the storage.
    DataRefForStorage,
}
133
/// A list of alive accounts found in one slot, plus their accumulated stored size.
#[derive(Default, Debug)]
pub(crate) struct AliveAccounts<'a> {
    /// Slot the accounts were found in.
    pub(crate) slot: Slot,
    /// Borrowed references to the alive accounts.
    pub(crate) accounts: Vec<&'a AccountFromStorage>,
    /// Sum of `stored_size()` over `accounts` (accumulated with saturating addition).
    pub(crate) bytes: usize,
}
143
/// Alive accounts of one slot, split into three buckets by `ShrinkCollectRefs::add()`.
#[derive(Debug)]
pub(crate) struct ShrinkCollectAliveSeparatedByRefs<'a> {
    /// Accounts whose ref count is exactly 1.
    pub(crate) one_ref: AliveAccounts<'a>,
    /// Accounts with a ref count other than 1 whose slot list has a single entry, or has no
    /// entry in a slot greater than this slot.
    pub(crate) many_refs_this_is_newest_alive: AliveAccounts<'a>,
    /// Accounts with a ref count other than 1 whose slot list has at least one entry in a
    /// slot greater than this slot.
    pub(crate) many_refs_old_alive: AliveAccounts<'a>,
}
154
/// Abstraction over containers that accumulate the alive accounts found while shrinking a
/// slot.
pub(crate) trait ShrinkCollectRefs<'a>: Sync + Send {
    /// Creates an empty collection for `slot`, pre-sized for `capacity` accounts.
    fn with_capacity(capacity: usize, slot: Slot) -> Self;
    /// Merges the contents of `other` into `self`.
    fn collect(&mut self, other: Self);
    /// Records one alive `account`; `ref_count` and `slot_list` are available to
    /// implementations that categorize accounts.
    fn add(
        &mut self,
        ref_count: RefCount,
        account: &'a AccountFromStorage,
        slot_list: &[(Slot, AccountInfo)],
    );
    /// Number of accounts collected so far.
    fn len(&self) -> usize;
    /// Total stored bytes of the accounts collected so far.
    fn alive_bytes(&self) -> usize;
    /// The collected accounts. Not every implementation supports this.
    fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage>;
}
168
169impl<'a> ShrinkCollectRefs<'a> for AliveAccounts<'a> {
170 fn collect(&mut self, mut other: Self) {
171 self.bytes = self.bytes.saturating_add(other.bytes);
172 self.accounts.append(&mut other.accounts);
173 }
174 fn with_capacity(capacity: usize, slot: Slot) -> Self {
175 Self {
176 accounts: Vec::with_capacity(capacity),
177 bytes: 0,
178 slot,
179 }
180 }
181 fn add(
182 &mut self,
183 _ref_count: RefCount,
184 account: &'a AccountFromStorage,
185 _slot_list: &[(Slot, AccountInfo)],
186 ) {
187 self.accounts.push(account);
188 self.bytes = self.bytes.saturating_add(account.stored_size());
189 }
190 fn len(&self) -> usize {
191 self.accounts.len()
192 }
193 fn alive_bytes(&self) -> usize {
194 self.bytes
195 }
196 fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage> {
197 &self.accounts
198 }
199}
200
201impl<'a> ShrinkCollectRefs<'a> for ShrinkCollectAliveSeparatedByRefs<'a> {
202 fn collect(&mut self, other: Self) {
203 self.one_ref.collect(other.one_ref);
204 self.many_refs_this_is_newest_alive
205 .collect(other.many_refs_this_is_newest_alive);
206 self.many_refs_old_alive.collect(other.many_refs_old_alive);
207 }
208 fn with_capacity(capacity: usize, slot: Slot) -> Self {
209 Self {
210 one_ref: AliveAccounts::with_capacity(capacity, slot),
211 many_refs_this_is_newest_alive: AliveAccounts::with_capacity(0, slot),
212 many_refs_old_alive: AliveAccounts::with_capacity(0, slot),
213 }
214 }
215 fn add(
216 &mut self,
217 ref_count: RefCount,
218 account: &'a AccountFromStorage,
219 slot_list: &[(Slot, AccountInfo)],
220 ) {
221 let other = if ref_count == 1 {
222 &mut self.one_ref
223 } else if slot_list.len() == 1
224 || !slot_list
225 .iter()
226 .any(|(slot_list_slot, _info)| slot_list_slot > &self.many_refs_old_alive.slot)
227 {
228 &mut self.many_refs_this_is_newest_alive
230 } else {
231 &mut self.many_refs_old_alive
234 };
235 other.add(ref_count, account, slot_list);
236 }
237 fn len(&self) -> usize {
238 self.one_ref
239 .len()
240 .saturating_add(self.many_refs_old_alive.len())
241 .saturating_add(self.many_refs_this_is_newest_alive.len())
242 }
243 fn alive_bytes(&self) -> usize {
244 self.one_ref
245 .alive_bytes()
246 .saturating_add(self.many_refs_old_alive.alive_bytes())
247 .saturating_add(self.many_refs_this_is_newest_alive.alive_bytes())
248 }
249 fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage> {
250 unimplemented!("illegal use");
251 }
252}
253
/// Selects how reclaims produced by a store operation are treated.
/// NOTE(review): the handling code is not visible here; variant descriptions below are based
/// on the names only — confirm.
pub enum StoreReclaims {
    /// Use the default reclaim handling.
    Default,
    /// Presumably: do not process reclaims for this store.
    Ignore,
}
260
/// Selects what a load returns for an account with zero lamports.
/// NOTE(review): the load code is not visible here; variant descriptions below are based on
/// the names only — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LoadZeroLamports {
    /// Presumably: a zero-lamport account loads as `None`.
    None,
    /// Presumably: a zero-lamport account loads as `Some`. Only compiled with the
    /// `dev-context-only-utils` feature.
    #[cfg(feature = "dev-context-only-utils")]
    SomeWithZeroLamportAccountForTests,
}
274
/// Everything gathered about one slot's storage in preparation for shrinking it.
/// NOTE(review): the code that fills this struct is not visible here; field notes are based
/// on names and types — confirm.
#[derive(Debug)]
pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
    /// Slot being shrunk.
    pub(crate) slot: Slot,
    /// Capacity value recorded for the storage being shrunk.
    pub(crate) capacity: u64,
    /// Pubkeys whose index entry should be unref'd.
    pub(crate) pubkeys_to_unref: Vec<&'a Pubkey>,
    /// Pubkeys of zero-lamport accounts that have a single ref.
    pub(crate) zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
    /// The alive accounts, in whatever shape `T` organizes them.
    pub(crate) alive_accounts: T,
    /// Total bytes of the alive accounts.
    pub(crate) alive_total_bytes: usize,
    /// Number of accounts in the storage before shrinking.
    pub(crate) total_starting_accounts: usize,
    /// Whether every account seen was a zero-lamport account.
    pub(crate) all_are_zero_lamports: bool,
}
288
/// Result of consulting the accounts index for a batch of accounts during shrink.
/// NOTE(review): the producing code is not visible here; field notes are based on names and
/// types — confirm.
struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
    /// Accounts found to be alive.
    alive_accounts: T,
    /// Pubkeys whose index entry should be unref'd.
    pubkeys_to_unref: Vec<&'a Pubkey>,
    /// Pubkeys of zero-lamport accounts that have a single ref.
    zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
    /// Whether every account in the batch was a zero-lamport account.
    all_are_zero_lamports: bool,
}
300
/// Compact description of an account as found in a storage: where it lives, how much data it
/// carries, and its pubkey. The account data itself is not included.
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct AccountFromStorage {
    /// Index information (storage location and zero-lamport flag) for the account.
    pub index_info: AccountInfo,
    /// Length of the account's data, in bytes.
    pub data_len: u64,
    /// The account's address.
    pub pubkey: Pubkey,
}
308
impl IsZeroLamport for AccountFromStorage {
    /// Reports the zero-lamport flag stored in `index_info`.
    fn is_zero_lamport(&self) -> bool {
        self.index_info.is_zero_lamport()
    }
}
314
315impl AccountFromStorage {
316 pub fn pubkey(&self) -> &Pubkey {
317 &self.pubkey
318 }
319 pub fn stored_size(&self) -> usize {
320 aligned_stored_size(self.data_len as usize)
321 }
322 pub fn data_len(&self) -> usize {
323 self.data_len as usize
324 }
325 #[cfg(test)]
326 pub(crate) fn new(offset: Offset, account: &StoredAccountInfoWithoutData) -> Self {
327 let storage_id = 0;
331 AccountFromStorage {
332 index_info: AccountInfo::new(
333 StorageLocation::AppendVec(storage_id, offset),
334 account.is_zero_lamport(),
335 ),
336 pubkey: *account.pubkey(),
337 data_len: account.data_len as u64,
338 }
339 }
340}
341
/// Outcome of collecting the unique accounts of a storage.
/// NOTE(review): the producing function is not visible here; field notes are based on names
/// and types — confirm.
pub struct GetUniqueAccountsResult {
    /// The unique accounts found.
    pub stored_accounts: Vec<AccountFromStorage>,
    /// Capacity value recorded for the storage.
    pub capacity: u64,
    /// Number of accounts that were found to be duplicates.
    pub num_duplicated_accounts: usize,
}
347
/// Timings, in microseconds, collected while adding a root.
/// NOTE(review): the measuring code is not visible here — confirm what each phase covers.
pub struct AccountsAddRootTiming {
    /// Time attributed to the accounts index.
    pub index_us: u64,
    /// Time attributed to the accounts cache.
    pub cache_us: u64,
}
352
/// Value `new_with_config()` falls back to for `ancient_append_vec_offset` when the config
/// leaves it unset.
const ANCIENT_APPEND_VEC_DEFAULT_OFFSET: Option<i64> = Some(100_000);
/// Value `new_with_config()` falls back to for `ancient_storage_ideal_size` when the config
/// leaves it unset.
const DEFAULT_ANCIENT_STORAGE_IDEAL_SIZE: u64 = 100_000;
/// Value `new_with_config()` falls back to for `max_ancient_storages` when the config leaves
/// it unset.
pub const DEFAULT_MAX_ANCIENT_STORAGES: usize = 100_000;

/// Only defined in non-test builds.
/// NOTE(review): by its name, the number of consecutive failed iterations considered
/// absurd by some retry loop — the loop is not visible here; confirm.
#[cfg(not(test))]
const ABSURD_CONSECUTIVE_FAILED_ITERATIONS: usize = 100;
381
/// Strategy used to decide when storages should be shrunk; both variants carry the ratio
/// that acts as the threshold.
/// NOTE(review): how `shrink_ratio` is compared is decided by the shrink code, which is not
/// visible here.
#[derive(Debug, Clone, Copy)]
pub enum AccountShrinkThreshold {
    /// Threshold evaluated against total space.
    TotalSpace { shrink_ratio: f64 },
    /// Threshold evaluated against each individual store.
    IndividualStore { shrink_ratio: f64 },
}
/// Default for whether shrinking optimizes for total space.
/// NOTE(review): not referenced in the visible part of the file — confirm the consumer.
pub const DEFAULT_ACCOUNTS_SHRINK_OPTIMIZE_TOTAL_SPACE: bool = true;
/// Default shrink ratio.
pub const DEFAULT_ACCOUNTS_SHRINK_RATIO: f64 = 0.80;
/// Default shrink threshold: `TotalSpace` with `DEFAULT_ACCOUNTS_SHRINK_RATIO`. Returned by
/// `AccountShrinkThreshold::default()`.
const DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION: AccountShrinkThreshold =
    AccountShrinkThreshold::TotalSpace {
        shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_RATIO,
    };
400
impl Default for AccountShrinkThreshold {
    /// Returns `DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION`.
    fn default() -> AccountShrinkThreshold {
        DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION
    }
}
406
/// Result of scanning a slot, tagged by where the accounts came from.
/// NOTE(review): the scan function is not visible here — confirm when each variant is used.
pub enum ScanStorageResult<R, B> {
    /// Per-account results gathered from the cache.
    Cached(Vec<R>),
    /// Caller-defined accumulator produced from storage.
    Stored(B),
}
411
/// Summary produced by index generation.
/// NOTE(review): the generating code is not visible here; field notes are based on names
/// and types — confirm.
#[derive(Debug)]
pub struct IndexGenerationInfo {
    /// Total account data length, in bytes.
    pub accounts_data_len: u64,
    /// The accounts lattice hash calculated during index generation.
    pub calculated_accounts_lt_hash: AccountsLtHash,
}
419
/// Per-slot statistics gathered while generating the index.
/// NOTE(review): the generating code is not visible here; field notes are based on names
/// and types — confirm.
#[derive(Debug, Default)]
struct SlotIndexGenerationInfo {
    /// Time spent inserting into the index, in microseconds.
    insert_time_us: u64,
    /// Number of accounts processed for the slot.
    num_accounts: u64,
    /// Account data length for the slot, in bytes.
    accounts_data_len: u64,
    /// Pubkeys of zero-lamport accounts seen in the slot.
    zero_lamport_pubkeys: Vec<Pubkey>,
    /// Whether every account in the slot had zero lamports.
    all_accounts_are_zero_lamports: bool,
    /// Number of accounts that were not yet present in the index.
    num_did_not_exist: u64,
    /// Number of accounts that already existed in the in-memory index.
    num_existed_in_mem: u64,
    /// Number of accounts that already existed in the on-disk index.
    num_existed_on_disk: u64,
    /// Lattice hash accumulated for the slot.
    slot_lt_hash: SlotLtHash,
    /// Number of obsolete accounts skipped for the slot.
    num_obsolete_accounts_skipped: u64,
}
439
/// Newtype around an `LtHash`.
/// NOTE(review): by its name, the lattice hash of duplicate accounts — the producer is not
/// visible here; confirm.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DuplicatesLtHash(pub LtHash);

impl Default for DuplicatesLtHash {
    /// Starts from `LtHash::identity()`.
    fn default() -> Self {
        Self(LtHash::identity())
    }
}
454
/// Newtype around an `LtHash`; used as the per-slot hash in `SlotIndexGenerationInfo`.
#[derive(Debug)]
struct SlotLtHash(pub LtHash);

impl Default for SlotLtHash {
    /// Starts from `LtHash::identity()`.
    fn default() -> Self {
        Self(LtHash::identity())
    }
}
464
/// Timings and counters collected during index generation; submitted as the
/// `generate_index` datapoint by `report()`.
#[derive(Default, Debug)]
struct GenerateIndexTimings {
    /// Reported as `overall_us`.
    pub total_time_us: u64,
    /// Reported as `total_us`.
    pub index_time: u64,
    /// Reported as `insertion_time_us`.
    pub insertion_time_us: u64,
    /// Reported as `storage_size_storages_us`.
    pub storage_size_storages_us: u64,
    /// Reported as `index_flush_us`.
    pub index_flush_us: u64,
    /// Reported as `total_items_including_duplicates`.
    pub total_including_duplicates: u64,
    /// Reported as `visit_duplicate_accounts_us`.
    pub visit_duplicate_accounts_time_us: u64,
    /// Reported as `total_duplicate_slot_keys`.
    pub total_duplicate_slot_keys: u64,
    /// Reported as `total_num_unique_duplicate_keys`.
    pub total_num_unique_duplicate_keys: u64,
    /// Reported as `num_duplicate_accounts`.
    pub num_duplicate_accounts: u64,
    /// Reported as `populate_duplicate_keys_us`.
    pub populate_duplicate_keys_us: u64,
    /// Reported as `total_slots`.
    pub total_slots: u64,
    /// Reported as `visit_zero_lamports_us`.
    pub visit_zero_lamports_us: u64,
    /// Reported as `num_zero_lamport_single_refs`.
    pub num_zero_lamport_single_refs: u64,
    /// Reported as `all_accounts_are_zero_lamports_slots`.
    pub all_accounts_are_zero_lamports_slots: u64,
    /// Reported as `mark_obsolete_accounts_us`.
    pub mark_obsolete_accounts_us: u64,
    /// Reported as `num_obsolete_accounts_marked`.
    pub num_obsolete_accounts_marked: u64,
    /// Reported as `num_slots_removed_as_obsolete`.
    pub num_slots_removed_as_obsolete: u64,
    /// Reported as `num_obsolete_accounts_skipped`.
    pub num_obsolete_accounts_skipped: u64,
}
487
/// A byte size paired with an account count for one storage.
/// NOTE(review): the code that fills these values is not visible here — confirm exactly
/// which accounts are counted.
#[derive(Default, Debug, PartialEq, Eq)]
struct StorageSizeAndCount {
    /// Stored size, in bytes.
    pub stored_size: usize,
    /// Number of accounts.
    pub count: usize,
}
/// Concurrent map from storage file id to its `StorageSizeAndCount`, keyed without hashing.
type StorageSizeAndCountMap =
    DashMap<AccountsFileId, StorageSizeAndCount, BuildNoHashHasher<AccountsFileId>>;
497
impl GenerateIndexTimings {
    /// Submits every field of `self` as one `generate_index` datapoint.
    ///
    /// `startup_stats.copy_data_us` is included as `copy_data_us`; it is read with
    /// `swap(0, ..)`, so reporting also resets that counter to zero.
    pub fn report(&self, startup_stats: &StartupStats) {
        datapoint_info!(
            "generate_index",
            ("overall_us", self.total_time_us, i64),
            // Note the naming: `index_time` is reported under the key `total_us`.
            ("total_us", self.index_time, i64),
            ("insertion_time_us", self.insertion_time_us, i64),
            (
                "storage_size_storages_us",
                self.storage_size_storages_us,
                i64
            ),
            ("index_flush_us", self.index_flush_us, i64),
            (
                "total_items_including_duplicates",
                self.total_including_duplicates,
                i64
            ),
            (
                "visit_duplicate_accounts_us",
                self.visit_duplicate_accounts_time_us,
                i64
            ),
            (
                "total_duplicate_slot_keys",
                self.total_duplicate_slot_keys,
                i64
            ),
            (
                "total_num_unique_duplicate_keys",
                self.total_num_unique_duplicate_keys,
                i64
            ),
            ("num_duplicate_accounts", self.num_duplicate_accounts, i64),
            (
                "populate_duplicate_keys_us",
                self.populate_duplicate_keys_us,
                i64
            ),
            ("total_slots", self.total_slots, i64),
            // Taken from the index startup stats rather than from `self`; swapping in 0
            // clears the counter as it is read.
            (
                "copy_data_us",
                startup_stats.copy_data_us.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "num_zero_lamport_single_refs",
                self.num_zero_lamport_single_refs,
                i64
            ),
            ("visit_zero_lamports_us", self.visit_zero_lamports_us, i64),
            (
                "all_accounts_are_zero_lamports_slots",
                self.all_accounts_are_zero_lamports_slots,
                i64
            ),
            (
                "mark_obsolete_accounts_us",
                self.mark_obsolete_accounts_us,
                i64
            ),
            (
                "num_obsolete_accounts_marked",
                self.num_obsolete_accounts_marked,
                i64
            ),
            (
                "num_slots_removed_as_obsolete",
                self.num_slots_removed_as_obsolete,
                i64
            ),
            (
                "num_obsolete_accounts_skipped",
                self.num_obsolete_accounts_skipped,
                i64
            ),
        );
    }
}
578
impl IsZeroLamport for AccountSharedData {
    /// True when the account's lamport balance is exactly 0.
    fn is_zero_lamport(&self) -> bool {
        self.lamports() == 0
    }
}
584
impl IsZeroLamport for Account {
    /// True when the account's lamport balance is exactly 0.
    fn is_zero_lamport(&self) -> bool {
        self.lamports() == 0
    }
}
590
/// Atomic counterpart of `AccountsFileId`; the type of `AccountsDb::next_id`.
pub type AtomicAccountsFileId = AtomicU32;
/// Identifier of an accounts storage file.
pub type AccountsFileId = u32;

/// For each pubkey, a set of slots.
type AccountSlots = HashMap<Pubkey, IntSet<Slot>>;
/// For each slot, a set of storage offsets.
type SlotOffsets = IntMap<Slot, IntSet<Offset>>;
/// Pair of `(AccountSlots, SlotOffsets)`.
/// NOTE(review): by its name, what a reclaim pass produces — the producer is not visible
/// here; confirm.
type ReclaimResult = (AccountSlots, SlotOffsets);
/// Set of pubkeys; by its name, those removed from the accounts index.
type PubkeysRemovedFromAccountsIndex = HashSet<Pubkey>;
/// Set of slots; the element type behind `AccountsDb::shrink_candidate_slots`.
type ShrinkCandidates = IntSet<Slot>;
600
/// Hint a caller passes to a load.
/// NOTE(review): the load code that interprets these hints is not visible here; variant
/// descriptions below are based on the names only — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadHint {
    /// Presumably: the max root is fixed for the duration of the load.
    FixedMaxRoot,
    /// Presumably: as `FixedMaxRoot`, and the loaded account is not added to the read cache.
    FixedMaxRootDoNotPopulateReadCache,
    /// No hint is given.
    Unspecified,
}
622
/// Handle through which an account can be read, either from a storage entry or from the
/// cache. In both variants `None` means nothing was found for the account.
#[derive(Debug)]
pub enum LoadedAccountAccessor<'a> {
    /// The storage entry holding the account and the account's offset within it.
    Stored(Option<(Arc<AccountStorageEntry>, usize)>),
    /// The cached account, either borrowed or owned.
    Cached(Option<Cow<'a, Arc<CachedAccount>>>),
}
631
632impl LoadedAccountAccessor<'_> {
633 fn check_and_get_loaded_account_shared_data(&mut self) -> AccountSharedData {
634 match self {
638 LoadedAccountAccessor::Stored(Some((maybe_storage_entry, offset))) => {
639 maybe_storage_entry
645 .accounts
646 .get_account_shared_data(*offset)
647 .expect(
648 "If a storage entry was found in the storage map, it must not have been \
649 reset yet",
650 )
651 }
652 _ => self.check_and_get_loaded_account(|loaded_account| loaded_account.take_account()),
653 }
654 }
655
656 fn check_and_get_loaded_account<T>(
657 &mut self,
658 callback: impl for<'local> FnMut(LoadedAccount<'local>) -> T,
659 ) -> T {
660 match self {
664 LoadedAccountAccessor::Cached(None) | LoadedAccountAccessor::Stored(None) => {
665 panic!(
666 "Should have already been taken care of when creating this \
667 LoadedAccountAccessor"
668 );
669 }
670 LoadedAccountAccessor::Cached(Some(_cached_account)) => {
671 self.get_loaded_account(callback).unwrap()
674 }
675 LoadedAccountAccessor::Stored(Some(_maybe_storage_entry)) => {
676 self.get_loaded_account(callback).expect(
682 "If a storage entry was found in the storage map, it must not have been reset \
683 yet",
684 )
685 }
686 }
687 }
688
689 fn get_loaded_account<T>(
690 &mut self,
691 mut callback: impl for<'local> FnMut(LoadedAccount<'local>) -> T,
692 ) -> Option<T> {
693 match self {
694 LoadedAccountAccessor::Cached(cached_account) => {
695 let cached_account = cached_account.take().expect(
696 "Cache flushed/purged should be handled before trying to fetch account",
697 );
698 Some(callback(LoadedAccount::Cached(cached_account)))
699 }
700 LoadedAccountAccessor::Stored(maybe_storage_entry) => {
701 maybe_storage_entry
705 .as_ref()
706 .and_then(|(storage_entry, offset)| {
707 storage_entry
708 .accounts
709 .get_stored_account_callback(*offset, |account| {
710 callback(LoadedAccount::Stored(account))
711 })
712 })
713 }
714 }
715 }
716}
717
/// An account that has been loaded, tagged by where it was read from.
pub enum LoadedAccount<'a> {
    /// Account read from a storage file.
    Stored(StoredAccountInfo<'a>),
    /// Account read from the cache, either borrowed or owned.
    Cached(Cow<'a, Arc<CachedAccount>>),
}
722
723impl LoadedAccount<'_> {
724 pub fn pubkey(&self) -> &Pubkey {
725 match self {
726 LoadedAccount::Stored(stored_account) => stored_account.pubkey(),
727 LoadedAccount::Cached(cached_account) => cached_account.pubkey(),
728 }
729 }
730
731 pub fn take_account(&self) -> AccountSharedData {
732 match self {
733 LoadedAccount::Stored(stored_account) => create_account_shared_data(stored_account),
734 LoadedAccount::Cached(cached_account) => match cached_account {
735 Cow::Owned(cached_account) => cached_account.account.clone(),
736 Cow::Borrowed(cached_account) => cached_account.account.clone(),
737 },
738 }
739 }
740
741 pub fn is_cached(&self) -> bool {
742 match self {
743 LoadedAccount::Stored(_) => false,
744 LoadedAccount::Cached(_) => true,
745 }
746 }
747
748 pub fn data_len(&self) -> usize {
750 self.data().len()
751 }
752}
753
754impl ReadableAccount for LoadedAccount<'_> {
755 fn lamports(&self) -> u64 {
756 match self {
757 LoadedAccount::Stored(stored_account) => stored_account.lamports(),
758 LoadedAccount::Cached(cached_account) => cached_account.account.lamports(),
759 }
760 }
761 fn data(&self) -> &[u8] {
762 match self {
763 LoadedAccount::Stored(stored_account) => stored_account.data(),
764 LoadedAccount::Cached(cached_account) => cached_account.account.data(),
765 }
766 }
767 fn owner(&self) -> &Pubkey {
768 match self {
769 LoadedAccount::Stored(stored_account) => stored_account.owner(),
770 LoadedAccount::Cached(cached_account) => cached_account.account.owner(),
771 }
772 }
773 fn executable(&self) -> bool {
774 match self {
775 LoadedAccount::Stored(stored_account) => stored_account.executable(),
776 LoadedAccount::Cached(cached_account) => cached_account.account.executable(),
777 }
778 }
779 fn rent_epoch(&self) -> Epoch {
780 match self {
781 LoadedAccount::Stored(stored_account) => stored_account.rent_epoch(),
782 LoadedAccount::Cached(cached_account) => cached_account.account.rent_epoch(),
783 }
784 }
785 fn to_account_shared_data(&self) -> AccountSharedData {
786 self.take_account()
787 }
788}
789
/// Timings and counters gathered while building the set of keys to clean.
/// NOTE(review): the code that fills these values is not visible here; field notes are
/// based on names and types — confirm.
#[derive(Default)]
struct CleanKeyTimings {
    /// Time spent collecting delta keys, in microseconds.
    collect_delta_keys_us: u64,
    /// Time spent inserting delta keys, in microseconds.
    delta_insert_us: u64,
    /// Time spent processing dirty stores, in microseconds.
    dirty_store_processing_us: u64,
    /// Number of delta keys.
    delta_key_count: u64,
    /// Number of dirty pubkeys.
    dirty_pubkeys_count: u64,
    /// Oldest dirty slot seen.
    oldest_dirty_slot: Slot,
    /// Number of dirty ancient stores.
    dirty_ancient_stores: usize,
}
801
/// One accounts storage file together with its in-memory bookkeeping.
#[derive(Debug)]
pub struct AccountStorageEntry {
    /// Identifier of this storage; combined with `slot` to form the file name in `new()`.
    pub(crate) id: AccountsFileId,

    /// Slot this storage belongs to.
    pub(crate) slot: Slot,

    /// The underlying accounts file.
    pub accounts: AccountsFile,

    /// Number of accounts tracked for this storage; raised by `add_accounts()` and lowered
    /// by `remove_accounts()`.
    count: AtomicUsize,

    /// Number of bytes tracked for this storage; raised by `add_accounts()` and lowered by
    /// `remove_accounts()`.
    alive_bytes: AtomicUsize,

    /// Offsets recorded through the `*_zero_lamport_single_ref_account_offset*` methods.
    zero_lamport_single_ref_offsets: RwLock<IntSet<Offset>>,

    /// Accounts of this storage that have been marked obsolete.
    obsolete_accounts: RwLock<ObsoleteAccounts>,
}
839
840impl AccountStorageEntry {
841 pub fn new(
842 path: &Path,
843 slot: Slot,
844 id: AccountsFileId,
845 file_size: u64,
846 provider: AccountsFileProvider,
847 storage_access: StorageAccess,
848 ) -> Self {
849 let tail = AccountsFile::file_name(slot, id);
850 let path = Path::new(path).join(tail);
851 let accounts = provider.new_writable(path, file_size, storage_access);
852
853 Self {
854 id,
855 slot,
856 accounts,
857 count: AtomicUsize::new(0),
858 alive_bytes: AtomicUsize::new(0),
859 zero_lamport_single_ref_offsets: RwLock::default(),
860 obsolete_accounts: RwLock::default(),
861 }
862 }
863
864 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
866 fn reopen_as_readonly(&self, storage_access: StorageAccess) -> Option<Self> {
867 if storage_access != StorageAccess::File {
868 return None;
870 }
871
872 self.accounts.reopen_as_readonly().map(|accounts| Self {
873 id: self.id,
874 slot: self.slot,
875 count: AtomicUsize::new(self.count()),
876 alive_bytes: AtomicUsize::new(self.alive_bytes()),
877 accounts,
878 zero_lamport_single_ref_offsets: RwLock::new(
879 self.zero_lamport_single_ref_offsets.read().unwrap().clone(),
880 ),
881 obsolete_accounts: RwLock::new(self.obsolete_accounts.read().unwrap().clone()),
882 })
883 }
884
885 pub fn new_existing(
886 slot: Slot,
887 id: AccountsFileId,
888 accounts: AccountsFile,
889 obsolete_accounts: ObsoleteAccounts,
890 ) -> Self {
891 Self {
892 id,
893 slot,
894 accounts,
895 count: AtomicUsize::new(0),
896 alive_bytes: AtomicUsize::new(0),
897 zero_lamport_single_ref_offsets: RwLock::default(),
898 obsolete_accounts: RwLock::new(obsolete_accounts),
899 }
900 }
901
902 pub fn count(&self) -> usize {
904 self.count.load(Ordering::Acquire)
905 }
906
907 pub fn alive_bytes(&self) -> usize {
908 self.alive_bytes.load(Ordering::Acquire)
909 }
910
911 pub fn obsolete_accounts_for_snapshots(&self, slot: Slot) -> ObsoleteAccounts {
915 self.obsolete_accounts_read_lock()
916 .obsolete_accounts_for_snapshots(slot)
917 }
918
919 pub(crate) fn obsolete_accounts_read_lock(&self) -> RwLockReadGuard<'_, ObsoleteAccounts> {
921 self.obsolete_accounts.read().unwrap()
922 }
923
924 pub fn get_obsolete_bytes(&self, slot: Option<Slot>) -> usize {
928 let obsolete_bytes: usize = self
929 .obsolete_accounts_read_lock()
930 .filter_obsolete_accounts(slot)
931 .map(|(offset, data_len)| {
932 self.accounts
933 .calculate_stored_size(data_len)
934 .min(self.accounts.len() - offset)
935 })
936 .sum();
937 obsolete_bytes
938 }
939
940 fn insert_zero_lamport_single_ref_account_offset(&self, offset: usize) -> bool {
943 let mut zero_lamport_single_ref_offsets =
944 self.zero_lamport_single_ref_offsets.write().unwrap();
945 zero_lamport_single_ref_offsets.insert(offset)
946 }
947
948 fn batch_insert_zero_lamport_single_ref_account_offsets(&self, offsets: &[Offset]) -> u64 {
951 let mut zero_lamport_single_ref_offsets =
952 self.zero_lamport_single_ref_offsets.write().unwrap();
953 let mut count = 0;
954 for offset in offsets {
955 if zero_lamport_single_ref_offsets.insert(*offset) {
956 count += 1;
957 }
958 }
959 count
960 }
961
962 fn num_zero_lamport_single_ref_accounts(&self) -> usize {
964 self.zero_lamport_single_ref_offsets.read().unwrap().len()
965 }
966
967 fn alive_bytes_exclude_zero_lamport_single_ref_accounts(&self) -> usize {
969 let zero_lamport_dead_bytes = self
970 .accounts
971 .dead_bytes_due_to_zero_lamport_single_ref(self.num_zero_lamport_single_ref_accounts());
972 self.alive_bytes().saturating_sub(zero_lamport_dead_bytes)
973 }
974
975 pub fn written_bytes(&self) -> u64 {
977 self.accounts.len() as u64
978 }
979
980 pub fn capacity(&self) -> u64 {
982 self.accounts.capacity()
983 }
984
985 pub fn has_accounts(&self) -> bool {
986 self.count() > 0
987 }
988
989 pub fn slot(&self) -> Slot {
990 self.slot
991 }
992
993 pub fn id(&self) -> AccountsFileId {
994 self.id
995 }
996
997 pub fn flush(&self) -> Result<(), AccountsFileError> {
998 self.accounts.flush()
999 }
1000
1001 fn add_accounts(&self, num_accounts: usize, num_bytes: usize) {
1002 self.count.fetch_add(num_accounts, Ordering::Release);
1003 self.alive_bytes.fetch_add(num_bytes, Ordering::Release);
1004 }
1005
1006 fn remove_accounts(&self, num_bytes: usize, num_accounts: usize) -> usize {
1009 let prev_alive_bytes = self.alive_bytes.fetch_sub(num_bytes, Ordering::Release);
1010 let prev_count = self.count.fetch_sub(num_accounts, Ordering::Release);
1011
1012 assert!(
1014 num_bytes <= prev_alive_bytes && num_accounts <= prev_count,
1015 "Too many bytes or accounts removed from storage! slot: {}, id: {}, initial alive \
1016 bytes: {prev_alive_bytes}, initial num accounts: {prev_count}, num bytes removed: \
1017 {num_bytes}, num accounts removed: {num_accounts}",
1018 self.slot,
1019 self.id,
1020 );
1021
1022 prev_count - num_accounts
1024 }
1025
1026 pub fn path(&self) -> &Path {
1028 self.accounts.path()
1029 }
1030}
1031
1032pub fn get_temp_accounts_paths(count: u32) -> io::Result<(Vec<TempDir>, Vec<PathBuf>)> {
1033 let temp_dirs: io::Result<Vec<TempDir>> = (0..count).map(|_| TempDir::new()).collect();
1034 let temp_dirs = temp_dirs?;
1035
1036 let paths: io::Result<Vec<_>> = temp_dirs
1037 .iter()
1038 .map(|temp_dir| {
1039 utils::create_accounts_run_and_snapshot_dirs(temp_dir)
1040 .map(|(run_dir, _snapshot_dir)| run_dir)
1041 })
1042 .collect();
1043 let paths = paths?;
1044 Ok((temp_dirs, paths))
1045}
1046
/// Per-pubkey information gathered for cleaning.
/// NOTE(review): the clean code is not visible here; field notes are based on names and
/// types — confirm.
#[derive(Default, Debug)]
struct CleaningInfo {
    /// Slot list of the pubkey.
    slot_list: SlotList<AccountInfo>,
    /// Ref count of the pubkey.
    ref_count: RefCount,
    /// Whether the slot list might contain a zero-lamport entry.
    might_contain_zero_lamport_entry: bool,
}
1056
/// Switch for marking accounts obsolete; stored in `AccountsDb::mark_obsolete_accounts`.
/// NOTE(review): what enabling it changes is decided by code not visible here.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum MarkObsoleteAccounts {
    /// The default.
    #[default]
    Disabled,
    Enabled,
}
1066
/// A boxed slice of lock-protected `pubkey -> CleaningInfo` maps, paired with an optional
/// slot.
/// NOTE(review): the meaning of the slot and how pubkeys are distributed over the maps are
/// not visible here — confirm.
type CleaningCandidates = (Box<[RwLock<HashMap<Pubkey, CleaningInfo>>]>, Option<Slot>);
1073
/// Mutex-protected slot set plus a condition variable.
/// NOTE(review): by its name, used to coordinate removal of unrooted slots with other work
/// on the same slots — the waiting/notifying code is not visible here; confirm.
#[derive(Debug, Default)]
struct RemoveUnrootedSlotsSynchronization {
    /// Slots currently under contention.
    slots_under_contention: Mutex<IntSet<Slot>>,
    /// Condition variable paired with `slots_under_contention`.
    signal: Condvar,
}
1083
/// The accounts index instantiated with `AccountInfo` for both type parameters; the type of
/// `AccountsDb::accounts_index`.
type AccountInfoAccountsIndex = AccountsIndex<AccountInfo, AccountInfo>;
1085
/// The accounts database: index, storages, caches, thread pools, stats and configuration.
///
/// Field notes describe what `new_with_config()` establishes where that is visible in this
/// file; other notes are hedged.
#[derive(Debug)]
pub struct AccountsDb {
    /// Index of all accounts.
    pub accounts_index: AccountInfoAccountsIndex,

    /// From config, falling back to `ANCIENT_APPEND_VEC_DEFAULT_OFFSET`.
    pub ancient_append_vec_offset: Option<i64>,
    /// From config, falling back to `DEFAULT_ANCIENT_STORAGE_IDEAL_SIZE`.
    pub ancient_storage_ideal_size: u64,
    /// From config, falling back to `DEFAULT_MAX_ANCIENT_STORAGES`.
    pub max_ancient_storages: usize,
    /// Copied from the config.
    pub skip_initial_hash_calc: bool,

    /// The account storages.
    pub storage: AccountStorage,

    /// The accounts cache.
    pub accounts_cache: AccountsCache,

    /// Copied from the config; `None` means no explicit limit was configured.
    write_cache_limit_bytes: Option<u64>,

    /// Read-only accounts cache, sized from the config or the `DEFAULT_*READ_ONLY_CACHE*`
    /// constants.
    read_only_accounts_cache: ReadOnlyAccountsCache,

    /// Counter for accounts file ids; starts at 0.
    pub next_id: AtomicAccountsFileId,

    /// Slots that are candidates for shrinking.
    pub shrink_candidate_slots: Mutex<ShrinkCandidates>,

    /// Write version counter; starts at 0.
    pub write_version: AtomicU64,

    /// Account directories: the caller-provided paths, or freshly created temporary ones
    /// when the caller provided none.
    pub paths: Vec<PathBuf>,

    /// Base working directory: from config, or a temporary directory when not configured.
    base_working_path: PathBuf,
    /// Keeps the temporary base working directory alive; `Some` only when no base working
    /// path was configured. Never read, hence the `dead_code` allowance.
    #[allow(dead_code)]
    base_working_temp_dir: Option<TempDir>,

    /// Directories used for shrinking: from config, otherwise a copy of `paths`.
    shrink_paths: Vec<PathBuf>,

    /// Keeps the temporary account directories alive; `Some` only when the caller provided
    /// no paths.
    #[allow(dead_code)]
    pub temp_paths: Option<Vec<TempDir>>,

    /// Initialized to `DEFAULT_FILE_SIZE`.
    file_size: u64,

    /// Foreground thread pool; its threads are named `solAcctsDbFgNN`.
    pub thread_pool_foreground: ThreadPool,
    /// Background thread pool; its threads are named `solAcctsDbBgNN`.
    pub thread_pool_background: ThreadPool,

    /// General accounts stats.
    pub stats: AccountsStats,

    /// Stats for cleaning accounts.
    clean_accounts_stats: CleanAccountsStats,

    /// Purge stats; by its name, for purges requested from outside — confirm.
    external_purge_slots_stats: PurgeStats,

    /// Stats for shrinking.
    pub shrink_stats: ShrinkStats,

    /// Stats for shrinking ancient storages.
    pub(crate) shrink_ancient_stats: ShrinkAncientStats,

    /// Secondary indexes configuration; from config, otherwise the default.
    pub account_indexes: AccountSecondaryIndexes,

    /// Per slot, a list of pubkeys; by its name, those not yet cleaned — confirm.
    uncleaned_pubkeys: DashMap<Slot, Vec<Pubkey>, BuildNoHashHasher<Slot>>,

    /// Test-only.
    #[cfg(test)]
    load_delay: u64,

    /// Test-only.
    #[cfg(test)]
    load_limit: AtomicU64,

    /// Starts as `false` (`AtomicBool::default()`).
    is_bank_drop_callback_enabled: AtomicBool,

    /// See `RemoveUnrootedSlotsSynchronization`.
    remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization,

    /// Shrink threshold strategy, copied from the config.
    shrink_ratio: AccountShrinkThreshold,

    /// Per slot, a storage entry considered dirty.
    dirty_stores: DashMap<Slot, Arc<AccountStorageEntry>, BuildNoHashHasher<Slot>>,

    /// `(slot, pubkey)` pairs; by its name, zero-lamport accounts whose purge is deferred
    /// until after a full snapshot — confirm.
    zero_lamport_accounts_to_purge_after_full_snapshot: DashSet<(Slot, Pubkey)>,

    /// Optional notifier for account updates, as passed to `new_with_config()`.
    accounts_update_notifier: Option<AccountsUpdateNotifier>,

    /// Stats about currently active operations.
    pub(crate) active_stats: ActiveStats,

    /// Starts as `true`.
    pub log_dead_slots: AtomicBool,

    /// Copied from the config.
    exhaustively_verify_refcounts: bool,

    /// Provider used to create accounts files; initialized to its default.
    accounts_file_provider: AccountsFileProvider,

    /// Storage access mode, copied from the config.
    storage_access: StorageAccess,

    /// Scan filter used when shrinking, copied from the config.
    scan_filter_for_shrinking: ScanFilter,

    /// Copied from the config.
    pub partitioned_epoch_rewards_config: PartitionedEpochRewardsConfig,

    /// Slot of the latest full snapshot; starts as `None`.
    latest_full_snapshot_slot: SeqLock<Option<Slot>>,

    /// Queue of `(slot, u64)` pairs; by its name, the ancient slots considered best to
    /// shrink. The meaning of the `u64` is not visible here — confirm.
    pub(crate) best_ancient_slots_to_shrink: RwLock<VecDeque<(Slot, u64)>>,

    /// Copied from the config.
    pub mark_obsolete_accounts: MarkObsoleteAccounts,
}
1223
1224pub fn quarter_thread_count() -> usize {
1225 std::cmp::max(2, num_cpus::get() / 4)
1226}
1227
/// Default size of the foreground thread pool: whatever `get_thread_count()` returns.
pub fn default_num_foreground_threads() -> usize {
    get_thread_count()
}
1231
#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample for AccountsDb {
    /// Builds a small example database: a single test instance holding one account
    /// (1 lamport, 5 bytes of data) stored at slot 0, which is then rooted and flushed from
    /// the write cache.
    fn example() -> Self {
        let example_slot: Slot = 0;
        let db = AccountsDb::new_single_for_tests();
        let pubkey = Pubkey::default();
        let data_len = 5;
        let example_account = AccountSharedData::new(1, data_len, &pubkey);
        db.store_for_tests((example_slot, [(&pubkey, &example_account)].as_slice()));
        db.add_root_and_flush_write_cache(example_slot);
        db
    }
}
1245
1246impl AccountsDb {
    /// First element of the default read cache size pair used by `new_with_config()` when
    /// `read_cache_limit_bytes` is not configured.
    /// NOTE(review): presumably the low watermark, in bytes, that eviction shrinks the cache
    /// down to — the cache implementation is not visible here; confirm.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_LO: usize = 3_000_000_000;
    /// Second element of the default read cache size pair used by `new_with_config()` when
    /// `read_cache_limit_bytes` is not configured.
    /// NOTE(review): presumably the high watermark, in bytes, that triggers eviction —
    /// confirm.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_HI: usize = 3_100_000_000;

    /// Default eviction sample size handed to the read-only cache by `new_with_config()`
    /// when `read_cache_evict_sample_size` is not configured.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_READ_ONLY_CACHE_EVICT_SAMPLE_SIZE: usize = 8;
1261
1262 pub fn new_with_config(
1263 paths: Vec<PathBuf>,
1264 accounts_db_config: AccountsDbConfig,
1265 accounts_update_notifier: Option<AccountsUpdateNotifier>,
1266 exit: Arc<AtomicBool>,
1267 ) -> Self {
1268 let accounts_index_config = accounts_db_config.index.unwrap_or_default();
1269 let accounts_index = AccountsIndex::new(&accounts_index_config, exit);
1270
1271 let base_working_path = accounts_db_config.base_working_path.clone();
1272 let (base_working_path, base_working_temp_dir) =
1273 if let Some(base_working_path) = base_working_path {
1274 (base_working_path, None)
1275 } else {
1276 let base_working_temp_dir = TempDir::new().unwrap();
1277 let base_working_path = base_working_temp_dir.path().to_path_buf();
1278 (base_working_path, Some(base_working_temp_dir))
1279 };
1280
1281 let (paths, temp_paths) = if paths.is_empty() {
1282 let (temp_dirs, temp_paths) = get_temp_accounts_paths(DEFAULT_NUM_DIRS).unwrap();
1285 (temp_paths, Some(temp_dirs))
1286 } else {
1287 (paths, None)
1288 };
1289
1290 let shrink_paths = accounts_db_config
1291 .shrink_paths
1292 .clone()
1293 .unwrap_or_else(|| paths.clone());
1294
1295 let read_cache_size = accounts_db_config.read_cache_limit_bytes.unwrap_or((
1296 Self::DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_LO,
1297 Self::DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_HI,
1298 ));
1299 let read_cache_evict_sample_size = accounts_db_config
1300 .read_cache_evict_sample_size
1301 .unwrap_or(Self::DEFAULT_READ_ONLY_CACHE_EVICT_SAMPLE_SIZE);
1302
1303 const ACCOUNTS_STACK_SIZE: usize = 8 * 1024 * 1024;
1306 let num_foreground_threads = accounts_db_config
1307 .num_foreground_threads
1308 .map(Into::into)
1309 .unwrap_or_else(default_num_foreground_threads);
1310 let thread_pool_foreground = rayon::ThreadPoolBuilder::new()
1311 .num_threads(num_foreground_threads)
1312 .thread_name(|i| format!("solAcctsDbFg{i:02}"))
1313 .stack_size(ACCOUNTS_STACK_SIZE)
1314 .build()
1315 .expect("new rayon threadpool");
1316
1317 let num_background_threads = accounts_db_config
1318 .num_background_threads
1319 .map(Into::into)
1320 .unwrap_or_else(quarter_thread_count);
1321 let thread_pool_background = rayon::ThreadPoolBuilder::new()
1322 .thread_name(|i| format!("solAcctsDbBg{i:02}"))
1323 .num_threads(num_background_threads)
1324 .build()
1325 .expect("new rayon threadpool");
1326
1327 let new = Self {
1328 accounts_index,
1329 paths,
1330 base_working_path,
1331 base_working_temp_dir,
1332 temp_paths,
1333 shrink_paths,
1334 skip_initial_hash_calc: accounts_db_config.skip_initial_hash_calc,
1335 ancient_append_vec_offset: accounts_db_config
1336 .ancient_append_vec_offset
1337 .or(ANCIENT_APPEND_VEC_DEFAULT_OFFSET),
1338 ancient_storage_ideal_size: accounts_db_config
1339 .ancient_storage_ideal_size
1340 .unwrap_or(DEFAULT_ANCIENT_STORAGE_IDEAL_SIZE),
1341 max_ancient_storages: accounts_db_config
1342 .max_ancient_storages
1343 .unwrap_or(DEFAULT_MAX_ANCIENT_STORAGES),
1344 account_indexes: accounts_db_config.account_indexes.unwrap_or_default(),
1345 shrink_ratio: accounts_db_config.shrink_ratio,
1346 accounts_update_notifier,
1347 read_only_accounts_cache: ReadOnlyAccountsCache::new(
1348 read_cache_size.0,
1349 read_cache_size.1,
1350 read_cache_evict_sample_size,
1351 ),
1352 write_cache_limit_bytes: accounts_db_config.write_cache_limit_bytes,
1353 partitioned_epoch_rewards_config: accounts_db_config.partitioned_epoch_rewards_config,
1354 exhaustively_verify_refcounts: accounts_db_config.exhaustively_verify_refcounts,
1355 storage_access: accounts_db_config.storage_access,
1356 scan_filter_for_shrinking: accounts_db_config.scan_filter_for_shrinking,
1357 thread_pool_foreground,
1358 thread_pool_background,
1359 active_stats: ActiveStats::default(),
1360 storage: AccountStorage::default(),
1361 accounts_cache: AccountsCache::default(),
1362 uncleaned_pubkeys: DashMap::default(),
1363 next_id: AtomicAccountsFileId::new(0),
1364 shrink_candidate_slots: Mutex::new(ShrinkCandidates::default()),
1365 write_version: AtomicU64::new(0),
1366 file_size: DEFAULT_FILE_SIZE,
1367 external_purge_slots_stats: PurgeStats::default(),
1368 clean_accounts_stats: CleanAccountsStats::default(),
1369 shrink_stats: ShrinkStats::default(),
1370 shrink_ancient_stats: ShrinkAncientStats::default(),
1371 stats: AccountsStats::default(),
1372 #[cfg(test)]
1373 load_delay: u64::default(),
1374 #[cfg(test)]
1375 load_limit: AtomicU64::default(),
1376 is_bank_drop_callback_enabled: AtomicBool::default(),
1377 remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization::default(),
1378 dirty_stores: DashMap::default(),
1379 zero_lamport_accounts_to_purge_after_full_snapshot: DashSet::default(),
1380 log_dead_slots: AtomicBool::new(true),
1381 accounts_file_provider: AccountsFileProvider::default(),
1382 latest_full_snapshot_slot: SeqLock::new(None),
1383 best_ancient_slots_to_shrink: RwLock::default(),
1384 mark_obsolete_accounts: accounts_db_config.mark_obsolete_accounts,
1385 };
1386
1387 {
1388 for path in new.paths.iter() {
1389 std::fs::create_dir_all(path).expect("Create directory failed.");
1390 }
1391 }
1392 new
1393 }
1394
1395 pub fn file_size(&self) -> u64 {
1396 self.file_size
1397 }
1398
1399 pub fn get_base_working_path(&self) -> PathBuf {
1401 self.base_working_path.clone()
1402 }
1403
1404 pub fn has_accounts_update_notifier(&self) -> bool {
1406 self.accounts_update_notifier.is_some()
1407 }
1408
1409 fn next_id(&self) -> AccountsFileId {
1410 let next_id = self.next_id.fetch_add(1, Ordering::AcqRel);
1411 assert!(
1412 next_id != AccountsFileId::MAX,
1413 "We've run out of storage ids!"
1414 );
1415 next_id
1416 }
1417
1418 fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry {
1419 AccountStorageEntry::new(
1420 path,
1421 slot,
1422 self.next_id(),
1423 size,
1424 self.accounts_file_provider,
1425 self.storage_access,
1426 )
1427 }
1428
1429 fn collect_reclaims(
1433 &self,
1434 pubkey: &Pubkey,
1435 max_clean_root_inclusive: Option<Slot>,
1436 ancient_account_cleans: &AtomicU64,
1437 epoch_schedule: &EpochSchedule,
1438 pubkeys_removed_from_accounts_index: &Mutex<PubkeysRemovedFromAccountsIndex>,
1439 ) -> ReclaimsSlotList<AccountInfo> {
1440 let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule);
1441 let mut clean_rooted = Measure::start("clean_old_root-ms");
1442 let mut reclaims = ReclaimsSlotList::new();
1443 let removed_from_index = self.accounts_index.clean_rooted_entries(
1444 pubkey,
1445 &mut reclaims,
1446 max_clean_root_inclusive,
1447 );
1448 if removed_from_index {
1449 pubkeys_removed_from_accounts_index
1450 .lock()
1451 .unwrap()
1452 .insert(*pubkey);
1453 }
1454 if !reclaims.is_empty() {
1455 let old_reclaims = reclaims
1457 .iter()
1458 .filter_map(|(slot, _)| (slot < &one_epoch_old).then_some(1))
1459 .sum();
1460 ancient_account_cleans.fetch_add(old_reclaims, Ordering::Relaxed);
1461 }
1462 clean_rooted.stop();
1463 self.clean_accounts_stats
1464 .clean_old_root_us
1465 .fetch_add(clean_rooted.as_us(), Ordering::Relaxed);
1466 reclaims
1467 }
1468
1469 fn clean_accounts_older_than_root(
1473 &self,
1474 reclaims: &SlotList<AccountInfo>,
1475 pubkeys_removed_from_accounts_index: &HashSet<Pubkey>,
1476 ) -> ReclaimResult {
1477 if reclaims.is_empty() {
1478 return ReclaimResult::default();
1479 }
1480 let (reclaim_result, reclaim_us) = measure_us!(self.handle_reclaims(
1481 reclaims.iter(),
1482 None,
1483 pubkeys_removed_from_accounts_index,
1484 HandleReclaims::ProcessDeadSlots(&self.clean_accounts_stats.purge_stats),
1485 MarkAccountsObsolete::No,
1486 ));
1487 self.clean_accounts_stats
1488 .clean_old_root_reclaim_us
1489 .fetch_add(reclaim_us, Ordering::Relaxed);
1490 reclaim_result
1491 }
1492
1493 fn calc_delete_dependencies(
1498 &self,
1499 candidates: &[HashMap<Pubkey, CleaningInfo>],
1500 store_counts: &mut HashMap<Slot, (usize, HashSet<Pubkey>)>,
1501 min_slot: Option<Slot>,
1502 ) {
1503 let mut already_counted = IntSet::default();
1507 for (bin_index, bin) in candidates.iter().enumerate() {
1508 for (pubkey, cleaning_info) in bin.iter() {
1509 let slot_list = &cleaning_info.slot_list;
1510 let ref_count = &cleaning_info.ref_count;
1511 let mut failed_slot = None;
1512 let all_stores_being_deleted = slot_list.len() as RefCount == *ref_count;
1513 if all_stores_being_deleted {
1514 let mut delete = true;
1515 for (slot, _account_info) in slot_list {
1516 if let Some(count) = store_counts.get(slot).map(|s| s.0) {
1517 debug!("calc_delete_dependencies() slot: {slot}, count len: {count}");
1518 if count == 0 {
1519 continue;
1521 }
1522 }
1523 failed_slot = Some(*slot);
1526 delete = false;
1527 break;
1528 }
1529 if delete {
1530 continue;
1532 }
1533 } else {
1534 debug!(
1536 "calc_delete_dependencies(), pubkey: {pubkey}, slot list len: {}, ref \
1537 count: {ref_count}, slot list: {slot_list:?}",
1538 slot_list.len(),
1539 );
1540 }
1541
1542 let mut pending_stores = IntSet::default();
1544 for (slot, _account_info) in slot_list {
1545 if !already_counted.contains(slot) {
1546 pending_stores.insert(*slot);
1547 }
1548 }
1549 while !pending_stores.is_empty() {
1550 let slot = pending_stores.iter().next().cloned().unwrap();
1551 if Some(slot) == min_slot {
1552 if let Some(failed_slot) = failed_slot.take() {
1553 info!(
1554 "calc_delete_dependencies, oldest slot is not able to be deleted \
1555 because of {pubkey} in slot {failed_slot}"
1556 );
1557 } else {
1558 info!(
1559 "calc_delete_dependencies, oldest slot is not able to be deleted \
1560 because of {pubkey}, slot list len: {}, ref count: {ref_count}",
1561 slot_list.len()
1562 );
1563 }
1564 }
1565
1566 pending_stores.remove(&slot);
1567 if !already_counted.insert(slot) {
1568 continue;
1569 }
1570 if let Some(store_count) = store_counts.remove(&slot) {
1572 let affected_pubkeys = &store_count.1;
1574 for key in affected_pubkeys {
1575 let candidates_bin_index =
1576 self.accounts_index.bin_calculator.bin_from_pubkey(key);
1577 let mut update_pending_stores =
1578 |bin: &HashMap<Pubkey, CleaningInfo>| {
1579 for (slot, _account_info) in &bin.get(key).unwrap().slot_list {
1580 if !already_counted.contains(slot) {
1581 pending_stores.insert(*slot);
1582 }
1583 }
1584 };
1585 if candidates_bin_index == bin_index {
1586 update_pending_stores(bin);
1587 } else {
1588 update_pending_stores(&candidates[candidates_bin_index]);
1589 }
1590 }
1591 }
1592 }
1593 }
1594 }
1595 }
1596
1597 #[must_use]
1598 pub fn purge_keys_exact<C>(
1599 &self,
1600 pubkey_to_slot_set: impl IntoIterator<Item = (Pubkey, C)>,
1601 ) -> (
1602 ReclaimsSlotList<AccountInfo>,
1603 PubkeysRemovedFromAccountsIndex,
1604 )
1605 where
1606 C: for<'a> Contains<'a, Slot>,
1607 {
1608 let mut reclaims = ReclaimsSlotList::new();
1609 let mut dead_keys = Vec::new();
1610
1611 let mut purge_exact_count = 0;
1612 let (_, purge_exact_us) =
1613 measure_us!(for (pubkey, slots_set) in pubkey_to_slot_set.into_iter() {
1614 purge_exact_count += 1;
1615 let is_empty = self
1616 .accounts_index
1617 .purge_exact(&pubkey, slots_set, &mut reclaims);
1618 if is_empty {
1619 dead_keys.push(pubkey);
1620 }
1621 });
1622
1623 let (pubkeys_removed_from_accounts_index, handle_dead_keys_us) = measure_us!(self
1624 .accounts_index
1625 .handle_dead_keys(&dead_keys, &self.account_indexes));
1626
1627 self.stats
1628 .purge_exact_count
1629 .fetch_add(purge_exact_count, Ordering::Relaxed);
1630 self.stats
1631 .handle_dead_keys_us
1632 .fetch_add(handle_dead_keys_us, Ordering::Relaxed);
1633 self.stats
1634 .purge_exact_us
1635 .fetch_add(purge_exact_us, Ordering::Relaxed);
1636 (reclaims, pubkeys_removed_from_accounts_index)
1637 }
1638
1639 fn max_clean_root(&self, proposed_clean_root: Option<Slot>) -> Option<Slot> {
1640 match (
1641 self.accounts_index.min_ongoing_scan_root(),
1642 proposed_clean_root,
1643 ) {
1644 (None, None) => None,
1645 (Some(min_scan_root), None) => Some(min_scan_root),
1646 (None, Some(proposed_clean_root)) => Some(proposed_clean_root),
1647 (Some(min_scan_root), Some(proposed_clean_root)) => {
1648 Some(std::cmp::min(min_scan_root, proposed_clean_root))
1649 }
1650 }
1651 }
1652
1653 fn get_oldest_non_ancient_slot(&self, epoch_schedule: &EpochSchedule) -> Slot {
1656 self.get_oldest_non_ancient_slot_from_slot(
1657 epoch_schedule,
1658 self.accounts_index.max_root_inclusive(),
1659 )
1660 }
1661
1662 fn get_oldest_non_ancient_slot_from_slot(
1665 &self,
1666 epoch_schedule: &EpochSchedule,
1667 max_root_inclusive: Slot,
1668 ) -> Slot {
1669 let mut result = max_root_inclusive;
1670 if let Some(offset) = self.ancient_append_vec_offset {
1671 result = Self::apply_offset_to_slot(result, offset);
1672 }
1673 result = Self::apply_offset_to_slot(
1674 result,
1675 -((epoch_schedule.slots_per_epoch as i64).saturating_sub(1)),
1676 );
1677 result.min(max_root_inclusive)
1678 }
1679
1680 fn collect_uncleaned_slots_up_to_slot(&self, max_slot_inclusive: Slot) -> Vec<Slot> {
1684 self.uncleaned_pubkeys
1685 .iter()
1686 .filter_map(|entry| {
1687 let slot = *entry.key();
1688 (slot <= max_slot_inclusive).then_some(slot)
1689 })
1690 .collect()
1691 }
1692
1693 fn remove_uncleaned_slots_up_to_slot_and_move_pubkeys(
1697 &self,
1698 max_slot_inclusive: Slot,
1699 candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>],
1700 ) {
1701 let uncleaned_slots = self.collect_uncleaned_slots_up_to_slot(max_slot_inclusive);
1702 for uncleaned_slot in uncleaned_slots.into_iter() {
1703 if let Some((_removed_slot, mut removed_pubkeys)) =
1704 self.uncleaned_pubkeys.remove(&uncleaned_slot)
1705 {
1706 removed_pubkeys.sort_by(|a, b| {
1709 self.accounts_index
1710 .bin_calculator
1711 .bin_from_pubkey(a)
1712 .cmp(&self.accounts_index.bin_calculator.bin_from_pubkey(b))
1713 });
1714 if let Some(first_removed_pubkey) = removed_pubkeys.first() {
1715 let mut prev_bin = self
1716 .accounts_index
1717 .bin_calculator
1718 .bin_from_pubkey(first_removed_pubkey);
1719 let mut candidates_bin = candidates[prev_bin].write().unwrap();
1720 for removed_pubkey in removed_pubkeys {
1721 let curr_bin = self
1722 .accounts_index
1723 .bin_calculator
1724 .bin_from_pubkey(&removed_pubkey);
1725 if curr_bin != prev_bin {
1726 candidates_bin = candidates[curr_bin].write().unwrap();
1727 prev_bin = curr_bin;
1728 }
1729 candidates_bin.insert(
1735 removed_pubkey,
1736 CleaningInfo {
1737 might_contain_zero_lamport_entry: true,
1738 ..Default::default()
1739 },
1740 );
1741 }
1742 }
1743 }
1744 }
1745 }
1746
1747 fn count_pubkeys(candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>]) -> u64 {
1748 candidates
1749 .iter()
1750 .map(|x| x.read().unwrap().len())
1751 .sum::<usize>() as u64
1752 }
1753
/// Builds the per-bin candidate pubkey maps for a clean pass.
///
/// Candidates come from three sources, in this order:
/// 1. accounts scanned out of the dirty stores whose slot is `<=` the max slot (those
///    stores are removed from `self.dirty_stores`),
/// 2. pubkeys moved out of `self.uncleaned_pubkeys` for slots `<=` the max slot,
/// 3. entries drained from `zero_lamport_accounts_to_purge_after_full_snapshot` whose slot
///    is `<=` both the max slot and the latest full snapshot slot.
///
/// The max slot is `max_clean_root_inclusive`, or the index's `max_root_inclusive()` when
/// that is `None`.
///
/// Returns the candidates together with the smallest slot among the dirty stores that were
/// consumed (`None` when no dirty store qualified). Count and timing fields of `timings`
/// are updated along the way.
///
/// # Panics
///
/// Panics if there is no latest full snapshot slot while
/// `zero_lamport_accounts_to_purge_after_full_snapshot` is non-empty, or if scanning a
/// storage fails.
1754 fn construct_candidate_clean_keys(
1760 &self,
1761 max_clean_root_inclusive: Option<Slot>,
1762 is_startup: bool,
1763 timings: &mut CleanKeyTimings,
1764 epoch_schedule: &EpochSchedule,
1765 ) -> CleaningCandidates {
1766 let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
1767 let mut dirty_store_processing_time = Measure::start("dirty_store_processing");
1768 let max_root_inclusive = self.accounts_index.max_root_inclusive();
1769 let max_slot_inclusive = max_clean_root_inclusive.unwrap_or(max_root_inclusive);
1770 let mut dirty_stores = Vec::with_capacity(self.dirty_stores.len());
1771 let mut min_dirty_slot = None::<u64>;
// Drain every dirty store at or below `max_slot_inclusive` into the local vec while
// tracking the smallest such slot; newer stores stay in `self.dirty_stores`.
1774 self.dirty_stores.retain(|slot, store| {
1775 if *slot > max_slot_inclusive {
1776 true
1777 } else {
1778 min_dirty_slot = min_dirty_slot.map(|min| min.min(*slot)).or(Some(*slot));
1779 dirty_stores.push((*slot, store.clone()));
1780 false
1781 }
1782 });
1783 let dirty_stores_len = dirty_stores.len();
// One independently locked candidate map per accounts-index bin.
1784 let num_bins = self.accounts_index.bins();
1785 let candidates: Box<_> =
1786 std::iter::repeat_with(|| RwLock::new(HashMap::<Pubkey, CleaningInfo>::new()))
1787 .take(num_bins)
1788 .collect();
1789
// Inserts (or updates) a candidate in its bin. The zero-lamport hint is OR-ed in, so once
// it is set for a pubkey it stays set.
1790 let insert_candidate = |pubkey, is_zero_lamport| {
1791 let index = self.accounts_index.bin_calculator.bin_from_pubkey(&pubkey);
1792 let mut candidates_bin = candidates[index].write().unwrap();
1793 candidates_bin
1794 .entry(pubkey)
1795 .or_default()
1796 .might_contain_zero_lamport_entry |= is_zero_lamport;
1797 };
1798
1799 let dirty_ancient_stores = AtomicUsize::default();
// Scans the drained dirty stores in parallel chunks, registering every account found as a
// candidate. Also counts stores older than `oldest_non_ancient_slot` and records the
// oldest dirty slot seen (or `max_slot_inclusive + 1` when there are no dirty stores).
1800 let mut dirty_store_routine = || {
1801 let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads()));
1802 let oldest_dirty_slots: Vec<u64> = dirty_stores
1803 .par_chunks(chunk_size)
1804 .map(|dirty_store_chunk| {
1805 let mut oldest_dirty_slot = max_slot_inclusive.saturating_add(1);
1806 dirty_store_chunk.iter().for_each(|(slot, store)| {
1807 if *slot < oldest_non_ancient_slot {
1808 dirty_ancient_stores.fetch_add(1, Ordering::Relaxed);
1809 }
1810 oldest_dirty_slot = oldest_dirty_slot.min(*slot);
1811
1812 store
1813 .accounts
1814 .scan_accounts_without_data(|_offset, account| {
1815 let pubkey = *account.pubkey();
1816 let is_zero_lamport = account.is_zero_lamport();
1817 insert_candidate(pubkey, is_zero_lamport);
1818 })
1819 .expect("must scan accounts storage");
1820 });
1821 oldest_dirty_slot
1822 })
1823 .collect();
1824 timings.oldest_dirty_slot = *oldest_dirty_slots
1825 .iter()
1826 .min()
1827 .unwrap_or(&max_slot_inclusive.saturating_add(1));
1828 };
1829
// At startup the routine is called directly; otherwise it is run inside the background
// thread pool.
1830 if is_startup {
1831 dirty_store_routine();
1833 } else {
1834 self.thread_pool_background.install(|| {
1835 dirty_store_routine();
1836 });
1837 }
1838 timings.dirty_pubkeys_count = Self::count_pubkeys(&candidates);
1839 trace!(
1840 "dirty_stores.len: {} pubkeys.len: {}",
1841 dirty_stores_len,
1842 timings.dirty_pubkeys_count,
1843 );
1844 dirty_store_processing_time.stop();
1845 timings.dirty_store_processing_us += dirty_store_processing_time.as_us();
1846 timings.dirty_ancient_stores = dirty_ancient_stores.load(Ordering::Relaxed);
1847
// Second source: pubkeys recorded per slot in `uncleaned_pubkeys`.
1848 let mut collect_delta_keys = Measure::start("key_create");
1849 self.remove_uncleaned_slots_up_to_slot_and_move_pubkeys(max_slot_inclusive, &candidates);
1850 collect_delta_keys.stop();
1851 timings.collect_delta_keys_us += collect_delta_keys.as_us();
1852
// Note: this is the running total of candidates so far (dirty-store pubkeys included).
1853 timings.delta_key_count = Self::count_pubkeys(&candidates);
1854
1855 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
1859 assert!(
1860 latest_full_snapshot_slot.is_some()
1861 || self
1862 .zero_lamport_accounts_to_purge_after_full_snapshot
1863 .is_empty(),
1864 "if snapshots are disabled, then zero_lamport_accounts_to_purge_later should always \
1865 be empty"
1866 );
// Third source: deferred zero-lamport accounts. An entry becomes a candidate (and is
// removed from the deferred set) once its slot is at or below both the max slot and the
// latest full snapshot slot.
1867 if let Some(latest_full_snapshot_slot) = latest_full_snapshot_slot {
1868 self.zero_lamport_accounts_to_purge_after_full_snapshot
1869 .retain(|(slot, pubkey)| {
1870 let is_candidate_for_clean =
1871 max_slot_inclusive >= *slot && latest_full_snapshot_slot >= *slot;
1872 if is_candidate_for_clean {
1873 insert_candidate(*pubkey, true);
1874 }
1875 !is_candidate_for_clean
1876 });
1877 }
1878
1879 (candidates, min_dirty_slot)
1880 }
1881
1882 fn exhaustively_verify_refcounts(&self, max_slot_inclusive: Option<Slot>) {
1887 let max_slot_inclusive =
1888 max_slot_inclusive.unwrap_or_else(|| self.accounts_index.max_root_inclusive());
1889 info!("exhaustively verifying refcounts as of slot: {max_slot_inclusive}");
1890 let pubkey_refcount = DashMap::<Pubkey, Vec<Slot>>::default();
1891 let mut storages = self.storage.all_storages();
1892 storages.retain(|s| s.slot() <= max_slot_inclusive);
1893 storages.par_iter().for_each_init(
1895 || Box::new(append_vec::new_scan_accounts_reader()),
1896 |reader, storage| {
1897 let slot = storage.slot();
1898 storage
1899 .accounts
1900 .scan_accounts(reader.as_mut(), |_offset, account| {
1901 let pk = account.pubkey();
1902 match pubkey_refcount.entry(*pk) {
1903 dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => {
1904 if !occupied_entry.get().iter().any(|s| s == &slot) {
1905 occupied_entry.get_mut().push(slot);
1906 }
1907 }
1908 dashmap::mapref::entry::Entry::Vacant(vacant_entry) => {
1909 vacant_entry.insert(vec![slot]);
1910 }
1911 }
1912 })
1913 .expect("must scan accounts storage")
1914 },
1915 );
1916 let total = pubkey_refcount.len();
1917 let failed = AtomicBool::default();
1918 let threads = quarter_thread_count();
1919 let per_batch = total / threads;
1920 (0..=threads).into_par_iter().for_each(|attempt| {
1921 pubkey_refcount
1922 .iter()
1923 .skip(attempt * per_batch)
1924 .take(per_batch)
1925 .for_each(|entry| {
1926 if failed.load(Ordering::Relaxed) {
1927 return;
1928 }
1929
1930 self.accounts_index
1931 .get_and_then(entry.key(), |index_entry| {
1932 if let Some(index_entry) = index_entry {
1933 match (index_entry.ref_count() as usize).cmp(&entry.value().len()) {
1934 std::cmp::Ordering::Equal => {
1935 }
1937 std::cmp::Ordering::Greater => {
1938 let slot_list = index_entry.slot_list_read_lock();
1939 let num_too_new = slot_list
1940 .iter()
1941 .filter(|(slot, _)| slot > &max_slot_inclusive)
1942 .count();
1943
1944 if ((index_entry.ref_count() as usize) - num_too_new)
1945 > entry.value().len()
1946 {
1947 failed.store(true, Ordering::Relaxed);
1948 error!(
1949 "exhaustively_verify_refcounts: {} refcount too \
1950 large: {}, should be: {}, {:?}, {:?}, too_new: \
1951 {num_too_new}",
1952 entry.key(),
1953 index_entry.ref_count(),
1954 entry.value().len(),
1955 *entry.value(),
1956 slot_list
1957 );
1958 }
1959 }
1960 std::cmp::Ordering::Less => {
1961 error!(
1962 "exhaustively_verify_refcounts: {} refcount too \
1963 small: {}, should be: {}, {:?}, {:?}",
1964 entry.key(),
1965 index_entry.ref_count(),
1966 entry.value().len(),
1967 *entry.value(),
1968 index_entry.slot_list_read_lock()
1969 );
1970 }
1971 }
1972 };
1973 (false, ())
1974 });
1975 });
1976 });
1977 if failed.load(Ordering::Relaxed) {
1978 panic!("exhaustively_verify_refcounts failed");
1979 }
1980 }
1981
/// Runs one clean pass over the accounts index and the storages.
///
/// The phases below run in order, each timed with its own `Measure` and (from candidate
/// construction onwards) wrapped in its own `ActiveStatItem` guard:
/// 1. optional exhaustive ref-count verification (only when enabled on this instance),
/// 2. construct the candidate pubkeys (`construct_candidate_clean_keys`),
/// 3. scan every candidate in the accounts index, collecting reclaims for old entries,
/// 4. handle those reclaims (`clean_accounts_older_than_root`),
/// 5. build `store_counts` for the slots still referenced by the remaining candidates,
/// 6. `calc_delete_dependencies`,
/// 7. `filter_zero_lamport_clean_for_incremental_snapshots`,
/// 8. purge the remaining candidates with `purge_keys_exact` and handle the resulting
///    reclaims,
/// 9. report stats and submit the `clean_accounts` datapoint.
///
/// * `max_clean_root_inclusive` - proposed upper bound for cleaning; it is further limited
///   by `max_clean_root`.
/// * `is_startup` - when true, parallel work is invoked directly instead of through
///   `thread_pool_background`.
/// * `epoch_schedule` - forwarded to candidate construction and `collect_reclaims`.
///
/// # Panics
///
/// Panics if a candidate's account info is still cached while store counts are collected,
/// or if the storage entry for such an account info cannot be found.
1982 pub fn clean_accounts(
1987 &self,
1988 max_clean_root_inclusive: Option<Slot>,
1989 is_startup: bool,
1990 epoch_schedule: &EpochSchedule,
1991 ) {
// Optional debug verification, run before any cleaning work. Note that it receives the
// caller's proposed root, before it is adjusted by `max_clean_root` below.
1992 if self.exhaustively_verify_refcounts {
1993 if is_startup {
1995 self.exhaustively_verify_refcounts(max_clean_root_inclusive);
1996 } else {
1997 self.thread_pool_background
1999 .install(|| self.exhaustively_verify_refcounts(max_clean_root_inclusive));
2000 }
2001 }
2002
2003 let _guard = self.active_stats.activate(ActiveStatItem::Clean);
2004
2005 let ancient_account_cleans = AtomicU64::default();
2006 let purges_old_accounts_count = AtomicU64::default();
2007
2008 let mut measure_all = Measure::start("clean_accounts");
// From here on `max_clean_root_inclusive` is the proposed root limited by any ongoing scan.
2009 let max_clean_root_inclusive = self.max_clean_root(max_clean_root_inclusive);
2010
2011 self.report_store_stats();
2012
// Phase: construct candidates.
2013 let active_guard = self
2014 .active_stats
2015 .activate(ActiveStatItem::CleanConstructCandidates);
2016 let mut measure_construct_candidates = Measure::start("construct_candidates");
2017 let mut key_timings = CleanKeyTimings::default();
2018 let (mut candidates, min_dirty_slot) = self.construct_candidate_clean_keys(
2019 max_clean_root_inclusive,
2020 is_startup,
2021 &mut key_timings,
2022 epoch_schedule,
2023 );
2024 measure_construct_candidates.stop();
2025 drop(active_guard);
2026
2027 let num_candidates = Self::count_pubkeys(&candidates);
2028 let found_not_zero_accum = AtomicU64::new(0);
2029 let not_found_on_fork_accum = AtomicU64::new(0);
2030 let missing_accum = AtomicU64::new(0);
2031 let useful_accum = AtomicU64::new(0);
2032 let reclaims: SlotList<AccountInfo> = SlotList::with_capacity(num_candidates as usize);
2033 let reclaims = Mutex::new(reclaims);
2034 let pubkeys_removed_from_accounts_index: PubkeysRemovedFromAccountsIndex = HashSet::new();
2035 let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index);
// Phase: scan candidates. Bins are processed in parallel; within a bin, `retain` keeps
// only candidates that ended up with a non-empty `slot_list`, i.e. those whose newest
// entry at or below the clean root is zero-lamport.
2036 let do_clean_scan = || {
2038 candidates.par_iter().for_each(|candidates_bin| {
// Per-bin local counters, folded into the shared atomics once per bin.
2039 let mut found_not_zero = 0;
2040 let mut not_found_on_fork = 0;
2041 let mut missing = 0;
2042 let mut useful = 0;
2043 let mut purges_old_accounts_local = 0;
2044 let mut candidates_bin = candidates_bin.write().unwrap();
2045 candidates_bin.retain(|candidate_pubkey, candidate_info| {
2050 let mut should_collect_reclaims = false;
2051 self.accounts_index.scan(
2052 iter::once(candidate_pubkey),
2053 |_candidate_pubkey, slot_list_and_ref_count| {
2054 let mut useless = true;
2055 if let Some((slot_list, ref_count)) = slot_list_and_ref_count {
2056 let index_in_slot_list = self.accounts_index.latest_slot(
2058 None,
2059 slot_list,
2060 max_clean_root_inclusive,
2061 );
2062
2063 match index_in_slot_list {
2064 Some(index_in_slot_list) => {
2065 let (slot, account_info) = &slot_list[index_in_slot_list];
// A zero-lamport latest entry makes the pubkey a purge candidate: remember its rooted
// entries and the ref count for the later phases.
2067 if account_info.is_zero_lamport() {
2068 useless = false;
2069 candidate_info.slot_list =
2073 self.accounts_index.get_rooted_entries(
2074 slot_list,
2075 max_clean_root_inclusive,
2076 );
2077 candidate_info.ref_count = ref_count;
2078 } else {
2079 found_not_zero += 1;
2080 }
2081
// More than one entry and the latest one is at or below the clean root: request a
// reclaim pass for this pubkey.
2082 if slot_list.len() > 1
2085 && *slot
2086 <= max_clean_root_inclusive.unwrap_or(Slot::MAX)
2087 {
2088 should_collect_reclaims = true;
2089 purges_old_accounts_local += 1;
2090 useless = false;
2091 }
2092 }
2093 None => {
// `latest_slot` found no entry for this pubkey; a reclaim pass is still requested.
2094 not_found_on_fork += 1;
2101 should_collect_reclaims = true;
2102 purges_old_accounts_local += 1;
2103 useless = false;
2104 }
2105 }
2106 } else {
2107 missing += 1;
2108 }
2109 if !useless {
2110 useful += 1;
2111 }
2112 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
2113 },
2114 None,
// Candidates flagged as possibly zero-lamport are scanned with `ScanFilter::All`; the
// rest use the configured shrinking scan filter.
2115 if candidate_info.might_contain_zero_lamport_entry {
2116 ScanFilter::All
2117 } else {
2118 self.scan_filter_for_shrinking
2119 },
2120 );
2121 if should_collect_reclaims {
2122 let reclaims_new = self.collect_reclaims(
2123 candidate_pubkey,
2124 max_clean_root_inclusive,
2125 &ancient_account_cleans,
2126 epoch_schedule,
2127 &pubkeys_removed_from_accounts_index,
2128 );
2129 if !reclaims_new.is_empty() {
2130 reclaims.lock().unwrap().extend(reclaims_new);
2131 }
2132 }
2133 !candidate_info.slot_list.is_empty()
2134 });
2135 found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
2136 not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
2137 missing_accum.fetch_add(missing, Ordering::Relaxed);
2138 useful_accum.fetch_add(useful, Ordering::Relaxed);
2139 purges_old_accounts_count.fetch_add(purges_old_accounts_local, Ordering::Relaxed);
2140 });
2141 };
2142 let active_guard = self
2143 .active_stats
2144 .activate(ActiveStatItem::CleanScanCandidates);
2145 let mut accounts_scan = Measure::start("accounts_scan");
2146 if is_startup {
2147 do_clean_scan();
2148 } else {
2149 self.thread_pool_background.install(do_clean_scan);
2150 }
2151 accounts_scan.stop();
2152 drop(active_guard);
2153
// The parallel phase is over: take the maps out of their locks so the remaining phases
// can use them without locking.
2154 let mut candidates: Box<_> = candidates
2156 .iter_mut()
2157 .map(|candidates_bin| mem::take(candidates_bin.get_mut().unwrap()))
2158 .collect();
2159
2160 let retained_keys_count: usize = candidates.iter().map(HashMap::len).sum();
2161 let reclaims = reclaims.into_inner().unwrap();
2162 let mut pubkeys_removed_from_accounts_index =
2163 pubkeys_removed_from_accounts_index.into_inner().unwrap();
2164
// Phase: handle the reclaims gathered during the scan.
2165 let active_guard = self.active_stats.activate(ActiveStatItem::CleanOldAccounts);
2166 let mut clean_old_rooted = Measure::start("clean_old_roots");
2167 let (purged_account_slots, removed_accounts) =
2168 self.clean_accounts_older_than_root(&reclaims, &pubkeys_removed_from_accounts_index);
2169 clean_old_rooted.stop();
2170 drop(active_guard);
2171
// Phase: collect store counts. For each slot still referenced by a candidate,
// `store_counts` holds (storage count minus the number of candidate entries in that
// slot, set of candidate pubkeys in that slot).
2172 let active_guard = self
2175 .active_stats
2176 .activate(ActiveStatItem::CleanCollectStoreCounts);
2177 let mut store_counts_time = Measure::start("store_counts");
2178 let mut store_counts: HashMap<Slot, (usize, HashSet<Pubkey>)> = HashMap::new();
2179 for candidates_bin in candidates.iter_mut() {
2180 for (pubkey, cleaning_info) in candidates_bin.iter_mut() {
2181 let slot_list = &mut cleaning_info.slot_list;
2182 let ref_count = &mut cleaning_info.ref_count;
2183 debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty");
// Slots were purged for this pubkey in the previous phase, so the ref count captured
// during the scan may be stale; re-read it.
2184 if purged_account_slots.contains_key(pubkey) {
2185 *ref_count = self.accounts_index.ref_count_from_storage(pubkey);
2186 }
2187 slot_list.retain(|(slot, account_info)| {
// Drop entries that the previous phase already purged or reclaimed.
2188 let was_slot_purged = purged_account_slots
2189 .get(pubkey)
2190 .map(|slots_removed| slots_removed.contains(slot))
2191 .unwrap_or(false);
2192 if was_slot_purged {
2193 return false;
2196 }
2197 let was_reclaimed = removed_accounts
2200 .get(slot)
2201 .map(|store_removed| store_removed.contains(&account_info.offset()))
2202 .unwrap_or(false);
2203 if was_reclaimed {
2204 return false;
2205 }
2206 if let Some(store_count) = store_counts.get_mut(slot) {
2207 store_count.0 -= 1;
2208 store_count.1.insert(*pubkey);
2209 } else {
// First candidate seen for this slot: start from the storage's current count, minus
// this entry.
2210 let mut key_set = HashSet::new();
2211 key_set.insert(*pubkey);
2212 assert!(
2213 !account_info.is_cached(),
2214 "The Accounts Cache must be flushed first for this account info. \
2215 pubkey: {}, slot: {}",
2216 *pubkey,
2217 *slot
2218 );
2219 let count = self
2220 .storage
2221 .get_account_storage_entry(*slot, account_info.store_id())
2222 .map(|store| store.count())
2223 .unwrap()
2224 - 1;
2225 debug!(
2226 "store_counts, inserting slot: {}, store id: {}, count: {}",
2227 slot,
2228 account_info.store_id(),
2229 count
2230 );
2231 store_counts.insert(*slot, (count, key_set));
2232 }
2233 true
2234 });
2235 }
2236 }
2237 store_counts_time.stop();
2238 drop(active_guard);
2239
// Phase: remove from `store_counts` every store that cannot be deleted.
2240 let active_guard = self
2241 .active_stats
2242 .activate(ActiveStatItem::CleanCalcDeleteDeps);
2243 let mut calc_deps_time = Measure::start("calc_deps");
2244 self.calc_delete_dependencies(&candidates, &mut store_counts, min_dirty_slot);
2245 calc_deps_time.stop();
2246 drop(active_guard);
2247
// Phase: filter candidates for incremental snapshots.
2248 let active_guard = self
2249 .active_stats
2250 .activate(ActiveStatItem::CleanFilterZeroLamport);
2251 let mut purge_filter = Measure::start("purge_filter");
2252 self.filter_zero_lamport_clean_for_incremental_snapshots(
2253 max_clean_root_inclusive,
2254 &store_counts,
2255 &mut candidates,
2256 );
2257 purge_filter.stop();
2258 drop(active_guard);
2259
// Phase: purge what is left. Each surviving candidate is turned into
// (pubkey, set of slots to purge).
2260 let active_guard = self.active_stats.activate(ActiveStatItem::CleanReclaims);
2261 let mut reclaims_time = Measure::start("reclaims");
2262 let mut pubkey_to_slot_set = Vec::new();
2264 for candidates_bin in candidates {
2265 let mut bin_set = candidates_bin
2266 .into_iter()
2267 .filter_map(|(pubkey, cleaning_info)| {
2268 let slot_list = cleaning_info.slot_list;
2269 (!slot_list.is_empty()).then_some((
2270 pubkey,
2271 slot_list
2272 .iter()
2273 .map(|(slot, _)| *slot)
2274 .collect::<HashSet<Slot>>(),
2275 ))
2276 })
2277 .collect::<Vec<_>>();
2278 pubkey_to_slot_set.append(&mut bin_set);
2279 }
2280
2281 let (reclaims, pubkeys_removed_from_accounts_index2) =
2282 self.purge_keys_exact(pubkey_to_slot_set);
2283 pubkeys_removed_from_accounts_index.extend(pubkeys_removed_from_accounts_index2);
2284
2285 if !reclaims.is_empty() {
2286 self.handle_reclaims(
2287 reclaims.iter(),
2288 None,
2289 &pubkeys_removed_from_accounts_index,
2290 HandleReclaims::ProcessDeadSlots(&self.clean_accounts_stats.purge_stats),
2291 MarkAccountsObsolete::No,
2292 );
2293 }
2294
2295 reclaims_time.stop();
2296 drop(active_guard);
2297
2298 measure_all.stop();
2299
// Reporting. Most of the shared counters below are read with `swap(0, ..)`, which resets
// them as they are reported.
2300 self.clean_accounts_stats.report();
2301 datapoint_info!(
2302 "clean_accounts",
2303 ("max_clean_root", max_clean_root_inclusive, Option<i64>),
2304 ("total_us", measure_all.as_us(), i64),
2305 (
2306 "collect_delta_keys_us",
2307 key_timings.collect_delta_keys_us,
2308 i64
2309 ),
2310 ("oldest_dirty_slot", key_timings.oldest_dirty_slot, i64),
2311 (
2312 "pubkeys_removed_from_accounts_index",
2313 pubkeys_removed_from_accounts_index.len(),
2314 i64
2315 ),
2316 (
2317 "dirty_ancient_stores",
2318 key_timings.dirty_ancient_stores,
2319 i64
2320 ),
2321 (
2322 "dirty_store_processing_us",
2323 key_timings.dirty_store_processing_us,
2324 i64
2325 ),
2326 ("construct_candidates_us", measure_construct_candidates.as_us(), i64),
2327 ("accounts_scan", accounts_scan.as_us(), i64),
2328 ("clean_old_rooted", clean_old_rooted.as_us(), i64),
2329 ("store_counts", store_counts_time.as_us(), i64),
2330 ("purge_filter", purge_filter.as_us(), i64),
2331 ("calc_deps", calc_deps_time.as_us(), i64),
2332 ("reclaims", reclaims_time.as_us(), i64),
2333 ("delta_insert_us", key_timings.delta_insert_us, i64),
2334 ("delta_key_count", key_timings.delta_key_count, i64),
2335 ("dirty_pubkeys_count", key_timings.dirty_pubkeys_count, i64),
2336 ("useful_keys", useful_accum.load(Ordering::Relaxed), i64),
2337 ("total_keys_count", num_candidates, i64),
2338 ("retained_keys_count", retained_keys_count, i64),
2339 (
2340 "scan_found_not_zero",
2341 found_not_zero_accum.load(Ordering::Relaxed),
2342 i64
2343 ),
2344 (
2345 "scan_not_found_on_fork",
2346 not_found_on_fork_accum.load(Ordering::Relaxed),
2347 i64
2348 ),
2349 ("scan_missing", missing_accum.load(Ordering::Relaxed), i64),
2350 (
2351 "get_account_sizes_us",
2352 self.clean_accounts_stats
2353 .get_account_sizes_us
2354 .swap(0, Ordering::Relaxed),
2355 i64
2356 ),
2357 (
2358 "slots_cleaned",
2359 self.clean_accounts_stats
2360 .slots_cleaned
2361 .swap(0, Ordering::Relaxed),
2362 i64
2363 ),
2364 (
2365 "clean_old_root_us",
2366 self.clean_accounts_stats
2367 .clean_old_root_us
2368 .swap(0, Ordering::Relaxed),
2369 i64
2370 ),
2371 (
2372 "clean_old_root_reclaim_us",
2373 self.clean_accounts_stats
2374 .clean_old_root_reclaim_us
2375 .swap(0, Ordering::Relaxed),
2376 i64
2377 ),
2378 (
2379 "remove_dead_accounts_remove_us",
2380 self.clean_accounts_stats
2381 .remove_dead_accounts_remove_us
2382 .swap(0, Ordering::Relaxed),
2383 i64
2384 ),
2385 (
2386 "remove_dead_accounts_shrink_us",
2387 self.clean_accounts_stats
2388 .remove_dead_accounts_shrink_us
2389 .swap(0, Ordering::Relaxed),
2390 i64
2391 ),
2392 (
2393 "clean_stored_dead_slots_us",
2394 self.clean_accounts_stats
2395 .clean_stored_dead_slots_us
2396 .swap(0, Ordering::Relaxed),
2397 i64
2398 ),
2399 (
2400 "roots_added",
2401 self.accounts_index.roots_added.swap(0, Ordering::Relaxed),
2402 i64
2403 ),
2404 (
2405 "purge_older_root_entries_one_slot_list",
2406 self.accounts_index
2407 .purge_older_root_entries_one_slot_list
2408 .swap(0, Ordering::Relaxed),
2409 i64
2410 ),
2411 (
2412 "roots_removed",
2413 self.accounts_index.roots_removed.swap(0, Ordering::Relaxed),
2414 i64
2415 ),
2416 (
2417 "active_scans",
2418 self.accounts_index.active_scans.load(Ordering::Relaxed),
2419 i64
2420 ),
2421 (
2422 "max_distance_to_min_scan_slot",
2423 self.accounts_index
2424 .max_distance_to_min_scan_slot
2425 .swap(0, Ordering::Relaxed),
2426 i64
2427 ),
2428 (
2429 "ancient_account_cleans",
2430 ancient_account_cleans.load(Ordering::Relaxed),
2431 i64
2432 ),
2433 (
2434 "purges_old_accounts_count",
2435 purges_old_accounts_count.load(Ordering::Relaxed),
2436 i64
2437 ),
2438 ("next_store_id", self.next_id.load(Ordering::Relaxed), i64),
2439 );
2440 }
2441
2442 fn handle_reclaims<'a, I>(
2473 &'a self,
2474 reclaims: I,
2475 expected_single_dead_slot: Option<Slot>,
2476 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
2477 handle_reclaims: HandleReclaims<'a>,
2478 mark_accounts_obsolete: MarkAccountsObsolete,
2479 ) -> ReclaimResult
2480 where
2481 I: Iterator<Item = &'a (Slot, AccountInfo)>,
2482 {
2483 let mut reclaim_result = ReclaimResult::default();
2484 let (dead_slots, reclaimed_offsets) =
2485 self.remove_dead_accounts(reclaims, expected_single_dead_slot, mark_accounts_obsolete);
2486 reclaim_result.1 = reclaimed_offsets;
2487 let HandleReclaims::ProcessDeadSlots(purge_stats) = handle_reclaims;
2488 if let Some(expected_single_dead_slot) = expected_single_dead_slot {
2489 assert!(dead_slots.len() <= 1);
2490 if dead_slots.len() == 1 {
2491 assert!(dead_slots.contains(&expected_single_dead_slot));
2492 }
2493 }
2494 let clean_stored_dead_slots =
2496 !matches!(mark_accounts_obsolete, MarkAccountsObsolete::Yes(_));
2497
2498 self.process_dead_slots(
2499 &dead_slots,
2500 Some(&mut reclaim_result.0),
2501 purge_stats,
2502 pubkeys_removed_from_accounts_index,
2503 clean_stored_dead_slots,
2504 );
2505 reclaim_result
2506 }
2507
2508 fn filter_zero_lamport_clean_for_incremental_snapshots(
2532 &self,
2533 max_clean_root_inclusive: Option<Slot>,
2534 store_counts: &HashMap<Slot, (usize, HashSet<Pubkey>)>,
2535 candidates: &mut [HashMap<Pubkey, CleaningInfo>],
2536 ) {
2537 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
2538 let should_filter_for_incremental_snapshots = max_clean_root_inclusive.unwrap_or(Slot::MAX)
2539 > latest_full_snapshot_slot.unwrap_or(Slot::MAX);
2540 assert!(
2541 latest_full_snapshot_slot.is_some() || !should_filter_for_incremental_snapshots,
2542 "if filtering for incremental snapshots, then snapshots should be enabled",
2543 );
2544
2545 for bin in candidates {
2546 bin.retain(|pubkey, cleaning_info| {
2547 let slot_list = &cleaning_info.slot_list;
2548 debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty");
2549 for (slot, _account_info) in slot_list.iter() {
2552 if let Some(store_count) = store_counts.get(slot) {
2553 if store_count.0 != 0 {
2554 return false;
2556 }
2557 } else {
2558 return false;
2560 }
2561 }
2562
2563 if !should_filter_for_incremental_snapshots {
2565 return true;
2566 }
2567
2568 let (slot, account_info) = slot_list
2571 .iter()
2572 .max_by_key(|(slot, _account_info)| slot)
2573 .unwrap();
2574
2575 assert!(account_info.is_zero_lamport());
2581 let cannot_purge = *slot > latest_full_snapshot_slot.unwrap();
2582 if cannot_purge {
2583 self.zero_lamport_accounts_to_purge_after_full_snapshot
2584 .insert((*slot, *pubkey));
2585 }
2586 !cannot_purge
2587 });
2588 }
2589 }
2590
2591 fn process_dead_slots(
2600 &self,
2601 dead_slots: &IntSet<Slot>,
2602 purged_account_slots: Option<&mut AccountSlots>,
2603 purge_stats: &PurgeStats,
2604 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
2605 clean_stored_dead_slots: bool,
2606 ) {
2607 if dead_slots.is_empty() {
2608 return;
2609 }
2610 let mut clean_dead_slots = Measure::start("reclaims::clean_dead_slots");
2611
2612 if clean_stored_dead_slots {
2613 self.clean_stored_dead_slots(
2614 dead_slots,
2615 purged_account_slots,
2616 pubkeys_removed_from_accounts_index,
2617 );
2618 }
2619
2620 self.remove_dead_slots_metadata(dead_slots.iter());
2622
2623 clean_dead_slots.stop();
2624
2625 let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots");
2626 self.purge_dead_slots_from_storage(dead_slots.iter(), purge_stats);
2627 purge_removed_slots.stop();
2628
2629 {
2632 let mut list = self.shrink_candidate_slots.lock().unwrap();
2633 for slot in dead_slots {
2634 list.remove(slot);
2635 }
2636 }
2637
2638 debug!(
2639 "process_dead_slots({}): {} {} {:?}",
2640 dead_slots.len(),
2641 clean_dead_slots,
2642 purge_removed_slots,
2643 dead_slots,
2644 );
2645 }
2646
2647 fn load_accounts_index_for_shrink<'a, T: ShrinkCollectRefs<'a>>(
2652 &self,
2653 accounts: &'a [AccountFromStorage],
2654 stats: &ShrinkStats,
2655 slot_to_shrink: Slot,
2656 ) -> LoadAccountsIndexForShrink<'a, T> {
2657 let count = accounts.len();
2658 let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
2659 let mut pubkeys_to_unref = Vec::with_capacity(count);
2660 let mut zero_lamport_single_ref_pubkeys = Vec::with_capacity(count);
2661
2662 let mut alive = 0;
2663 let mut dead = 0;
2664 let mut index = 0;
2665 let mut index_scan_returned_some_count = 0;
2666 let mut index_scan_returned_none_count = 0;
2667 let mut all_are_zero_lamports = true;
2668 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
2669 self.accounts_index.scan(
2670 accounts.iter().map(|account| account.pubkey()),
2671 |pubkey, slots_refs| {
2672 let stored_account = &accounts[index];
2673 let mut do_populate_accounts_for_shrink = |ref_count, slot_list| {
2674 if stored_account.is_zero_lamport()
2675 && ref_count == 1
2676 && latest_full_snapshot_slot
2677 .map(|latest_full_snapshot_slot| {
2678 latest_full_snapshot_slot >= slot_to_shrink
2679 })
2680 .unwrap_or(true)
2681 {
2682 zero_lamport_single_ref_pubkeys.push(pubkey);
2685 self.add_uncleaned_pubkeys_after_shrink(
2686 slot_to_shrink,
2687 [*pubkey].into_iter(),
2688 );
2689 } else {
2690 all_are_zero_lamports &= stored_account.is_zero_lamport();
2691 alive_accounts.add(ref_count, stored_account, slot_list);
2692 alive += 1;
2693 }
2694 };
2695 if let Some((slot_list, ref_count)) = slots_refs {
2696 index_scan_returned_some_count += 1;
2697 let is_alive = slot_list.iter().any(|(slot, _acct_info)| {
2698 *slot == slot_to_shrink
2700 });
2701
2702 if !is_alive {
2703 pubkeys_to_unref.push(pubkey);
2708 dead += 1;
2709 } else {
2710 do_populate_accounts_for_shrink(ref_count, slot_list);
2711 }
2712 } else {
2713 index_scan_returned_none_count += 1;
2714 let ref_count = 1;
2720 let slot_list = [(slot_to_shrink, AccountInfo::default())];
2721 do_populate_accounts_for_shrink(ref_count, &slot_list);
2722 }
2723 index += 1;
2724 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
2725 },
2726 None,
2727 self.scan_filter_for_shrinking,
2728 );
2729 assert_eq!(index, std::cmp::min(accounts.len(), count));
2730 stats
2731 .index_scan_returned_some
2732 .fetch_add(index_scan_returned_some_count, Ordering::Relaxed);
2733 stats
2734 .index_scan_returned_none
2735 .fetch_add(index_scan_returned_none_count, Ordering::Relaxed);
2736 stats.alive_accounts.fetch_add(alive, Ordering::Relaxed);
2737 stats.dead_accounts.fetch_add(dead, Ordering::Relaxed);
2738
2739 LoadAccountsIndexForShrink {
2740 alive_accounts,
2741 pubkeys_to_unref,
2742 zero_lamport_single_ref_pubkeys,
2743 all_are_zero_lamports,
2744 }
2745 }
2746
2747 pub fn get_unique_accounts_from_storage(
2750 &self,
2751 store: &AccountStorageEntry,
2752 ) -> GetUniqueAccountsResult {
2753 let capacity = store.capacity();
2754 let mut stored_accounts = Vec::with_capacity(store.count());
2755 store
2756 .accounts
2757 .scan_accounts_without_data(|offset, account| {
2758 let file_id = 0;
2760 stored_accounts.push(AccountFromStorage {
2761 index_info: AccountInfo::new(
2762 StorageLocation::AppendVec(file_id, offset),
2763 account.is_zero_lamport(),
2764 ),
2765 pubkey: *account.pubkey(),
2766 data_len: account.data_len as u64,
2767 });
2768 })
2769 .expect("must scan accounts storage");
2770
2771 let num_duplicated_accounts = Self::sort_and_remove_dups(&mut stored_accounts);
2773
2774 GetUniqueAccountsResult {
2775 stored_accounts,
2776 capacity,
2777 num_duplicated_accounts,
2778 }
2779 }
2780
/// Overwrites the `storage_access` setting of this instance.
///
/// Only compiled when the `dev-context-only-utils` feature is enabled.
#[cfg(feature = "dev-context-only-utils")]
pub fn set_storage_access(&mut self, storage_access: StorageAccess) {
    let new_storage_access = storage_access;
    self.storage_access = new_storage_access;
}
2785
2786 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
2791 fn sort_and_remove_dups(accounts: &mut Vec<AccountFromStorage>) -> usize {
2792 accounts.sort_by(|a, b| a.pubkey().cmp(b.pubkey()));
2794 let len0 = accounts.len();
2795 if accounts.len() > 1 {
2796 let mut last = 0;
2797 let mut curr = 1;
2798
2799 while curr < accounts.len() {
2800 if accounts[curr].pubkey() != accounts[last].pubkey() {
2801 last += 1;
2802 }
2803 accounts[last] = accounts[curr];
2804 curr += 1;
2805 }
2806 accounts.truncate(last + 1);
2807 }
2808 len0 - accounts.len()
2809 }
2810
2811 pub(crate) fn get_unique_accounts_from_storage_for_shrink(
2812 &self,
2813 store: &AccountStorageEntry,
2814 stats: &ShrinkStats,
2815 ) -> GetUniqueAccountsResult {
2816 let (result, storage_read_elapsed_us) =
2817 measure_us!(self.get_unique_accounts_from_storage(store));
2818 stats
2819 .storage_read_elapsed
2820 .fetch_add(storage_read_elapsed_us, Ordering::Relaxed);
2821 stats
2822 .num_duplicated_accounts
2823 .fetch_add(result.num_duplicated_accounts as u64, Ordering::Relaxed);
2824 result
2825 }
2826
2827 pub(crate) fn shrink_collect<'a: 'b, 'b, T: ShrinkCollectRefs<'b>>(
2830 &self,
2831 store: &'a AccountStorageEntry,
2832 unique_accounts: &'b mut GetUniqueAccountsResult,
2833 stats: &ShrinkStats,
2834 ) -> ShrinkCollect<'b, T> {
2835 let slot = store.slot();
2836
2837 let GetUniqueAccountsResult {
2838 stored_accounts,
2839 capacity,
2840 num_duplicated_accounts,
2841 } = unique_accounts;
2842
2843 let mut index_read_elapsed = Measure::start("index_read_elapsed");
2844
2845 let obsolete_offsets: IntSet<_> = store
2849 .obsolete_accounts_read_lock()
2850 .filter_obsolete_accounts(None)
2851 .map(|(offset, _)| offset)
2852 .collect();
2853
2854 let total_starting_accounts = stored_accounts.len();
2856 stored_accounts.retain(|account| !obsolete_offsets.contains(&account.index_info.offset()));
2857
2858 let len = stored_accounts.len();
2859 let shrink_collect = Mutex::new(ShrinkCollect {
2860 slot,
2861 capacity: *capacity,
2862 pubkeys_to_unref: Vec::with_capacity(len),
2863 zero_lamport_single_ref_pubkeys: Vec::new(),
2864 alive_accounts: T::with_capacity(len, slot),
2865 total_starting_accounts,
2866 all_are_zero_lamports: true,
2867 alive_total_bytes: 0, });
2869
2870 stats
2871 .accounts_loaded
2872 .fetch_add(len as u64, Ordering::Relaxed);
2873 stats
2874 .obsolete_accounts_filtered
2875 .fetch_add((total_starting_accounts - len) as u64, Ordering::Relaxed);
2876 stats
2877 .num_duplicated_accounts
2878 .fetch_add(*num_duplicated_accounts as u64, Ordering::Relaxed);
2879 self.thread_pool_background.install(|| {
2880 stored_accounts
2881 .par_chunks(SHRINK_COLLECT_CHUNK_SIZE)
2882 .for_each(|stored_accounts| {
2883 let LoadAccountsIndexForShrink {
2884 alive_accounts,
2885 mut pubkeys_to_unref,
2886 all_are_zero_lamports,
2887 mut zero_lamport_single_ref_pubkeys,
2888 } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);
2889
2890 let mut shrink_collect = shrink_collect.lock().unwrap();
2892 shrink_collect.alive_accounts.collect(alive_accounts);
2893 shrink_collect
2894 .pubkeys_to_unref
2895 .append(&mut pubkeys_to_unref);
2896 shrink_collect
2897 .zero_lamport_single_ref_pubkeys
2898 .append(&mut zero_lamport_single_ref_pubkeys);
2899 if !all_are_zero_lamports {
2900 shrink_collect.all_are_zero_lamports = false;
2901 }
2902 });
2903 });
2904
2905 index_read_elapsed.stop();
2906
2907 let mut shrink_collect = shrink_collect.into_inner().unwrap();
2908 let alive_total_bytes = shrink_collect.alive_accounts.alive_bytes();
2909 shrink_collect.alive_total_bytes = alive_total_bytes;
2910
2911 stats
2912 .index_read_elapsed
2913 .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
2914
2915 stats.accounts_removed.fetch_add(
2916 total_starting_accounts - shrink_collect.alive_accounts.len(),
2917 Ordering::Relaxed,
2918 );
2919 stats.bytes_removed.fetch_add(
2920 capacity.saturating_sub(alive_total_bytes as u64),
2921 Ordering::Relaxed,
2922 );
2923 stats
2924 .bytes_written
2925 .fetch_add(alive_total_bytes as u64, Ordering::Relaxed);
2926
2927 shrink_collect
2928 }
2929
2930 fn remove_zero_lamport_single_ref_accounts_after_shrink(
2941 &self,
2942 zero_lamport_single_ref_pubkeys: &[&Pubkey],
2943 slot: Slot,
2944 stats: &ShrinkStats,
2945 do_assert: bool,
2946 ) {
2947 stats.purged_zero_lamports.fetch_add(
2948 zero_lamport_single_ref_pubkeys.len() as u64,
2949 Ordering::Relaxed,
2950 );
2951
2952 self.accounts_index.scan(
2956 zero_lamport_single_ref_pubkeys.iter().cloned(),
2957 |_pubkey, _slots_refs| AccountsIndexScanResult::Unref,
2958 if do_assert {
2959 Some(AccountsIndexScanResult::UnrefAssert0)
2960 } else {
2961 Some(AccountsIndexScanResult::UnrefLog0)
2962 },
2963 ScanFilter::All,
2964 );
2965
2966 zero_lamport_single_ref_pubkeys.iter().for_each(|k| {
2967 _ = self.purge_keys_exact([(**k, slot)]);
2968 });
2969 }
2970
2971 pub(crate) fn remove_old_stores_shrink<'a, T: ShrinkCollectRefs<'a>>(
2974 &self,
2975 shrink_collect: &ShrinkCollect<'a, T>,
2976 stats: &ShrinkStats,
2977 shrink_in_progress: Option<ShrinkInProgress>,
2978 shrink_can_be_active: bool,
2979 ) {
2980 let mut time = Measure::start("remove_old_stores_shrink");
2981
2982 self.remove_zero_lamport_single_ref_accounts_after_shrink(
2986 &shrink_collect.zero_lamport_single_ref_pubkeys,
2987 shrink_collect.slot,
2988 stats,
2989 false,
2990 );
2991
2992 let dead_storages = self.mark_dirty_dead_stores(
2996 shrink_collect.slot,
2997 shrink_collect.all_are_zero_lamports,
3000 shrink_in_progress,
3001 shrink_can_be_active,
3002 );
3003 let dead_storages_len = dead_storages.len();
3004
3005 if !shrink_collect.all_are_zero_lamports {
3006 self.add_uncleaned_pubkeys_after_shrink(
3007 shrink_collect.slot,
3008 shrink_collect.pubkeys_to_unref.iter().cloned().cloned(),
3009 );
3010 }
3011
3012 let (_, drop_storage_entries_elapsed) = measure_us!(drop(dead_storages));
3013 time.stop();
3014
3015 self.stats
3016 .dropped_stores
3017 .fetch_add(dead_storages_len as u64, Ordering::Relaxed);
3018 stats
3019 .drop_storage_entries_elapsed
3020 .fetch_add(drop_storage_entries_elapsed, Ordering::Relaxed);
3021 stats
3022 .remove_old_stores_shrink_us
3023 .fetch_add(time.as_us(), Ordering::Relaxed);
3024 }
3025
3026 pub(crate) fn unref_shrunk_dead_accounts<'a>(
3027 &self,
3028 pubkeys: impl Iterator<Item = &'a Pubkey>,
3029 slot: Slot,
3030 ) {
3031 self.accounts_index.scan(
3032 pubkeys,
3033 |pubkey, slot_refs| {
3034 match slot_refs {
3035 Some((slot_list, ref_count)) => {
3036 if slot_list.len() == 1 && ref_count == 2 {
3038 if let Some((slot_alive, acct_info)) = slot_list.first() {
3039 if acct_info.is_zero_lamport() && !acct_info.is_cached() {
3040 self.zero_lamport_single_ref_found(
3041 *slot_alive,
3042 acct_info.offset(),
3043 );
3044 }
3045 }
3046 }
3047 }
3048 None => {
3049 warn!(
3053 "pubkey {pubkey} in slot {slot} was NOT found in accounts index \
3054 during shrink"
3055 );
3056 datapoint_warn!(
3057 "accounts_db-shink_pubkey_missing_from_index",
3058 ("store_slot", slot, i64),
3059 ("pubkey", pubkey.to_string(), String),
3060 );
3061 }
3062 }
3063 AccountsIndexScanResult::Unref
3064 },
3065 None,
3066 ScanFilter::All,
3067 );
3068 }
3069
3070 pub(crate) fn zero_lamport_single_ref_found(&self, slot: Slot, offset: Offset) {
3072 if let Some(store) = self
3085 .storage
3086 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3087 {
3088 if store.insert_zero_lamport_single_ref_account_offset(offset) {
3089 self.shrink_stats
3091 .num_zero_lamport_single_ref_accounts_found
3092 .fetch_add(1, Ordering::Relaxed);
3093
3094 if store.num_zero_lamport_single_ref_accounts() == store.count() {
3095 self.dirty_stores.entry(slot).or_insert(store);
3097 self.shrink_stats
3098 .num_dead_slots_added_to_clean
3099 .fetch_add(1, Ordering::Relaxed);
3100 } else if Self::is_shrinking_productive(&store)
3101 && self.is_candidate_for_shrink(&store)
3102 {
3103 let is_new = self.shrink_candidate_slots.lock().unwrap().insert(slot);
3105 if is_new {
3106 self.shrink_stats
3107 .num_slots_with_zero_lamport_accounts_added_to_shrink
3108 .fetch_add(1, Ordering::Relaxed);
3109 }
3110 } else {
3111 self.shrink_stats
3112 .marking_zero_dead_accounts_in_non_shrinkable_store
3113 .fetch_add(1, Ordering::Relaxed);
3114 }
3115 }
3116 }
3117 }
3118
3119 fn shrink_storage(&self, store: Arc<AccountStorageEntry>) {
3121 let slot = store.slot();
3122 if self.accounts_cache.contains(slot) {
3123 return;
3135 }
3136 let mut unique_accounts =
3137 self.get_unique_accounts_from_storage_for_shrink(&store, &self.shrink_stats);
3138 debug!("do_shrink_slot_store: slot: {slot}");
3139 let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
3140 &store,
3141 &mut unique_accounts,
3142 &self.shrink_stats,
3143 );
3144
3145 if Self::should_not_shrink(
3148 shrink_collect.alive_total_bytes as u64,
3149 shrink_collect.capacity,
3150 ) || shrink_collect.alive_total_bytes == 0
3151 {
3152 if shrink_collect.alive_total_bytes == 0 {
3153 self.dirty_stores.insert(slot, store.clone());
3155 }
3156
3157 if !shrink_collect.all_are_zero_lamports {
3158 info!(
3160 "Unexpected shrink for slot {} alive {} capacity {}, likely caused by a bug \
3161 for calculating alive bytes.",
3162 slot, shrink_collect.alive_total_bytes, shrink_collect.capacity
3163 );
3164 }
3165
3166 self.shrink_stats
3167 .skipped_shrink
3168 .fetch_add(1, Ordering::Relaxed);
3169 return;
3170 }
3171
3172 self.unref_shrunk_dead_accounts(shrink_collect.pubkeys_to_unref.iter().cloned(), slot);
3173
3174 let total_accounts_after_shrink = shrink_collect.alive_accounts.len();
3175 debug!(
3176 "shrinking: slot: {}, accounts: ({} => {}) bytes: {} original: {}",
3177 slot,
3178 shrink_collect.total_starting_accounts,
3179 total_accounts_after_shrink,
3180 shrink_collect.alive_total_bytes,
3181 shrink_collect.capacity,
3182 );
3183
3184 let mut stats_sub = ShrinkStatsSub::default();
3185 let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
3186 let (shrink_in_progress, time_us) =
3187 measure_us!(self.get_store_for_shrink(slot, shrink_collect.alive_total_bytes as u64));
3188 stats_sub.create_and_insert_store_elapsed_us = Saturating(time_us);
3189
3190 let accounts = [(slot, &shrink_collect.alive_accounts.alive_accounts()[..])];
3194 let storable_accounts = StorableAccountsBySlot::new(slot, &accounts, self);
3195 stats_sub.store_accounts_timing = self.store_accounts_frozen(
3196 storable_accounts,
3197 shrink_in_progress.new_storage(),
3198 UpdateIndexThreadSelection::PoolWithThreshold,
3199 );
3200
3201 rewrite_elapsed.stop();
3202 stats_sub.rewrite_elapsed_us = Saturating(rewrite_elapsed.as_us());
3203
3204 self.shrink_candidate_slots.lock().unwrap().remove(&slot);
3210
3211 self.remove_old_stores_shrink(
3212 &shrink_collect,
3213 &self.shrink_stats,
3214 Some(shrink_in_progress),
3215 false,
3216 );
3217
3218 self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);
3219
3220 Self::update_shrink_stats(&self.shrink_stats, stats_sub, true);
3221 self.shrink_stats.report();
3222 }
3223
3224 pub(crate) fn update_shrink_stats(
3225 shrink_stats: &ShrinkStats,
3226 stats_sub: ShrinkStatsSub,
3227 increment_count: bool,
3228 ) {
3229 if increment_count {
3230 shrink_stats
3231 .num_slots_shrunk
3232 .fetch_add(1, Ordering::Relaxed);
3233 }
3234 shrink_stats.create_and_insert_store_elapsed.fetch_add(
3235 stats_sub.create_and_insert_store_elapsed_us.0,
3236 Ordering::Relaxed,
3237 );
3238 shrink_stats.store_accounts_elapsed.fetch_add(
3239 stats_sub.store_accounts_timing.store_accounts_elapsed,
3240 Ordering::Relaxed,
3241 );
3242 shrink_stats.update_index_elapsed.fetch_add(
3243 stats_sub.store_accounts_timing.update_index_elapsed,
3244 Ordering::Relaxed,
3245 );
3246 shrink_stats.handle_reclaims_elapsed.fetch_add(
3247 stats_sub.store_accounts_timing.handle_reclaims_elapsed,
3248 Ordering::Relaxed,
3249 );
3250 shrink_stats
3251 .rewrite_elapsed
3252 .fetch_add(stats_sub.rewrite_elapsed_us.0, Ordering::Relaxed);
3253 shrink_stats
3254 .unpackable_slots_count
3255 .fetch_add(stats_sub.unpackable_slots_count.0 as u64, Ordering::Relaxed);
3256 shrink_stats.newest_alive_packed_count.fetch_add(
3257 stats_sub.newest_alive_packed_count.0 as u64,
3258 Ordering::Relaxed,
3259 );
3260 }
3261
3262 pub fn mark_dirty_dead_stores(
3267 &self,
3268 slot: Slot,
3269 add_dirty_stores: bool,
3270 shrink_in_progress: Option<ShrinkInProgress>,
3271 shrink_can_be_active: bool,
3272 ) -> Vec<Arc<AccountStorageEntry>> {
3273 let mut dead_storages = Vec::default();
3274
3275 let mut not_retaining_store = |store: &Arc<AccountStorageEntry>| {
3276 if add_dirty_stores {
3277 self.dirty_stores.insert(slot, store.clone());
3278 }
3279 dead_storages.push(store.clone());
3280 };
3281
3282 if let Some(shrink_in_progress) = shrink_in_progress {
3283 not_retaining_store(shrink_in_progress.old_storage());
3285 } else if let Some(store) = self.storage.remove(&slot, shrink_can_be_active) {
3287 not_retaining_store(&store);
3289 }
3290
3291 dead_storages
3292 }
3293
3294 pub(crate) fn reopen_storage_as_readonly_shrinking_in_progress_ok(&self, slot: Slot) {
3297 if let Some(storage) = self
3298 .storage
3299 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3300 {
3301 if let Some(new_storage) = storage.reopen_as_readonly(self.storage_access) {
3302 assert_eq!(storage.id(), new_storage.id());
3307 assert_eq!(storage.accounts.len(), new_storage.accounts.len());
3308 self.storage
3309 .replace_storage_with_equivalent(slot, Arc::new(new_storage));
3310 }
3311 }
3312 }
3313
3314 pub fn get_store_for_shrink(&self, slot: Slot, size: u64) -> ShrinkInProgress<'_> {
3316 let shrunken_store = self.create_store(slot, size, "shrink", self.shrink_paths.as_slice());
3317 self.storage.shrinking_in_progress(slot, shrunken_store)
3318 }
3319
3320 fn shrink_slot_forced(&self, slot: Slot) {
3323 debug!("shrink_slot_forced: slot: {slot}");
3324
3325 if let Some(store) = self
3326 .storage
3327 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3328 {
3329 if Self::is_shrinking_productive(&store) {
3330 self.shrink_storage(store)
3331 }
3332 }
3333 }
3334
3335 fn all_slots_in_storage(&self) -> Vec<Slot> {
3336 self.storage.all_slots()
3337 }
3338
3339 fn select_candidates_by_total_usage(
3347 &self,
3348 shrink_slots: &ShrinkCandidates,
3349 shrink_ratio: f64,
3350 ) -> (IntMap<Slot, Arc<AccountStorageEntry>>, ShrinkCandidates) {
3351 struct StoreUsageInfo {
3352 slot: Slot,
3353 alive_ratio: f64,
3354 store: Arc<AccountStorageEntry>,
3355 }
3356 let mut store_usage: Vec<StoreUsageInfo> = Vec::with_capacity(shrink_slots.len());
3357 let mut total_alive_bytes: u64 = 0;
3358 let mut total_bytes: u64 = 0;
3359 for slot in shrink_slots {
3360 let Some(store) = self.storage.get_slot_storage_entry(*slot) else {
3361 continue;
3362 };
3363 let alive_bytes = store.alive_bytes();
3364 total_alive_bytes += alive_bytes as u64;
3365 total_bytes += store.capacity();
3366 let alive_ratio = alive_bytes as f64 / store.capacity() as f64;
3367 store_usage.push(StoreUsageInfo {
3368 slot: *slot,
3369 alive_ratio,
3370 store: store.clone(),
3371 });
3372 }
3373 store_usage.sort_by(|a, b| {
3374 a.alive_ratio
3375 .partial_cmp(&b.alive_ratio)
3376 .unwrap_or(std::cmp::Ordering::Equal)
3377 });
3378
3379 let mut shrink_slots = IntMap::default();
3382 let mut shrink_slots_next_batch = ShrinkCandidates::default();
3383 for usage in &store_usage {
3384 let store = &usage.store;
3385 let alive_ratio = (total_alive_bytes as f64) / (total_bytes as f64);
3386 debug!(
3387 "alive_ratio: {:?} store_id: {:?}, store_ratio: {:?} requirement: {:?}, \
3388 total_bytes: {:?} total_alive_bytes: {:?}",
3389 alive_ratio,
3390 usage.store.id(),
3391 usage.alive_ratio,
3392 shrink_ratio,
3393 total_bytes,
3394 total_alive_bytes
3395 );
3396 if alive_ratio > shrink_ratio {
3397 debug!(
3399 "Shrinking goal can be achieved at slot {:?}, total_alive_bytes: {:?} \
3400 total_bytes: {:?}, alive_ratio: {:}, shrink_ratio: {:?}",
3401 usage.slot, total_alive_bytes, total_bytes, alive_ratio, shrink_ratio
3402 );
3403 if usage.alive_ratio < shrink_ratio {
3404 shrink_slots_next_batch.insert(usage.slot);
3405 } else {
3406 break;
3407 }
3408 } else {
3409 let current_store_size = store.capacity();
3410 let after_shrink_size = store.alive_bytes() as u64;
3411 let bytes_saved = current_store_size.saturating_sub(after_shrink_size);
3412 total_bytes -= bytes_saved;
3413 shrink_slots.insert(usage.slot, Arc::clone(store));
3414 }
3415 }
3416 (shrink_slots, shrink_slots_next_batch)
3417 }
3418
3419 fn get_roots_less_than(&self, slot: Slot) -> Vec<Slot> {
3420 self.accounts_index
3421 .roots_tracker
3422 .read()
3423 .unwrap()
3424 .alive_roots
3425 .get_all_less_than(slot)
3426 }
3427
3428 fn get_sorted_potential_ancient_slots(&self, oldest_non_ancient_slot: Slot) -> Vec<Slot> {
3432 let mut ancient_slots = self.get_roots_less_than(oldest_non_ancient_slot);
3433 ancient_slots.sort_unstable();
3434 ancient_slots
3435 }
3436
3437 pub fn shrink_ancient_slots(&self, epoch_schedule: &EpochSchedule) {
3440 if self.ancient_append_vec_offset.is_none() {
3441 return;
3442 }
3443
3444 let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
3445 let can_randomly_shrink = true;
3446 let sorted_slots = self.get_sorted_potential_ancient_slots(oldest_non_ancient_slot);
3447 self.combine_ancient_slots_packed(sorted_slots, can_randomly_shrink);
3448 }
3449
3450 pub(crate) fn handle_dropped_roots_for_ancient(
3453 &self,
3454 dropped_roots: impl Iterator<Item = Slot>,
3455 ) {
3456 dropped_roots.for_each(|slot| {
3457 self.accounts_index.clean_dead_slot(slot);
3458 assert!(self.storage.remove(&slot, false).is_none());
3460 debug_assert!(
3461 !self
3462 .accounts_index
3463 .roots_tracker
3464 .read()
3465 .unwrap()
3466 .alive_roots
3467 .contains(&slot),
3468 "slot: {slot}"
3469 );
3470 });
3471 }
3472
3473 fn add_uncleaned_pubkeys_after_shrink(
3476 &self,
3477 slot: Slot,
3478 pubkeys: impl Iterator<Item = Pubkey>,
3479 ) {
3480 let mut uncleaned_pubkeys = self.uncleaned_pubkeys.entry(slot).or_default();
3500 uncleaned_pubkeys.extend(pubkeys);
3501 }
3502
3503 pub fn shrink_candidate_slots(&self, epoch_schedule: &EpochSchedule) -> usize {
3504 let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
3505
3506 let shrink_candidates_slots =
3507 std::mem::take(&mut *self.shrink_candidate_slots.lock().unwrap());
3508 self.shrink_stats
3509 .initial_candidates_count
3510 .store(shrink_candidates_slots.len() as u64, Ordering::Relaxed);
3511
3512 let candidates_count = shrink_candidates_slots.len();
3513 let ((mut shrink_slots, shrink_slots_next_batch), select_time_us) = measure_us!({
3514 if let AccountShrinkThreshold::TotalSpace { shrink_ratio } = self.shrink_ratio {
3515 let (shrink_slots, shrink_slots_next_batch) =
3516 self.select_candidates_by_total_usage(&shrink_candidates_slots, shrink_ratio);
3517 (shrink_slots, Some(shrink_slots_next_batch))
3518 } else {
3519 (
3520 shrink_candidates_slots
3522 .into_iter()
3523 .filter_map(|slot| {
3524 self.storage
3525 .get_slot_storage_entry(slot)
3526 .map(|storage| (slot, storage))
3527 })
3528 .collect(),
3529 None,
3530 )
3531 }
3532 });
3533
3534 if shrink_slots.len() < SHRINK_INSERT_ANCIENT_THRESHOLD {
3537 let mut ancients = self.best_ancient_slots_to_shrink.write().unwrap();
3538 while let Some((slot, capacity)) = ancients.pop_front() {
3539 if let Some(store) = self.storage.get_slot_storage_entry(slot) {
3540 if !shrink_slots.contains(&slot)
3541 && capacity == store.capacity()
3542 && Self::is_candidate_for_shrink(self, &store)
3543 {
3544 let ancient_bytes_added_to_shrink = store.alive_bytes() as u64;
3545 shrink_slots.insert(slot, store);
3546 self.shrink_stats
3547 .ancient_bytes_added_to_shrink
3548 .fetch_add(ancient_bytes_added_to_shrink, Ordering::Relaxed);
3549 self.shrink_stats
3550 .ancient_slots_added_to_shrink
3551 .fetch_add(1, Ordering::Relaxed);
3552 break;
3553 }
3554 }
3555 }
3556 }
3557 if shrink_slots.is_empty()
3558 && shrink_slots_next_batch
3559 .as_ref()
3560 .map(|s| s.is_empty())
3561 .unwrap_or(true)
3562 {
3563 return 0;
3564 }
3565
3566 let _guard = (!shrink_slots.is_empty())
3567 .then_some(|| self.active_stats.activate(ActiveStatItem::Shrink));
3568
3569 let num_selected = shrink_slots.len();
3570 let (_, shrink_all_us) = measure_us!({
3571 self.thread_pool_background.install(|| {
3572 shrink_slots
3573 .into_par_iter()
3574 .for_each(|(slot, slot_shrink_candidate)| {
3575 if self.ancient_append_vec_offset.is_some()
3576 && slot < oldest_non_ancient_slot
3577 {
3578 self.shrink_stats
3579 .num_ancient_slots_shrunk
3580 .fetch_add(1, Ordering::Relaxed);
3581 }
3582 self.shrink_storage(slot_shrink_candidate);
3583 });
3584 })
3585 });
3586
3587 let mut pended_counts: usize = 0;
3588 if let Some(shrink_slots_next_batch) = shrink_slots_next_batch {
3589 let mut shrink_slots = self.shrink_candidate_slots.lock().unwrap();
3590 pended_counts = shrink_slots_next_batch.len();
3591 for slot in shrink_slots_next_batch {
3592 shrink_slots.insert(slot);
3593 }
3594 }
3595
3596 datapoint_info!(
3597 "shrink_candidate_slots",
3598 ("select_time_us", select_time_us, i64),
3599 ("shrink_all_us", shrink_all_us, i64),
3600 ("candidates_count", candidates_count, i64),
3601 ("selected_count", num_selected, i64),
3602 ("deferred_to_next_round_count", pended_counts, i64)
3603 );
3604
3605 num_selected
3606 }
3607
3608 pub fn shrink_all_slots(
3613 &self,
3614 is_startup: bool,
3615 epoch_schedule: &EpochSchedule,
3616 newest_slot_skip_shrink_inclusive: Option<Slot>,
3617 ) {
3618 let _guard = self.active_stats.activate(ActiveStatItem::Shrink);
3619 const DIRTY_STORES_CLEANING_THRESHOLD: usize = 10_000;
3620 const OUTER_CHUNK_SIZE: usize = 2000;
3621 let mut slots = self.all_slots_in_storage();
3622 if let Some(newest_slot_skip_shrink_inclusive) = newest_slot_skip_shrink_inclusive {
3623 slots.retain(|slot| slot < &newest_slot_skip_shrink_inclusive);
3626 }
3627
3628 let maybe_clean = || {
3637 if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
3638 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
3639 self.clean_accounts(latest_full_snapshot_slot, is_startup, epoch_schedule);
3640 }
3641 };
3642
3643 if is_startup {
3644 let threads = num_cpus::get();
3645 let inner_chunk_size = std::cmp::max(OUTER_CHUNK_SIZE / threads, 1);
3646 slots.chunks(OUTER_CHUNK_SIZE).for_each(|chunk| {
3647 chunk.par_chunks(inner_chunk_size).for_each(|slots| {
3648 for slot in slots {
3649 self.shrink_slot_forced(*slot);
3650 }
3651 });
3652 maybe_clean();
3653 });
3654 } else {
3655 for slot in slots {
3656 self.shrink_slot_forced(slot);
3657 maybe_clean();
3658 }
3659 }
3660 }
3661
3662 pub fn scan_accounts<F>(
3663 &self,
3664 ancestors: &Ancestors,
3665 bank_id: BankId,
3666 mut scan_func: F,
3667 config: &ScanConfig,
3668 ) -> ScanResult<()>
3669 where
3670 F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
3671 {
3672 self.accounts_index.scan_accounts(
3674 ancestors,
3675 bank_id,
3676 |pubkey, (account_info, slot)| {
3677 let mut account_accessor =
3678 self.get_account_accessor(slot, pubkey, &account_info.storage_location());
3679
3680 let account_slot = match account_accessor {
3681 LoadedAccountAccessor::Cached(None) => None,
3682 _ => account_accessor.get_loaded_account(|loaded_account| {
3683 (pubkey, loaded_account.take_account(), slot)
3684 }),
3685 };
3686 scan_func(account_slot)
3687 },
3688 config,
3689 )?;
3690
3691 Ok(())
3692 }
3693
3694 pub fn index_scan_accounts<F>(
3695 &self,
3696 ancestors: &Ancestors,
3697 bank_id: BankId,
3698 index_key: IndexKey,
3699 mut scan_func: F,
3700 config: &ScanConfig,
3701 ) -> ScanResult<bool>
3702 where
3703 F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
3704 {
3705 let key = match &index_key {
3706 IndexKey::ProgramId(key) => key,
3707 IndexKey::SplTokenMint(key) => key,
3708 IndexKey::SplTokenOwner(key) => key,
3709 };
3710 if !self.account_indexes.include_key(key) {
3711 let used_index = false;
3713 self.scan_accounts(ancestors, bank_id, scan_func, config)?;
3714 return Ok(used_index);
3715 }
3716
3717 self.accounts_index.index_scan_accounts(
3718 ancestors,
3719 bank_id,
3720 index_key,
3721 |pubkey, (account_info, slot)| {
3722 let account_slot = self
3723 .get_account_accessor(slot, pubkey, &account_info.storage_location())
3724 .get_loaded_account(|loaded_account| {
3725 (pubkey, loaded_account.take_account(), slot)
3726 });
3727 scan_func(account_slot)
3728 },
3729 config,
3730 )?;
3731 let used_index = true;
3732 Ok(used_index)
3733 }
3734
3735 pub(crate) fn scan_account_storage<R, B>(
3737 &self,
3738 slot: Slot,
3739 cache_map_func: impl Fn(&LoadedAccount) -> Option<R> + Sync,
3740 storage_scan_func: impl for<'a, 'b, 'storage> Fn(
3741 &'b mut B,
3742 &'a StoredAccountInfoWithoutData<'storage>,
3743 Option<&'storage [u8]>, ) + Sync,
3745 scan_account_storage_data: ScanAccountStorageData,
3746 ) -> ScanStorageResult<R, B>
3747 where
3748 R: Send,
3749 B: Send + Default + Sync,
3750 {
3751 self.scan_cache_storage_fallback(slot, cache_map_func, |retval, storage| {
3752 match scan_account_storage_data {
3753 ScanAccountStorageData::NoData => {
3754 storage.scan_accounts_without_data(|_offset, account_without_data| {
3755 storage_scan_func(retval, &account_without_data, None);
3756 })
3757 }
3758 ScanAccountStorageData::DataRefForStorage => {
3759 let mut reader = append_vec::new_scan_accounts_reader();
3760 storage.scan_accounts(&mut reader, |_offset, account| {
3761 let account_without_data = StoredAccountInfoWithoutData::new_from(&account);
3762 storage_scan_func(retval, &account_without_data, Some(account.data));
3763 })
3764 }
3765 }
3766 .expect("must scan accounts storage");
3767 })
3768 }
3769
3770 pub fn scan_cache_storage_fallback<R, B>(
3772 &self,
3773 slot: Slot,
3774 cache_map_func: impl Fn(&LoadedAccount) -> Option<R> + Sync,
3775 storage_fallback_func: impl Fn(&mut B, &AccountsFile) + Sync,
3776 ) -> ScanStorageResult<R, B>
3777 where
3778 R: Send,
3779 B: Send + Default + Sync,
3780 {
3781 if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
3782 if slot_cache.len() > SCAN_SLOT_PAR_ITER_THRESHOLD {
3785 ScanStorageResult::Cached(self.thread_pool_foreground.install(|| {
3786 slot_cache
3787 .par_iter()
3788 .filter_map(|cached_account| {
3789 cache_map_func(&LoadedAccount::Cached(Cow::Borrowed(
3790 cached_account.value(),
3791 )))
3792 })
3793 .collect()
3794 }))
3795 } else {
3796 ScanStorageResult::Cached(
3797 slot_cache
3798 .iter()
3799 .filter_map(|cached_account| {
3800 cache_map_func(&LoadedAccount::Cached(Cow::Borrowed(
3801 cached_account.value(),
3802 )))
3803 })
3804 .collect(),
3805 )
3806 }
3807 } else {
3808 let mut retval = B::default();
3809 if let Some(storage) = self
3822 .storage
3823 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3824 {
3825 storage_fallback_func(&mut retval, &storage.accounts);
3826 }
3827
3828 ScanStorageResult::Stored(retval)
3829 }
3830 }
3831
3832 pub fn load(
3833 &self,
3834 ancestors: &Ancestors,
3835 pubkey: &Pubkey,
3836 load_hint: LoadHint,
3837 ) -> Option<(AccountSharedData, Slot)> {
3838 self.do_load(ancestors, pubkey, None, load_hint, LoadZeroLamports::None)
3839 }
3840
3841 pub fn load_account_into_read_cache(&self, ancestors: &Ancestors, pubkey: &Pubkey) {
3844 self.do_load_with_populate_read_cache(
3845 ancestors,
3846 pubkey,
3847 None,
3848 LoadHint::Unspecified,
3849 true,
3850 LoadZeroLamports::None,
3852 );
3853 }
3854
3855 pub fn load_with_fixed_root(
3857 &self,
3858 ancestors: &Ancestors,
3859 pubkey: &Pubkey,
3860 ) -> Option<(AccountSharedData, Slot)> {
3861 self.load(ancestors, pubkey, LoadHint::FixedMaxRoot)
3862 }
3863
3864 fn read_index_for_accessor_or_load_slow<'a>(
3865 &'a self,
3866 ancestors: &Ancestors,
3867 pubkey: &'a Pubkey,
3868 max_root: Option<Slot>,
3869 clone_in_lock: bool,
3870 ) -> Option<(Slot, StorageLocation, Option<LoadedAccountAccessor<'a>>)> {
3871 self.accounts_index.get_with_and_then(
3872 pubkey,
3873 Some(ancestors),
3874 max_root,
3875 true,
3876 |(slot, account_info)| {
3877 let storage_location = account_info.storage_location();
3878 let account_accessor = clone_in_lock
3879 .then(|| self.get_account_accessor(slot, pubkey, &storage_location));
3880 (slot, storage_location, account_accessor)
3881 },
3882 )
3883 }
3884
/// Turns an index entry (`slot`, `storage_location`) for `pubkey` into an
/// accessor that actually holds data, re-reading the accounts index whenever
/// the cache or storage lookup comes back empty.
///
/// Returns `Some((accessor, slot))` once an accessor with data is obtained
/// (or, after too many failures, the accessor built during the index re-read
/// itself). Returns `None` if a re-read of the index no longer finds
/// `pubkey`.
///
/// # Panics
///
/// * If a write-cache miss is observed more than once while `load_hint` is
///   `FixedMaxRoot` or `FixedMaxRootDoNotPopulateReadCache`.
/// * If the index re-read returns the same slot and store id that just
///   failed to load ("Bad index entry detected").
fn retry_to_get_account_accessor<'a>(
    &'a self,
    mut slot: Slot,
    mut storage_location: StorageLocation,
    ancestors: &'a Ancestors,
    pubkey: &'a Pubkey,
    max_root: Option<Slot>,
    load_hint: LoadHint,
) -> Option<(LoadedAccountAccessor<'a>, Slot)> {
    // Test builds pause here for `load_delay` milliseconds before the first
    // attempt.
    #[cfg(test)]
    {
        sleep(Duration::from_millis(self.load_delay));
    }

    // Number of failed attempts that count toward the slow-path fallback.
    let mut num_acceptable_failed_iterations = 0;
    loop {
        let account_accessor = self.get_account_accessor(slot, pubkey, &storage_location);
        match account_accessor {
            LoadedAccountAccessor::Cached(Some(_)) | LoadedAccountAccessor::Stored(Some(_)) => {
                // The accessor holds data: done.
                return Some((account_accessor, slot));
            }
            LoadedAccountAccessor::Cached(None) => {
                // The index pointed at the write cache but the entry is gone.
                num_acceptable_failed_iterations += 1;
                match load_hint {
                    LoadHint::FixedMaxRootDoNotPopulateReadCache | LoadHint::FixedMaxRoot => {
                        // With a fixed max root, at most one cache miss is tolerated.
                        assert!(num_acceptable_failed_iterations <= 1);
                    }
                    LoadHint::Unspecified => {
                        // No bound is enforced here; the retry limit below applies.
                    }
                }
            }
            LoadedAccountAccessor::Stored(None) => {
                // The index pointed at a storage entry that could not be found.
                match load_hint {
                    LoadHint::FixedMaxRootDoNotPopulateReadCache | LoadHint::FixedMaxRoot => {
                        // Not counted as a failed iteration for fixed-root loads.
                    }
                    LoadHint::Unspecified => {
                        num_acceptable_failed_iterations += 1;
                    }
                }
            }
        }
        // Retry budget: a constant in normal builds, configurable in tests.
        #[cfg(not(test))]
        let load_limit = ABSURD_CONSECUTIVE_FAILED_ITERATIONS;

        #[cfg(test)]
        let load_limit = self.load_limit.load(Ordering::Relaxed);

        // Once the budget is exhausted, emit a warning datapoint and ask the
        // index re-read below to build the accessor itself.
        let fallback_to_slow_path = if num_acceptable_failed_iterations >= load_limit {
            let message = format!(
                "do_load() failed to get key: {pubkey} from storage, latest attempt was for \
                 slot: {slot}, storage_location: {storage_location:?}, load_hint: \
                 {load_hint:?}",
            );
            datapoint_warn!("accounts_db-do_load_warn", ("warn", message, String));
            true
        } else {
            false
        };

        // Re-read the index. The `?` returns `None` from this function when
        // the pubkey is no longer found.
        let (new_slot, new_storage_location, maybe_account_accessor) = self
            .read_index_for_accessor_or_load_slow(
                ancestors,
                pubkey,
                max_root,
                fallback_to_slow_path,
            )?;
        if new_slot == slot && new_storage_location.is_store_id_equal(&storage_location) {
            // The index still reports the location that just failed to load.
            // Every path through this closure panics; the asserts only select
            // which invariant is named first in the failure.
            self.accounts_index
                .get_and_then(pubkey, |entry| -> (_, ()) {
                    let message = format!(
                        "Bad index entry detected ({pubkey}, {slot}, {storage_location:?}, \
                         {load_hint:?}, {new_storage_location:?}, {entry:?})"
                    );
                    assert!(
                        new_storage_location.is_offset_equal(&storage_location),
                        "{message}"
                    );

                    assert!(!new_storage_location.is_cached(), "{message}");

                    assert_eq!(load_hint, LoadHint::Unspecified, "{message}");

                    panic!("{message}");
                });
        } else if fallback_to_slow_path {
            // The accessor was created by the index re-read; use it directly.
            return Some((
                maybe_account_accessor.expect("must be some if clone_in_lock=true"),
                new_slot,
            ));
        }

        // The index moved on: retry with the refreshed location.
        slot = new_slot;
        storage_location = new_storage_location;
    }
}
4162
4163 fn do_load(
4164 &self,
4165 ancestors: &Ancestors,
4166 pubkey: &Pubkey,
4167 max_root: Option<Slot>,
4168 load_hint: LoadHint,
4169 load_zero_lamports: LoadZeroLamports,
4170 ) -> Option<(AccountSharedData, Slot)> {
4171 self.do_load_with_populate_read_cache(
4172 ancestors,
4173 pubkey,
4174 max_root,
4175 load_hint,
4176 false,
4177 load_zero_lamports,
4178 )
4179 }
4180
4181 pub fn load_account_with(
4186 &self,
4187 ancestors: &Ancestors,
4188 pubkey: &Pubkey,
4189 should_put_in_read_cache: bool,
4190 ) -> Option<(AccountSharedData, Slot)> {
4191 let (slot, storage_location, _maybe_account_accessor) =
4192 self.read_index_for_accessor_or_load_slow(ancestors, pubkey, None, false)?;
4193 let in_write_cache = storage_location.is_cached();
4196 if !in_write_cache {
4197 let result = self.read_only_accounts_cache.load(*pubkey, slot);
4198 if let Some(account) = result {
4199 if account.is_zero_lamport() {
4200 return None;
4201 }
4202 return Some((account, slot));
4203 }
4204 }
4205
4206 let (mut account_accessor, slot) = self.retry_to_get_account_accessor(
4207 slot,
4208 storage_location,
4209 ancestors,
4210 pubkey,
4211 None,
4212 LoadHint::Unspecified,
4213 )?;
4214
4215 let in_write_cache = matches!(account_accessor, LoadedAccountAccessor::Cached(_));
4218 let account = account_accessor.check_and_get_loaded_account_shared_data();
4219 if account.is_zero_lamport() {
4220 return None;
4221 }
4222
4223 if !in_write_cache && should_put_in_read_cache {
4224 self.read_only_accounts_cache
4237 .store(*pubkey, slot, account.clone());
4238 }
4239 Some((account, slot))
4240 }
4241
4242 fn do_load_with_populate_read_cache(
4245 &self,
4246 ancestors: &Ancestors,
4247 pubkey: &Pubkey,
4248 max_root: Option<Slot>,
4249 load_hint: LoadHint,
4250 load_into_read_cache_only: bool,
4251 load_zero_lamports: LoadZeroLamports,
4252 ) -> Option<(AccountSharedData, Slot)> {
4253 #[cfg(not(test))]
4254 assert!(max_root.is_none());
4255
4256 let starting_max_root = self.accounts_index.max_root_inclusive();
4257
4258 let (slot, storage_location, _maybe_account_accessor) =
4259 self.read_index_for_accessor_or_load_slow(ancestors, pubkey, max_root, false)?;
4260 let in_write_cache = storage_location.is_cached();
4263 if !load_into_read_cache_only {
4264 if !in_write_cache {
4265 let result = self.read_only_accounts_cache.load(*pubkey, slot);
4266 if let Some(account) = result {
4267 if load_zero_lamports == LoadZeroLamports::None && account.is_zero_lamport() {
4268 return None;
4269 }
4270 return Some((account, slot));
4271 }
4272 }
4273 } else {
4274 if in_write_cache {
4276 return None;
4278 }
4279 if self.read_only_accounts_cache.in_cache(pubkey, slot) {
4280 return None;
4282 }
4283 }
4284
4285 let (mut account_accessor, slot) = self.retry_to_get_account_accessor(
4286 slot,
4287 storage_location,
4288 ancestors,
4289 pubkey,
4290 max_root,
4291 load_hint,
4292 )?;
4293 let in_write_cache = matches!(account_accessor, LoadedAccountAccessor::Cached(_));
4296 let account = account_accessor.check_and_get_loaded_account_shared_data();
4297 if load_zero_lamports == LoadZeroLamports::None && account.is_zero_lamport() {
4298 return None;
4299 }
4300
4301 if !in_write_cache && load_hint != LoadHint::FixedMaxRootDoNotPopulateReadCache {
4302 self.read_only_accounts_cache
4315 .store(*pubkey, slot, account.clone());
4316 }
4317 if load_hint == LoadHint::FixedMaxRoot
4318 || load_hint == LoadHint::FixedMaxRootDoNotPopulateReadCache
4319 {
4320 let ending_max_root = self.accounts_index.max_root_inclusive();
4322 if starting_max_root != ending_max_root {
4323 warn!(
4324 "do_load_with_populate_read_cache() scanning pubkey {pubkey} called with \
4325 fixed max root, but max root changed from {starting_max_root} to \
4326 {ending_max_root} during function call"
4327 );
4328 }
4329 }
4330 Some((account, slot))
4331 }
4332
4333 fn get_account_accessor<'a>(
4334 &'a self,
4335 slot: Slot,
4336 pubkey: &'a Pubkey,
4337 storage_location: &StorageLocation,
4338 ) -> LoadedAccountAccessor<'a> {
4339 match storage_location {
4340 StorageLocation::Cached => {
4341 let maybe_cached_account = self.accounts_cache.load(slot, pubkey).map(Cow::Owned);
4342 LoadedAccountAccessor::Cached(maybe_cached_account)
4343 }
4344 StorageLocation::AppendVec(store_id, offset) => {
4345 let maybe_storage_entry = self
4346 .storage
4347 .get_account_storage_entry(slot, *store_id)
4348 .map(|account_storage_entry| (account_storage_entry, *offset));
4349 LoadedAccountAccessor::Stored(maybe_storage_entry)
4350 }
4351 }
4352 }
4353
4354 fn create_store(
4355 &self,
4356 slot: Slot,
4357 size: u64,
4358 from: &str,
4359 paths: &[PathBuf],
4360 ) -> Arc<AccountStorageEntry> {
4361 self.stats
4362 .create_store_count
4363 .fetch_add(1, Ordering::Relaxed);
4364 let path_index = thread_rng().gen_range(0..paths.len());
4365 let store = Arc::new(self.new_storage_entry(slot, Path::new(&paths[path_index]), size));
4366
4367 debug!(
4368 "creating store: {} slot: {} len: {} size: {} from: {} path: {}",
4369 store.id(),
4370 slot,
4371 store.accounts.len(),
4372 store.accounts.capacity(),
4373 from,
4374 store.accounts.path().display(),
4375 );
4376
4377 store
4378 }
4379
4380 fn create_and_insert_store(
4381 &self,
4382 slot: Slot,
4383 size: u64,
4384 from: &str,
4385 ) -> Arc<AccountStorageEntry> {
4386 self.create_and_insert_store_with_paths(slot, size, from, &self.paths)
4387 }
4388
4389 fn create_and_insert_store_with_paths(
4390 &self,
4391 slot: Slot,
4392 size: u64,
4393 from: &str,
4394 paths: &[PathBuf],
4395 ) -> Arc<AccountStorageEntry> {
4396 let store = self.create_store(slot, size, from, paths);
4397 let store_for_index = store.clone();
4398
4399 self.insert_store(slot, store_for_index);
4400 store
4401 }
4402
4403 fn insert_store(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
4404 self.storage.insert(slot, store)
4405 }
4406
4407 pub fn enable_bank_drop_callback(&self) {
4408 self.is_bank_drop_callback_enabled
4409 .store(true, Ordering::Release);
4410 }
4411
4412 pub fn purge_slot(&self, slot: Slot, bank_id: BankId, is_serialized_with_abs: bool) {
4418 if self.is_bank_drop_callback_enabled.load(Ordering::Acquire) && !is_serialized_with_abs {
4419 panic!(
4420 "bad drop callpath detected; Bank::drop() must run serially with other logic in \
4421 ABS like clean_accounts()"
4422 )
4423 }
4424
4425 if self
4430 .accounts_index
4431 .removed_bank_ids
4432 .lock()
4433 .unwrap()
4434 .remove(&bank_id)
4435 {
4436 return;
4438 }
4439
4440 self.purge_slots(std::iter::once(&slot));
4441 }
4442
4443 pub fn purge_slots_from_cache_and_store<'a>(
4446 &self,
4447 removed_slots: impl Iterator<Item = &'a Slot> + Clone,
4448 purge_stats: &PurgeStats,
4449 ) {
4450 let mut remove_cache_elapsed_across_slots = 0;
4451 let mut num_cached_slots_removed = 0;
4452 let mut total_removed_cached_bytes = 0;
4453 for remove_slot in removed_slots {
4454 let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed");
4457 if let Some(slot_cache) = self.accounts_cache.slot_cache(*remove_slot) {
4462 num_cached_slots_removed += 1;
4465 total_removed_cached_bytes += slot_cache.total_bytes();
4466 self.purge_slot_cache(*remove_slot, &slot_cache);
4467 remove_cache_elapsed.stop();
4468 remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us();
4469 assert!(self.accounts_cache.remove_slot(*remove_slot).is_some());
4471 } else {
4472 self.purge_slot_storage(*remove_slot, purge_stats);
4473 }
4474 }
4478
4479 purge_stats
4480 .remove_cache_elapsed
4481 .fetch_add(remove_cache_elapsed_across_slots, Ordering::Relaxed);
4482 purge_stats
4483 .num_cached_slots_removed
4484 .fetch_add(num_cached_slots_removed, Ordering::Relaxed);
4485 purge_stats
4486 .total_removed_cached_bytes
4487 .fetch_add(total_removed_cached_bytes, Ordering::Relaxed);
4488 }
4489
4490 fn purge_dead_slots_from_storage<'a>(
4493 &'a self,
4494 removed_slots: impl Iterator<Item = &'a Slot> + Clone,
4495 purge_stats: &PurgeStats,
4496 ) {
4497 let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
4504 assert!(self
4505 .accounts_index
4506 .get_rooted_from_list(removed_slots.clone())
4507 .is_empty());
4508 safety_checks_elapsed.stop();
4509 purge_stats
4510 .safety_checks_elapsed
4511 .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
4512
4513 let mut total_removed_stored_bytes = 0;
4514 let mut all_removed_slot_storages = vec![];
4515
4516 let mut remove_storage_entries_elapsed = Measure::start("remove_storage_entries_elapsed");
4517 for remove_slot in removed_slots {
4518 if let Some(store) = self.storage.remove(remove_slot, false) {
4520 total_removed_stored_bytes += store.accounts.capacity();
4521 all_removed_slot_storages.push(store);
4522 }
4523 }
4524 remove_storage_entries_elapsed.stop();
4525 let num_stored_slots_removed = all_removed_slot_storages.len();
4526
4527 let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
4530 drop(all_removed_slot_storages);
4531 drop_storage_entries_elapsed.stop();
4532
4533 purge_stats
4534 .remove_storage_entries_elapsed
4535 .fetch_add(remove_storage_entries_elapsed.as_us(), Ordering::Relaxed);
4536 purge_stats
4537 .drop_storage_entries_elapsed
4538 .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
4539 purge_stats
4540 .num_stored_slots_removed
4541 .fetch_add(num_stored_slots_removed, Ordering::Relaxed);
4542 purge_stats
4543 .total_removed_storage_entries
4544 .fetch_add(num_stored_slots_removed, Ordering::Relaxed);
4545 purge_stats
4546 .total_removed_stored_bytes
4547 .fetch_add(total_removed_stored_bytes, Ordering::Relaxed);
4548 self.stats
4549 .dropped_stores
4550 .fetch_add(num_stored_slots_removed as u64, Ordering::Relaxed);
4551 }
4552
4553 fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: &SlotCache) {
4554 let pubkeys = slot_cache.iter().map(|account| *account.key());
4555 self.purge_slot_cache_pubkeys(purged_slot, pubkeys, true);
4556 }
4557
4558 fn purge_slot_cache_pubkeys(
4559 &self,
4560 purged_slot: Slot,
4561 pubkeys: impl IntoIterator<Item = Pubkey>,
4562 is_dead: bool,
4563 ) {
4564 assert!(self
4566 .storage
4567 .get_slot_storage_entry_shrinking_in_progress_ok(purged_slot)
4568 .is_none());
4569 let mut num_purged_keys = 0;
4570 let (reclaims, _) = self.purge_keys_exact(pubkeys.into_iter().map(|key| {
4571 num_purged_keys += 1;
4572 (key, purged_slot)
4573 }));
4574 assert_eq!(reclaims.len(), num_purged_keys);
4575 if is_dead {
4576 self.remove_dead_slots_metadata(std::iter::once(&purged_slot));
4577 }
4578 }
4579
/// Purges a slot that lives in storage (not in the write cache): collects
/// the pubkeys stored for `remove_slot`, purges those exact index entries,
/// and processes the resulting reclaims. Each phase is timed into
/// `purge_stats`.
///
/// # Panics
///
/// * If scanning the slot's storage for pubkeys fails.
/// * If a storage entry for `remove_slot` still exists after the reclaims
///   have been handled.
fn purge_slot_storage(&self, remove_slot: Slot, purge_stats: &PurgeStats) {
    // Phase 1: gather every (pubkey, slot) pair present in the slot's storage.
    let mut scan_storages_elapsed = Measure::start("scan_storages_elapsed");
    let mut stored_keys = HashSet::new();
    if let Some(storage) = self
        .storage
        .get_slot_storage_entry_shrinking_in_progress_ok(remove_slot)
    {
        storage
            .accounts
            .scan_pubkeys(|pk| {
                stored_keys.insert((*pk, remove_slot));
            })
            .expect("must scan accounts storage");
    }
    scan_storages_elapsed.stop();
    purge_stats
        .scan_storages_elapsed
        .fetch_add(scan_storages_elapsed.as_us(), Ordering::Relaxed);

    // Phase 2: purge exactly those entries from the accounts index.
    let mut purge_accounts_index_elapsed = Measure::start("purge_accounts_index_elapsed");
    let (reclaims, pubkeys_removed_from_accounts_index) = self.purge_keys_exact(stored_keys);
    purge_accounts_index_elapsed.stop();
    purge_stats
        .purge_accounts_index_elapsed
        .fetch_add(purge_accounts_index_elapsed.as_us(), Ordering::Relaxed);

    // Phase 3: process the reclaims, naming `remove_slot` as the slot that
    // is expected to be dead. Skipped entirely when nothing was reclaimed.
    let mut handle_reclaims_elapsed = Measure::start("handle_reclaims_elapsed");
    let expected_dead_slot = Some(remove_slot);
    if !reclaims.is_empty() {
        self.handle_reclaims(
            reclaims.iter(),
            expected_dead_slot,
            &pubkeys_removed_from_accounts_index,
            HandleReclaims::ProcessDeadSlots(purge_stats),
            MarkAccountsObsolete::No,
        );
    }
    handle_reclaims_elapsed.stop();
    purge_stats
        .handle_reclaims_elapsed
        .fetch_add(handle_reclaims_elapsed.as_us(), Ordering::Relaxed);
    // By now the slot must no longer have a storage entry.
    assert!(
        self.storage.get_slot_storage_entry(remove_slot).is_none(),
        "slot {remove_slot} is not none"
    );
}
4639
4640 fn purge_slots<'a>(&self, slots: impl Iterator<Item = &'a Slot> + Clone) {
4641 let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
4643 let non_roots = slots
4644 .filter(|slot| !self.accounts_index.is_alive_root(**slot));
4654 safety_checks_elapsed.stop();
4655 self.external_purge_slots_stats
4656 .safety_checks_elapsed
4657 .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
4658 self.purge_slots_from_cache_and_store(non_roots, &self.external_purge_slots_stats);
4659 self.external_purge_slots_stats
4660 .report("external_purge_slots_stats", Some(1000));
4661 }
4662
/// Removes the given unrooted `(slot, bank_id)` pairs from the cache and
/// storage, coordinating with concurrent cache flushes through
/// `remove_unrooted_slots_synchronization`.
///
/// Steps:
/// 1. Reserve every slot in `slots_under_contention`, waiting on `signal`
///    for any slot that is already present in the set.
/// 2. Record every bank id in the index's `removed_bank_ids`.
/// 3. Purge the slots via `purge_slots_from_cache_and_store` and report the
///    purge stats.
/// 4. Release the reservations.
///
/// # Panics
///
/// * If any of the slots is rooted according to the accounts index.
/// * If a reservation is missing when it is released.
pub fn remove_unrooted_slots(&self, remove_slots: &[(Slot, BankId)]) {
    let rooted_slots = self
        .accounts_index
        .get_rooted_from_list(remove_slots.iter().map(|(slot, _)| slot));
    assert!(
        rooted_slots.is_empty(),
        "Trying to remove accounts for rooted slots {rooted_slots:?}"
    );

    let RemoveUnrootedSlotsSynchronization {
        slots_under_contention,
        signal,
    } = &self.remove_unrooted_slots_synchronization;

    {
        let mut currently_contended_slots = slots_under_contention.lock().unwrap();

        // Try to reserve every slot. A slot whose insertion fails is already
        // in the set (the code treats that as "being flushed") and is kept
        // in this list to be retried.
        let mut remaining_contended_flush_slots: Vec<Slot> = remove_slots
            .iter()
            .filter_map(|(remove_slot, _)| {
                let is_being_flushed = !currently_contended_slots.insert(*remove_slot);
                is_being_flushed.then_some(remove_slot)
            })
            .cloned()
            .collect();

        loop {
            if !remaining_contended_flush_slots.is_empty() {
                // Wait for a notification, then re-check below. The wait only
                // happens while there is something left to wait for.
                currently_contended_slots = signal.wait(currently_contended_slots).unwrap();
            } else {
                break;
            }

            // Retry the reservation; keep only the slots that are still
            // present in the set.
            remaining_contended_flush_slots.retain(|flush_slot| {
                !currently_contended_slots.insert(*flush_slot)
            });
        }
    }

    {
        // Record the bank ids as removed before purging starts.
        let mut locked_removed_bank_ids = self.accounts_index.removed_bank_ids.lock().unwrap();
        for (_slot, remove_bank_id) in remove_slots.iter() {
            locked_removed_bank_ids.insert(*remove_bank_id);
        }
    }

    let remove_unrooted_purge_stats = PurgeStats::default();
    self.purge_slots_from_cache_and_store(
        remove_slots.iter().map(|(slot, _)| slot),
        &remove_unrooted_purge_stats,
    );
    remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", None);

    // Release the reservations taken above; each one must still be present.
    let mut currently_contended_slots = slots_under_contention.lock().unwrap();
    for (remove_slot, _) in remove_slots {
        assert!(currently_contended_slots.remove(remove_slot));
    }
}
4748
4749 pub fn lt_hash_account(account: &impl ReadableAccount, pubkey: &Pubkey) -> AccountLtHash {
4751 if account.lamports() == 0 {
4752 return ZERO_LAMPORT_ACCOUNT_LT_HASH;
4753 }
4754
4755 let hasher = Self::hash_account_helper(account, pubkey);
4756 let lt_hash = LtHash::with(&hasher);
4757 AccountLtHash(lt_hash)
4758 }
4759
4760 fn hash_account_helper(account: &impl ReadableAccount, pubkey: &Pubkey) -> blake3::Hasher {
4762 let mut hasher = blake3::Hasher::new();
4763
4764 const META_SIZE: usize = 8 + 1 + 32 + 32 ;
4767 const DATA_SIZE: usize = 200; const BUFFER_SIZE: usize = META_SIZE + DATA_SIZE;
4769 let mut buffer = SmallVec::<[u8; BUFFER_SIZE]>::new();
4770
4771 buffer.extend_from_slice(&account.lamports().to_le_bytes());
4773
4774 let data = account.data();
4775 if data.len() > DATA_SIZE {
4776 hasher.update(&buffer);
4778 buffer.clear();
4779
4780 hasher.update(data);
4782 } else {
4783 buffer.extend_from_slice(data);
4785 }
4786
4787 buffer.push(account.executable().into());
4789 buffer.extend_from_slice(account.owner().as_ref());
4790 buffer.extend_from_slice(pubkey.as_ref());
4791 hasher.update(&buffer);
4792
4793 hasher
4794 }
4795
4796 pub fn mark_slot_frozen(&self, slot: Slot) {
4797 if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
4798 slot_cache.mark_slot_frozen();
4799 slot_cache.report_slot_store_metrics();
4800 }
4801 self.accounts_cache.report_size();
4802 }
4803
4804 fn should_aggressively_flush_cache(&self) -> bool {
4806 self.write_cache_limit_bytes
4807 .unwrap_or(WRITE_CACHE_LIMIT_BYTES_DEFAULT)
4808 < self.accounts_cache.size()
4809 }
4810
/// Flushes the write cache to storage and emits flush metrics.
///
/// Nothing happens unless `force_flush` is set or the cache is over its size
/// limit. Otherwise:
/// 1. Rooted slots up to `requested_flush_root` are flushed with cleaning
///    enabled.
/// 2. If the cache is still over its limit, the remaining rooted slots are
///    flushed without cleaning.
/// 3. If the cache is still over its limit, cached frozen slots above the
///    max flushed root are flushed one by one, oldest first, for as long as
///    the cache stays over its limit.
///
/// # Panics
///
/// In non-test builds, panics if `requested_flush_root` is `None`.
pub fn flush_accounts_cache(&self, force_flush: bool, requested_flush_root: Option<Slot>) {
    #[cfg(not(test))]
    assert!(requested_flush_root.is_some());

    if !force_flush && !self.should_aggressively_flush_cache() {
        return;
    }

    let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed");

    // Marks the flush as active for as long as `_guard` is alive.
    let _guard = self.active_stats.activate(ActiveStatItem::Flush);

    // Step 1: flush roots up to the requested root, with cleaning
    // (`should_clean = true`).
    let (total_new_cleaned_roots, num_cleaned_roots_flushed, mut flush_stats) = self
        .flush_rooted_accounts_cache(
            requested_flush_root,
            true,
        );
    flush_roots_elapsed.stop();

    // Step 2: if still over the limit, flush all remaining roots without
    // cleaning (`should_clean = false`).
    let (total_new_excess_roots, num_excess_roots_flushed, flush_stats_aggressively) =
        if self.should_aggressively_flush_cache() {
            self.flush_rooted_accounts_cache(None, false)
        } else {
            (0, 0, FlushStats::default())
        };
    flush_stats.accumulate(&flush_stats_aggressively);

    // Step 3: if still over the limit, go through the cached frozen slots.
    let mut excess_slot_count = 0;
    let mut unflushable_unrooted_slot_count = 0;
    let max_flushed_root = self.accounts_cache.fetch_max_flush_root();
    if self.should_aggressively_flush_cache() {
        let mut old_slots = self.accounts_cache.cached_frozen_slots();
        old_slots.sort_unstable();
        excess_slot_count = old_slots.len();
        // NOTE: this shadows the outer `flush_stats`; stats gathered in this
        // block are reported only in the "aggressively" datapoint below.
        let mut flush_stats = FlushStats::default();
        old_slots.into_iter().for_each(|old_slot| {
            if old_slot > max_flushed_root {
                // Re-check the limit before each flush so flushing stops as
                // soon as the cache is back under it.
                if self.should_aggressively_flush_cache() {
                    if let Some(stats) = self.flush_slot_cache(old_slot) {
                        flush_stats.accumulate(&stats);
                    }
                }
            } else {
                // Slots at or below the max flushed root are only counted.
                unflushable_unrooted_slot_count += 1;
            }
        });
        datapoint_info!(
            "accounts_db-flush_accounts_cache_aggressively",
            (
                "num_accounts_flushed",
                flush_stats.num_accounts_flushed.0,
                i64
            ),
            ("num_accounts_saved", flush_stats.num_accounts_purged.0, i64),
            (
                "account_bytes_flushed",
                flush_stats.num_bytes_flushed.0,
                i64
            ),
            ("account_bytes_saved", flush_stats.num_bytes_purged.0, i64),
            ("total_cache_size", self.accounts_cache.size(), i64),
            ("total_frozen_slots", excess_slot_count, i64),
            ("total_slots", self.accounts_cache.num_slots(), i64),
        );
    }

    // Overall metrics for steps 1 and 2, plus the slot counts from step 3.
    datapoint_info!(
        "accounts_db-flush_accounts_cache",
        ("total_new_cleaned_roots", total_new_cleaned_roots, i64),
        ("num_cleaned_roots_flushed", num_cleaned_roots_flushed, i64),
        ("total_new_excess_roots", total_new_excess_roots, i64),
        ("num_excess_roots_flushed", num_excess_roots_flushed, i64),
        ("excess_slot_count", excess_slot_count, i64),
        (
            "unflushable_unrooted_slot_count",
            unflushable_unrooted_slot_count,
            i64
        ),
        ("flush_roots_elapsed", flush_roots_elapsed.as_us(), i64),
        (
            "account_bytes_flushed",
            flush_stats.num_bytes_flushed.0,
            i64
        ),
        (
            "num_accounts_flushed",
            flush_stats.num_accounts_flushed.0,
            i64
        ),
        ("account_bytes_saved", flush_stats.num_bytes_purged.0, i64),
        ("num_accounts_saved", flush_stats.num_accounts_purged.0, i64),
        (
            "store_accounts_total_us",
            flush_stats.store_accounts_total_us.0,
            i64
        ),
        (
            "update_index_us",
            flush_stats.store_accounts_timing.update_index_elapsed,
            i64
        ),
        (
            "store_accounts_elapsed_us",
            flush_stats.store_accounts_timing.store_accounts_elapsed,
            i64
        ),
        (
            "handle_reclaims_elapsed_us",
            flush_stats.store_accounts_timing.handle_reclaims_elapsed,
            i64
        ),
    );
}
4943
4944 fn flush_rooted_accounts_cache(
4945 &self,
4946 requested_flush_root: Option<Slot>,
4947 should_clean: bool,
4948 ) -> (usize, usize, FlushStats) {
4949 let max_clean_root = should_clean
4950 .then(|| {
4951 self.max_clean_root(requested_flush_root)
4954 })
4955 .flatten();
4956
4957 let mut written_accounts = HashSet::new();
4958
4959 let mut should_flush_f = should_clean
4962 .then(|| {
4963 Some(move |&pubkey: &Pubkey| {
4964 written_accounts.insert(pubkey)
4966 })
4967 })
4968 .flatten();
4969
4970 let flushed_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(requested_flush_root);
4972
4973 let mut num_roots_flushed = 0;
4976 let mut flush_stats = FlushStats::default();
4977 for &root in flushed_roots.iter().rev() {
4978 if let Some(stats) =
4979 self.flush_slot_cache_with_clean(root, should_flush_f.as_mut(), max_clean_root)
4980 {
4981 num_roots_flushed += 1;
4982 flush_stats.accumulate(&stats);
4983 }
4984 }
4985
4986 if let Some(&root) = flushed_roots.last() {
4995 self.accounts_cache.set_max_flush_root(root);
4996 }
4997 let num_new_roots = flushed_roots.len();
4998 (num_new_roots, num_roots_flushed, flush_stats)
4999 }
5000
/// Flushes one cached slot to a newly created storage entry and removes the
/// slot from the write cache.
///
/// `should_flush_f`, when present, decides per pubkey whether the account is
/// written to storage (`true`) or purged from the index instead (`false`).
/// The filter is ignored for slots greater than `max_clean_root`.
///
/// Returns the stats gathered while flushing.
///
/// # Panics
///
/// * If, after storing, the slot has no storage entry.
/// * If the slot is no longer in the write cache when it is removed.
fn do_flush_slot_cache(
    &self,
    slot: Slot,
    slot_cache: &SlotCache,
    mut should_flush_f: Option<&mut impl FnMut(&Pubkey) -> bool>,
    max_clean_root: Option<Slot>,
) -> FlushStats {
    let mut flush_stats = FlushStats::default();
    let iter_items: Vec<_> = slot_cache.iter().collect();
    // Pubkeys that are NOT flushed and must be purged from the index.
    let mut pubkeys: Vec<Pubkey> = vec![];
    if should_flush_f.is_some() {
        if let Some(max_clean_root) = max_clean_root {
            if slot > max_clean_root {
                // Above the max clean root: drop the filter so that every
                // account in this slot is flushed.
                should_flush_f = None;
            }
        }
    }

    // Split the cached accounts into "flush" (collected here) and "purge"
    // (pushed to `pubkeys`), updating the byte and account counters.
    let accounts: Vec<(&Pubkey, &AccountSharedData)> = iter_items
        .iter()
        .filter_map(|iter_item| {
            let key = iter_item.key();
            let account = &iter_item.value().account;
            let should_flush = should_flush_f
                .as_mut()
                .map(|should_flush_f| should_flush_f(key))
                .unwrap_or(true);
            if should_flush {
                flush_stats.num_bytes_flushed +=
                    aligned_stored_size(account.data().len()) as u64;
                flush_stats.num_accounts_flushed += 1;
                Some((key, account))
            } else {
                pubkeys.push(*key);
                flush_stats.num_bytes_purged +=
                    aligned_stored_size(account.data().len()) as u64;
                flush_stats.num_accounts_purged += 1;
                None
            }
        })
        .collect();

    // A slot with nothing left to flush is treated as dead.
    let is_dead_slot = accounts.is_empty();
    self.purge_slot_cache_pubkeys(slot, pubkeys, is_dead_slot);

    if !is_dead_slot {
        // The store is sized from the aligned byte count computed above.
        let flushed_store = self.create_and_insert_store(
            slot,
            flush_stats.num_bytes_flushed.0,
            "flush_slot_cache",
        );

        // Old slots are reclaimed only when obsolete-account marking is
        // enabled and the filter is still active for this slot.
        let reclaim_method = if self.mark_obsolete_accounts == MarkObsoleteAccounts::Enabled
            && should_flush_f.is_some()
        {
            UpsertReclaim::ReclaimOldSlots
        } else {
            UpsertReclaim::IgnoreReclaims
        };

        let (store_accounts_timing_inner, store_accounts_total_inner_us) = measure_us!(self
            ._store_accounts_frozen(
                (slot, &accounts[..]),
                &flushed_store,
                reclaim_method,
                UpdateIndexThreadSelection::PoolWithThreshold,
            ));
        flush_stats.store_accounts_timing = store_accounts_timing_inner;
        flush_stats.store_accounts_total_us = Saturating(store_accounts_total_inner_us);

        assert!(self.storage.get_slot_storage_entry(slot).is_some());
        self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);
    }

    // The slot leaves the write cache only after its accounts were stored.
    assert!(self.accounts_cache.remove_slot(slot).is_some());

    // Record the flushed pubkeys under this slot in `uncleaned_pubkeys`.
    self.uncleaned_pubkeys
        .entry(slot)
        .or_default()
        .extend(accounts.into_iter().map(|(pubkey, _account)| *pubkey));

    flush_stats
}
5107
5108 fn flush_slot_cache(&self, slot: Slot) -> Option<FlushStats> {
5110 self.flush_slot_cache_with_clean(slot, None::<&mut fn(&_) -> bool>, None)
5111 }
5112
/// Flushes `slot` from the write cache, coordinating with
/// `remove_unrooted_slots` through the shared `slots_under_contention` set.
///
/// Returns `Some(stats)` when the slot was reserved and was still in the
/// cache. Returns `None` when the slot was already in
/// `slots_under_contention`, or when it was reserved but is no longer
/// cached.
///
/// # Panics
///
/// Panics if the reservation taken here has disappeared by the time it is
/// released.
fn flush_slot_cache_with_clean(
    &self,
    slot: Slot,
    should_flush_f: Option<&mut impl FnMut(&Pubkey) -> bool>,
    max_clean_root: Option<Slot>,
) -> Option<FlushStats> {
    // `insert` returns true only if the slot was not already in the set,
    // i.e. this call now holds the reservation. The lock is held only for
    // the duration of the insertion.
    if self
        .remove_unrooted_slots_synchronization
        .slots_under_contention
        .lock()
        .unwrap()
        .insert(slot)
    {
        let flush_stats = self.accounts_cache.slot_cache(slot).map(|slot_cache| {
            // Test builds pause here for `load_delay` milliseconds.
            #[cfg(test)]
            {
                sleep(Duration::from_millis(self.load_delay));
            }
            self.do_flush_slot_cache(slot, &slot_cache, should_flush_f, max_clean_root)
        });

        // Release the reservation; it must still be there.
        assert!(self
            .remove_unrooted_slots_synchronization
            .slots_under_contention
            .lock()
            .unwrap()
            .remove(&slot));

        // Wake every thread waiting on the signal (`remove_unrooted_slots`
        // waits on it).
        self.remove_unrooted_slots_synchronization
            .signal
            .notify_all();
        flush_stats
    } else {
        // The slot is already reserved elsewhere: skip it.
        None
    }
}
5163
5164 fn report_store_stats(&self) {
5165 let mut total_count = 0;
5166 let mut newest_slot = 0;
5167 let mut oldest_slot = u64::MAX;
5168 let mut total_bytes = 0;
5169 let mut total_alive_bytes = 0;
5170 for (slot, store) in self.storage.iter() {
5171 total_count += 1;
5172 newest_slot = std::cmp::max(newest_slot, slot);
5173
5174 oldest_slot = std::cmp::min(oldest_slot, slot);
5175
5176 total_alive_bytes += store.alive_bytes();
5177 total_bytes += store.capacity();
5178 }
5179 info!(
5180 "total_stores: {total_count}, newest_slot: {newest_slot}, oldest_slot: {oldest_slot}"
5181 );
5182
5183 let total_alive_ratio = if total_bytes > 0 {
5184 total_alive_bytes as f64 / total_bytes as f64
5185 } else {
5186 0.
5187 };
5188
5189 datapoint_info!(
5190 "accounts_db-stores",
5191 ("total_count", total_count, i64),
5192 ("total_bytes", total_bytes, i64),
5193 ("total_alive_bytes", total_alive_bytes, i64),
5194 ("total_alive_ratio", total_alive_ratio, f64),
5195 );
5196 }
5197
5198 pub fn calculate_accounts_lt_hash_at_startup_from_index(
5203 &self,
5204 ancestors: &Ancestors,
5205 startup_slot: Slot,
5206 ) -> AccountsLtHash {
5207 let lt_hash = self
5215 .accounts_index
5216 .account_maps
5217 .par_iter()
5218 .fold(
5219 LtHash::identity,
5220 |mut accumulator_lt_hash, accounts_index_bin| {
5221 for pubkey in accounts_index_bin.keys() {
5222 let account_lt_hash = self
5223 .accounts_index
5224 .get_with_and_then(
5225 &pubkey,
5226 Some(ancestors),
5227 Some(startup_slot),
5228 false,
5229 |(slot, account_info)| {
5230 (!account_info.is_zero_lamport()).then(|| {
5231 self.get_account_accessor(
5232 slot,
5233 &pubkey,
5234 &account_info.storage_location(),
5235 )
5236 .get_loaded_account(|loaded_account| {
5237 Self::lt_hash_account(&loaded_account, &pubkey)
5238 })
5239 .unwrap()
5242 })
5243 },
5244 )
5245 .flatten();
5246 if let Some(account_lt_hash) = account_lt_hash {
5247 accumulator_lt_hash.mix_in(&account_lt_hash.0);
5248 }
5249 }
5250 accumulator_lt_hash
5251 },
5252 )
5253 .reduce(LtHash::identity, |mut accum, elem| {
5254 accum.mix_in(&elem);
5255 accum
5256 });
5257
5258 AccountsLtHash(lt_hash)
5259 }
5260
5261 pub fn calculate_capitalization_at_startup_from_index(
5270 &self,
5271 ancestors: &Ancestors,
5272 startup_slot: Slot,
5273 ) -> u64 {
5274 self.accounts_index
5275 .account_maps
5276 .par_iter()
5277 .map(|accounts_index_bin| {
5278 accounts_index_bin
5279 .keys()
5280 .into_iter()
5281 .map(|pubkey| {
5282 self.accounts_index
5283 .get_with_and_then(
5284 &pubkey,
5285 Some(ancestors),
5286 Some(startup_slot),
5287 false,
5288 |(slot, account_info)| {
5289 (!account_info.is_zero_lamport()).then(|| {
5290 self.get_account_accessor(
5291 slot,
5292 &pubkey,
5293 &account_info.storage_location(),
5294 )
5295 .get_loaded_account(|loaded_account| {
5296 loaded_account.lamports()
5297 })
5298 .unwrap()
5301 })
5302 },
5303 )
5304 .flatten()
5305 .unwrap_or(0)
5306 })
5307 .try_fold(0, u64::checked_add)
5308 })
5309 .try_reduce(|| 0, u64::checked_add)
5310 .expect("capitalization cannot overflow")
5311 }
5312
5313 fn apply_offset_to_slot(slot: Slot, offset: i64) -> Slot {
5315 if offset > 0 {
5316 slot.saturating_add(offset as u64)
5317 } else {
5318 slot.saturating_sub(offset.unsigned_abs())
5319 }
5320 }
5321
5322 pub fn get_pubkeys_for_slot(&self, slot: Slot) -> Vec<Pubkey> {
5324 let scan_result = self.scan_cache_storage_fallback(
5325 slot,
5326 |loaded_account| Some(*loaded_account.pubkey()),
5327 |accum: &mut HashSet<Pubkey>, storage| {
5328 storage
5329 .scan_pubkeys(|pubkey| {
5330 accum.insert(*pubkey);
5331 })
5332 .expect("must scan accounts storage");
5333 },
5334 );
5335 match scan_result {
5336 ScanStorageResult::Cached(cached_result) => cached_result,
5337 ScanStorageResult::Stored(stored_result) => stored_result.into_iter().collect(),
5338 }
5339 }
5340
5341 pub fn get_pubkey_account_for_slot(&self, slot: Slot) -> Vec<(Pubkey, AccountSharedData)> {
5343 let scan_result = self.scan_account_storage(
5344 slot,
5345 |loaded_account| {
5346 Some((*loaded_account.pubkey(), loaded_account.take_account()))
5348 },
5349 |accum: &mut HashMap<_, _>, stored_account, data| {
5350 let data = data.unwrap();
5353 let loaded_account =
5354 LoadedAccount::Stored(StoredAccountInfo::new_from(stored_account, data));
5355 accum.insert(*loaded_account.pubkey(), loaded_account.take_account());
5357 },
5358 ScanAccountStorageData::DataRefForStorage,
5359 );
5360
5361 match scan_result {
5362 ScanStorageResult::Cached(cached_result) => cached_result,
5363 ScanStorageResult::Stored(stored_result) => stored_result.into_iter().collect(),
5364 }
5365 }
5366
5367 fn update_index<'a>(
5371 &self,
5372 infos: Vec<AccountInfo>,
5373 accounts: &impl StorableAccounts<'a>,
5374 reclaim: UpsertReclaim,
5375 update_index_thread_selection: UpdateIndexThreadSelection,
5376 thread_pool: &ThreadPool,
5377 ) -> Vec<ReclaimsSlotList<AccountInfo>> {
5378 let target_slot = accounts.target_slot();
5379 let len = std::cmp::min(accounts.len(), infos.len());
5380
5381 if reclaim == UpsertReclaim::ReclaimOldSlots {
5386 assert!(target_slot <= self.accounts_index.max_root_inclusive());
5387 }
5388
5389 let update = |start, end| {
5390 let mut reclaims = ReclaimsSlotList::with_capacity((end - start) / 2);
5391
5392 (start..end).for_each(|i| {
5393 let info = infos[i];
5394 accounts.account(i, |account| {
5395 let old_slot = accounts.slot(i);
5396 self.accounts_index.upsert(
5397 target_slot,
5398 old_slot,
5399 account.pubkey(),
5400 &account,
5401 &self.account_indexes,
5402 info,
5403 &mut reclaims,
5404 reclaim,
5405 );
5406 });
5407 });
5408 reclaims
5409 };
5410
5411 let threshold = 1;
5412 if matches!(
5413 update_index_thread_selection,
5414 UpdateIndexThreadSelection::PoolWithThreshold,
5415 ) && len > threshold
5416 {
5417 let chunk_size = std::cmp::max(1, len / quarter_thread_count()); let batches = 1 + len / chunk_size;
5419 thread_pool.install(|| {
5420 (0..batches)
5421 .into_par_iter()
5422 .map(|batch| {
5423 let start = batch * chunk_size;
5424 let end = std::cmp::min(start + chunk_size, len);
5425 update(start, end)
5426 })
5427 .filter(|reclaims| !reclaims.is_empty())
5428 .collect()
5429 })
5430 } else {
5431 let reclaims = update(0, len);
5432 if reclaims.is_empty() {
5433 vec![]
5435 } else {
5436 vec![reclaims]
5437 }
5438 }
5439 }
5440
/// Returns `true` when shrinking cannot free anything, i.e. when `alive_bytes` is at least
/// `total_bytes`.
fn should_not_shrink(alive_bytes: u64, total_bytes: u64) -> bool {
    total_bytes <= alive_bytes
}
5444
5445 fn is_shrinking_productive(store: &AccountStorageEntry) -> bool {
5446 let alive_count = store.count();
5447 let total_bytes = store.capacity();
5448 let alive_bytes = store.alive_bytes_exclude_zero_lamport_single_ref_accounts() as u64;
5449 if Self::should_not_shrink(alive_bytes, total_bytes) {
5450 trace!(
5451 "shrink_slot_forced ({}): not able to shrink at all: num alive: {}, bytes alive: \
5452 {}, bytes total: {}, bytes saved: {}",
5453 store.slot(),
5454 alive_count,
5455 alive_bytes,
5456 total_bytes,
5457 total_bytes.saturating_sub(alive_bytes),
5458 );
5459 return false;
5460 }
5461
5462 true
5463 }
5464
5465 pub(crate) fn is_candidate_for_shrink(&self, store: &AccountStorageEntry) -> bool {
5468 let total_bytes = store.capacity();
5471
5472 let alive_bytes = store.alive_bytes_exclude_zero_lamport_single_ref_accounts() as u64;
5473 match self.shrink_ratio {
5474 AccountShrinkThreshold::TotalSpace { shrink_ratio: _ } => alive_bytes < total_bytes,
5475 AccountShrinkThreshold::IndividualStore { shrink_ratio } => {
5476 (alive_bytes as f64 / total_bytes as f64) < shrink_ratio
5477 }
5478 }
5479 }
5480
5481 fn remove_dead_accounts<'a, I>(
5483 &'a self,
5484 reclaims: I,
5485 expected_slot: Option<Slot>,
5486 mark_accounts_obsolete: MarkAccountsObsolete,
5487 ) -> (IntSet<Slot>, SlotOffsets)
5488 where
5489 I: Iterator<Item = &'a (Slot, AccountInfo)>,
5490 {
5491 let mut reclaimed_offsets = SlotOffsets::default();
5492
5493 assert!(self.storage.no_shrink_in_progress());
5494
5495 let mut dead_slots = IntSet::default();
5496 let mut new_shrink_candidates = ShrinkCandidates::default();
5497 let mut measure = Measure::start("remove");
5498 for (slot, account_info) in reclaims {
5499 assert!(!account_info.is_cached());
5501 reclaimed_offsets
5502 .entry(*slot)
5503 .or_default()
5504 .insert(account_info.offset());
5505 }
5506 if let Some(expected_slot) = expected_slot {
5507 assert_eq!(reclaimed_offsets.len(), 1);
5508 assert!(reclaimed_offsets.contains_key(&expected_slot));
5509 }
5510
5511 self.clean_accounts_stats
5512 .slots_cleaned
5513 .fetch_add(reclaimed_offsets.len() as u64, Ordering::Relaxed);
5514
5515 reclaimed_offsets.iter().for_each(|(slot, offsets)| {
5516 if let Some(store) = self.storage.get_slot_storage_entry(*slot) {
5517 assert_eq!(
5518 *slot,
5519 store.slot(),
5520 "AccountsDB::accounts_index corrupted. Storage pointed to: {}, expected: {}, \
5521 should only point to one slot",
5522 store.slot(),
5523 *slot
5524 );
5525
5526 let remaining_accounts = if offsets.len() == store.count() {
5527 store.remove_accounts(store.alive_bytes(), offsets.len())
5529 } else {
5530 let (remaining_accounts, us) = measure_us!({
5532 let mut offsets = offsets.iter().cloned().collect::<Vec<_>>();
5533 offsets.sort_unstable();
5535 let data_lens = store.accounts.get_account_data_lens(&offsets);
5536 let dead_bytes = data_lens
5537 .iter()
5538 .map(|len| store.accounts.calculate_stored_size(*len))
5539 .sum();
5540 let remaining_accounts = store.remove_accounts(dead_bytes, offsets.len());
5541
5542 if let MarkAccountsObsolete::Yes(slot_marked_obsolete) =
5543 mark_accounts_obsolete
5544 {
5545 store
5546 .obsolete_accounts
5547 .write()
5548 .unwrap()
5549 .mark_accounts_obsolete(
5550 offsets.into_iter().zip(data_lens),
5551 slot_marked_obsolete,
5552 );
5553 }
5554 remaining_accounts
5555 });
5556 self.clean_accounts_stats
5557 .get_account_sizes_us
5558 .fetch_add(us, Ordering::Relaxed);
5559 remaining_accounts
5560 };
5561
5562 if remaining_accounts == 0 {
5566 self.dirty_stores.insert(*slot, store);
5567 dead_slots.insert(*slot);
5568 } else if Self::is_shrinking_productive(&store)
5569 && self.is_candidate_for_shrink(&store)
5570 {
5571 new_shrink_candidates.insert(*slot);
5576 };
5577 }
5578 });
5579 measure.stop();
5580 self.clean_accounts_stats
5581 .remove_dead_accounts_remove_us
5582 .fetch_add(measure.as_us(), Ordering::Relaxed);
5583
5584 let mut measure = Measure::start("shrink");
5585 let mut shrink_candidate_slots = self.shrink_candidate_slots.lock().unwrap();
5586 for slot in new_shrink_candidates {
5587 shrink_candidate_slots.insert(slot);
5588 }
5589 drop(shrink_candidate_slots);
5590 measure.stop();
5591 self.clean_accounts_stats
5592 .remove_dead_accounts_shrink_us
5593 .fetch_add(measure.as_us(), Ordering::Relaxed);
5594
5595 dead_slots.retain(|slot| {
5596 if let Some(slot_store) = self.storage.get_slot_storage_entry(*slot) {
5597 if slot_store.count() != 0 {
5598 return false;
5599 }
5600 }
5601 true
5602 });
5603
5604 (dead_slots, reclaimed_offsets)
5605 }
5606
5607 fn remove_dead_slots_metadata<'a>(&'a self, dead_slots_iter: impl Iterator<Item = &'a Slot>) {
5608 let mut measure = Measure::start("remove_dead_slots_metadata-ms");
5609 self.clean_dead_slots_from_accounts_index(dead_slots_iter);
5610 measure.stop();
5611 inc_new_counter_info!("remove_dead_slots_metadata-ms", measure.as_ms() as usize);
5612 }
5613
5614 fn unref_pubkeys<'a>(
5617 &'a self,
5618 pubkeys: impl Iterator<Item = &'a Pubkey> + Clone + Send + Sync,
5619 num_pubkeys: usize,
5620 pubkeys_removed_from_accounts_index: &'a PubkeysRemovedFromAccountsIndex,
5621 ) {
5622 let batches = 1 + (num_pubkeys / UNREF_ACCOUNTS_BATCH_SIZE);
5623 self.thread_pool_background.install(|| {
5624 (0..batches).into_par_iter().for_each(|batch| {
5625 let skip = batch * UNREF_ACCOUNTS_BATCH_SIZE;
5626 self.accounts_index.scan(
5627 pubkeys
5628 .clone()
5629 .skip(skip)
5630 .take(UNREF_ACCOUNTS_BATCH_SIZE)
5631 .filter(|pubkey| {
5632 let already_removed =
5634 pubkeys_removed_from_accounts_index.contains(pubkey);
5635 !already_removed
5636 }),
5637 |_pubkey, slots_refs| {
5638 if let Some((slot_list, ref_count)) = slots_refs {
5639 if slot_list.len() == 1 && ref_count == 2 {
5641 if let Some((slot_alive, acct_info)) = slot_list.first() {
5642 if acct_info.is_zero_lamport() && !acct_info.is_cached() {
5643 self.zero_lamport_single_ref_found(
5644 *slot_alive,
5645 acct_info.offset(),
5646 );
5647 }
5648 }
5649 }
5650 }
5651 AccountsIndexScanResult::Unref
5652 },
5653 None,
5654 ScanFilter::All,
5655 )
5656 });
5657 });
5658 }
5659
5660 fn unref_accounts(
5665 &self,
5666 purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
5667 purged_stored_account_slots: &mut AccountSlots,
5668 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
5669 ) {
5670 self.unref_pubkeys(
5671 purged_slot_pubkeys.iter().map(|(_slot, pubkey)| pubkey),
5672 purged_slot_pubkeys.len(),
5673 pubkeys_removed_from_accounts_index,
5674 );
5675 for (slot, pubkey) in purged_slot_pubkeys {
5676 purged_stored_account_slots
5677 .entry(pubkey)
5678 .or_default()
5679 .insert(slot);
5680 }
5681 }
5682
5683 fn clean_dead_slots_from_accounts_index<'a>(
5684 &'a self,
5685 dead_slots_iter: impl Iterator<Item = &'a Slot>,
5686 ) {
5687 let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
5688 let mut measure = Measure::start("clean_dead_slot");
5689 let mut rooted_cleaned_count = 0;
5690 let mut unrooted_cleaned_count = 0;
5691 let dead_slots: Vec<_> = dead_slots_iter
5692 .map(|slot| {
5693 if self.accounts_index.clean_dead_slot(*slot) {
5694 rooted_cleaned_count += 1;
5695 } else {
5696 unrooted_cleaned_count += 1;
5697 }
5698 *slot
5699 })
5700 .collect();
5701 measure.stop();
5702 accounts_index_root_stats.clean_dead_slot_us += measure.as_us();
5703 if self.log_dead_slots.load(Ordering::Relaxed) {
5704 info!(
5705 "remove_dead_slots_metadata: {} dead slots",
5706 dead_slots.len()
5707 );
5708 trace!("remove_dead_slots_metadata: dead_slots: {dead_slots:?}");
5709 }
5710 self.accounts_index
5711 .update_roots_stats(&mut accounts_index_root_stats);
5712 accounts_index_root_stats.rooted_cleaned_count += rooted_cleaned_count;
5713 accounts_index_root_stats.unrooted_cleaned_count += unrooted_cleaned_count;
5714
5715 self.clean_accounts_stats
5716 .latest_accounts_index_roots_stats
5717 .update(&accounts_index_root_stats);
5718 }
5719
5720 fn clean_stored_dead_slots(
5723 &self,
5724 dead_slots: &IntSet<Slot>,
5725 purged_account_slots: Option<&mut AccountSlots>,
5726 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
5727 ) {
5728 let mut measure = Measure::start("clean_stored_dead_slots-ms");
5729 let mut stores = vec![];
5730 for slot in dead_slots.iter() {
5732 if let Some(slot_storage) = self.storage.get_slot_storage_entry(*slot) {
5733 stores.push(slot_storage);
5734 }
5735 }
5736 let purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = {
5738 self.thread_pool_background.install(|| {
5739 stores
5740 .into_par_iter()
5741 .map(|store| {
5742 let slot = store.slot();
5743 let mut pubkeys = Vec::with_capacity(store.count());
5744 let obsolete_accounts: HashSet<_> = store
5747 .obsolete_accounts_read_lock()
5748 .filter_obsolete_accounts(None)
5749 .collect();
5750 store
5751 .accounts
5752 .scan_accounts_without_data(|offset, account| {
5753 if !obsolete_accounts.contains(&(offset, account.data_len)) {
5754 pubkeys.push((slot, *account.pubkey));
5755 }
5756 })
5757 .expect("must scan accounts storage");
5758 pubkeys
5759 })
5760 .flatten()
5761 .collect::<HashSet<_>>()
5762 })
5763 };
5764
5765 let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
5767 let mut measure_unref = Measure::start("unref_from_storage");
5768
5769 if let Some(purged_account_slots) = purged_account_slots {
5770 self.unref_accounts(
5771 purged_slot_pubkeys,
5772 purged_account_slots,
5773 pubkeys_removed_from_accounts_index,
5774 );
5775 }
5776 measure_unref.stop();
5777 accounts_index_root_stats.clean_unref_from_storage_us += measure_unref.as_us();
5778
5779 self.clean_accounts_stats
5780 .latest_accounts_index_roots_stats
5781 .update(&accounts_index_root_stats);
5782
5783 measure.stop();
5784 self.clean_accounts_stats
5785 .clean_stored_dead_slots_us
5786 .fetch_add(measure.as_us(), Ordering::Relaxed);
5787 }
5788
5789 pub(crate) fn store_accounts_unfrozen<'a>(
5792 &self,
5793 accounts: impl StorableAccounts<'a>,
5794 transactions: Option<&'a [&'a SanitizedTransaction]>,
5795 update_index_thread_selection: UpdateIndexThreadSelection,
5796 ) {
5797 if accounts.is_empty() {
5800 return;
5801 }
5802
5803 let mut total_data = 0;
5804 (0..accounts.len()).for_each(|index| {
5805 total_data += accounts.data_len(index);
5806 });
5807
5808 self.stats
5809 .store_total_data
5810 .fetch_add(total_data as u64, Ordering::Relaxed);
5811
5812 let mut store_accounts_time = Measure::start("store_accounts");
5814 let infos = self.write_accounts_to_cache(accounts.target_slot(), &accounts, transactions);
5815 store_accounts_time.stop();
5816 self.stats
5817 .store_accounts
5818 .fetch_add(store_accounts_time.as_us(), Ordering::Relaxed);
5819
5820 let mut update_index_time = Measure::start("update_index");
5822
5823 self.update_index(
5824 infos,
5825 &accounts,
5826 UpsertReclaim::PreviousSlotEntryWasCached,
5827 update_index_thread_selection,
5828 &self.thread_pool_foreground,
5829 );
5830
5831 update_index_time.stop();
5832 self.stats
5833 .store_update_index
5834 .fetch_add(update_index_time.as_us(), Ordering::Relaxed);
5835 self.stats
5836 .store_num_accounts
5837 .fetch_add(accounts.len() as u64, Ordering::Relaxed);
5838 self.report_store_timings();
5839 }
5840
5841 pub fn store_accounts_frozen<'a>(
5847 &self,
5848 accounts: impl StorableAccounts<'a>,
5849 storage: &Arc<AccountStorageEntry>,
5850 update_index_thread_selection: UpdateIndexThreadSelection,
5851 ) -> StoreAccountsTiming {
5852 self._store_accounts_frozen(
5853 accounts,
5854 storage,
5855 UpsertReclaim::IgnoreReclaims,
5856 update_index_thread_selection,
5857 )
5858 }
5859
5860 fn _store_accounts_frozen<'a>(
5864 &self,
5865 accounts: impl StorableAccounts<'a>,
5866 storage: &Arc<AccountStorageEntry>,
5867 reclaim_handling: UpsertReclaim,
5868 update_index_thread_selection: UpdateIndexThreadSelection,
5869 ) -> StoreAccountsTiming {
5870 let slot = accounts.target_slot();
5871 let mut store_accounts_time = Measure::start("store_accounts");
5872
5873 if self.read_only_accounts_cache.can_slot_be_in_cache(slot) {
5875 (0..accounts.len()).for_each(|index| {
5876 self.read_only_accounts_cache
5879 .remove_assume_not_present(accounts.pubkey(index));
5880 });
5881 }
5882
5883 let infos = self.write_accounts_to_storage(slot, storage, &accounts);
5885 store_accounts_time.stop();
5886 self.stats
5887 .store_accounts
5888 .fetch_add(store_accounts_time.as_us(), Ordering::Relaxed);
5889
5890 self.mark_zero_lamport_single_ref_accounts(&infos, storage, reclaim_handling);
5891
5892 let mut update_index_time = Measure::start("update_index");
5893
5894 let reclaims = self.update_index(
5899 infos,
5900 &accounts,
5901 reclaim_handling,
5902 update_index_thread_selection,
5903 &self.thread_pool_background,
5904 );
5905
5906 update_index_time.stop();
5907 self.stats
5908 .store_update_index
5909 .fetch_add(update_index_time.as_us(), Ordering::Relaxed);
5910 self.stats
5911 .store_num_accounts
5912 .fetch_add(accounts.len() as u64, Ordering::Relaxed);
5913
5914 let mut handle_reclaims_elapsed = 0;
5917
5918 if !reclaims.is_empty() {
5922 let reclaims_len = reclaims.iter().map(|r| r.len()).sum::<usize>();
5923 self.stats
5924 .num_reclaims
5925 .fetch_add(reclaims_len as u64, Ordering::Relaxed);
5926 let purge_stats = PurgeStats::default();
5927 let mut handle_reclaims_time = Measure::start("handle_reclaims");
5928 self.handle_reclaims(
5929 reclaims.iter().flatten(),
5930 None,
5931 &HashSet::default(),
5932 HandleReclaims::ProcessDeadSlots(&purge_stats),
5933 MarkAccountsObsolete::Yes(slot),
5934 );
5935 handle_reclaims_time.stop();
5936 handle_reclaims_elapsed = handle_reclaims_time.as_us();
5937 self.stats.num_obsolete_slots_removed.fetch_add(
5938 purge_stats.num_stored_slots_removed.load(Ordering::Relaxed),
5939 Ordering::Relaxed,
5940 );
5941 self.stats.num_obsolete_bytes_removed.fetch_add(
5942 purge_stats
5943 .total_removed_stored_bytes
5944 .load(Ordering::Relaxed),
5945 Ordering::Relaxed,
5946 );
5947 self.stats
5948 .store_handle_reclaims
5949 .fetch_add(handle_reclaims_elapsed, Ordering::Relaxed);
5950 }
5951
5952 StoreAccountsTiming {
5953 store_accounts_elapsed: store_accounts_time.as_us(),
5954 update_index_elapsed: update_index_time.as_us(),
5955 handle_reclaims_elapsed,
5956 }
5957 }
5958
5959 fn write_accounts_to_cache<'a, 'b>(
5960 &self,
5961 slot: Slot,
5962 accounts_and_meta_to_store: &impl StorableAccounts<'b>,
5963 txs: Option<&[&SanitizedTransaction]>,
5964 ) -> Vec<AccountInfo> {
5965 let mut current_write_version = if self.accounts_update_notifier.is_some() {
5966 self.write_version
5967 .fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel)
5968 } else {
5969 0
5970 };
5971
5972 (0..accounts_and_meta_to_store.len())
5973 .map(|index| {
5974 let txn = txs.map(|txs| *txs.get(index).expect("txs must be present if provided"));
5975 accounts_and_meta_to_store.account_default_if_zero_lamport(index, |account| {
5976 let account_shared_data = account.take_account();
5977 let pubkey = account.pubkey();
5978 let account_info =
5979 AccountInfo::new(StorageLocation::Cached, account.is_zero_lamport());
5980
5981 self.notify_account_at_accounts_update(
5982 slot,
5983 &account_shared_data,
5984 &txn,
5985 pubkey,
5986 current_write_version,
5987 );
5988 current_write_version = current_write_version.saturating_add(1);
5989
5990 self.accounts_cache.store(slot, pubkey, account_shared_data);
5991 account_info
5992 })
5993 })
5994 .collect()
5995 }
5996
5997 fn write_accounts_to_storage<'a>(
5998 &self,
5999 slot: Slot,
6000 storage: &AccountStorageEntry,
6001 accounts_and_meta_to_store: &impl StorableAccounts<'a>,
6002 ) -> Vec<AccountInfo> {
6003 let mut infos: Vec<AccountInfo> = Vec::with_capacity(accounts_and_meta_to_store.len());
6004 let mut total_append_accounts_us = 0;
6005 while infos.len() < accounts_and_meta_to_store.len() {
6006 let mut append_accounts = Measure::start("append_accounts");
6007 let stored_accounts_info = storage
6008 .accounts
6009 .write_accounts(accounts_and_meta_to_store, infos.len());
6010 append_accounts.stop();
6011 total_append_accounts_us += append_accounts.as_us();
6012 let Some(stored_accounts_info) = stored_accounts_info else {
6013 let data_len = accounts_and_meta_to_store.data_len(infos.len());
6015 let data_len = (data_len + STORE_META_OVERHEAD) as u64;
6016 if data_len > storage.accounts.remaining_bytes() {
6017 info!(
6018 "write_accounts_to_storage, no space: {}, {}, {}, {}, {}",
6019 storage.accounts.capacity(),
6020 storage.accounts.remaining_bytes(),
6021 data_len,
6022 infos.len(),
6023 accounts_and_meta_to_store.len()
6024 );
6025 let special_store_size = std::cmp::max(data_len * 2, self.file_size);
6026 self.create_and_insert_store(slot, special_store_size, "large create");
6027 }
6028 continue;
6029 };
6030
6031 let store_id = storage.id();
6032 for (i, offset) in stored_accounts_info.offsets.iter().enumerate() {
6033 infos.push(AccountInfo::new(
6034 StorageLocation::AppendVec(store_id, *offset),
6035 accounts_and_meta_to_store.is_zero_lamport(i),
6036 ));
6037 }
6038 storage.add_accounts(
6039 stored_accounts_info.offsets.len(),
6040 stored_accounts_info.size,
6041 );
6042 }
6043
6044 self.stats
6045 .store_append_accounts
6046 .fetch_add(total_append_accounts_us, Ordering::Relaxed);
6047
6048 infos
6049 }
6050
6051 fn mark_zero_lamport_single_ref_accounts(
6053 &self,
6054 account_infos: &[AccountInfo],
6055 storage: &AccountStorageEntry,
6056 reclaim_handling: UpsertReclaim,
6057 ) {
6058 if reclaim_handling == UpsertReclaim::ReclaimOldSlots {
6064 let mut add_zero_lamport_accounts = Measure::start("add_zero_lamport_accounts");
6065 let mut num_zero_lamport_accounts_added = 0;
6066
6067 for account_info in account_infos {
6068 if account_info.is_zero_lamport() {
6069 storage.insert_zero_lamport_single_ref_account_offset(account_info.offset());
6070 num_zero_lamport_accounts_added += 1;
6071 }
6072 }
6073
6074 if num_zero_lamport_accounts_added > 0
6076 && self.is_candidate_for_shrink(storage)
6077 && Self::is_shrinking_productive(storage)
6078 {
6079 self.shrink_candidate_slots
6080 .lock()
6081 .unwrap()
6082 .insert(storage.slot);
6083 }
6084
6085 add_zero_lamport_accounts.stop();
6086 self.stats
6087 .add_zero_lamport_accounts_us
6088 .fetch_add(add_zero_lamport_accounts.as_us(), Ordering::Relaxed);
6089 self.stats
6090 .num_zero_lamport_accounts_added
6091 .fetch_add(num_zero_lamport_accounts_added, Ordering::Relaxed);
6092 }
6093 }
6094
6095 fn report_store_timings(&self) {
6096 if self.stats.last_store_report.should_update(1000) {
6097 let read_cache_stats = self.read_only_accounts_cache.get_and_reset_stats();
6098 datapoint_info!(
6099 "accounts_db_store_timings",
6100 (
6101 "store_accounts",
6102 self.stats.store_accounts.swap(0, Ordering::Relaxed),
6103 i64
6104 ),
6105 (
6106 "update_index",
6107 self.stats.store_update_index.swap(0, Ordering::Relaxed),
6108 i64
6109 ),
6110 (
6111 "handle_reclaims",
6112 self.stats.store_handle_reclaims.swap(0, Ordering::Relaxed),
6113 i64
6114 ),
6115 (
6116 "append_accounts",
6117 self.stats.store_append_accounts.swap(0, Ordering::Relaxed),
6118 i64
6119 ),
6120 (
6121 "stakes_cache_check_and_store_us",
6122 self.stats
6123 .stakes_cache_check_and_store_us
6124 .swap(0, Ordering::Relaxed),
6125 i64
6126 ),
6127 (
6128 "num_accounts",
6129 self.stats.store_num_accounts.swap(0, Ordering::Relaxed),
6130 i64
6131 ),
6132 (
6133 "total_data",
6134 self.stats.store_total_data.swap(0, Ordering::Relaxed),
6135 i64
6136 ),
6137 (
6138 "num_reclaims",
6139 self.stats.num_reclaims.swap(0, Ordering::Relaxed),
6140 i64
6141 ),
6142 (
6143 "read_only_accounts_cache_entries",
6144 self.read_only_accounts_cache.cache_len(),
6145 i64
6146 ),
6147 (
6148 "read_only_accounts_cache_data_size",
6149 self.read_only_accounts_cache.data_size(),
6150 i64
6151 ),
6152 ("read_only_accounts_cache_hits", read_cache_stats.hits, i64),
6153 (
6154 "read_only_accounts_cache_misses",
6155 read_cache_stats.misses,
6156 i64
6157 ),
6158 (
6159 "read_only_accounts_cache_evicts",
6160 read_cache_stats.evicts,
6161 i64
6162 ),
6163 (
6164 "read_only_accounts_cache_load_us",
6165 read_cache_stats.load_us,
6166 i64
6167 ),
6168 (
6169 "read_only_accounts_cache_store_us",
6170 read_cache_stats.store_us,
6171 i64
6172 ),
6173 (
6174 "read_only_accounts_cache_evict_us",
6175 read_cache_stats.evict_us,
6176 i64
6177 ),
6178 (
6179 "read_only_accounts_cache_evictor_wakeup_count_all",
6180 read_cache_stats.evictor_wakeup_count_all,
6181 i64
6182 ),
6183 (
6184 "read_only_accounts_cache_evictor_wakeup_count_productive",
6185 read_cache_stats.evictor_wakeup_count_productive,
6186 i64
6187 ),
6188 (
6189 "handle_dead_keys_us",
6190 self.stats.handle_dead_keys_us.swap(0, Ordering::Relaxed),
6191 i64
6192 ),
6193 (
6194 "purge_exact_us",
6195 self.stats.purge_exact_us.swap(0, Ordering::Relaxed),
6196 i64
6197 ),
6198 (
6199 "purge_exact_count",
6200 self.stats.purge_exact_count.swap(0, Ordering::Relaxed),
6201 i64
6202 ),
6203 (
6204 "num_obsolete_slots_removed",
6205 self.stats
6206 .num_obsolete_slots_removed
6207 .swap(0, Ordering::Relaxed),
6208 i64
6209 ),
6210 (
6211 "num_obsolete_bytes_removed",
6212 self.stats
6213 .num_obsolete_bytes_removed
6214 .swap(0, Ordering::Relaxed),
6215 i64
6216 ),
6217 (
6218 "add_zero_lamport_accounts_us",
6219 self.stats
6220 .add_zero_lamport_accounts_us
6221 .swap(0, Ordering::Relaxed),
6222 i64
6223 ),
6224 (
6225 "num_zero_lamport_accounts_added",
6226 self.stats
6227 .num_zero_lamport_accounts_added
6228 .swap(0, Ordering::Relaxed),
6229 i64
6230 ),
6231 );
6232
6233 datapoint_info!(
6234 "accounts_db_store_timings2",
6235 (
6236 "create_store_count",
6237 self.stats.create_store_count.swap(0, Ordering::Relaxed),
6238 i64
6239 ),
6240 (
6241 "dropped_stores",
6242 self.stats.dropped_stores.swap(0, Ordering::Relaxed),
6243 i64
6244 ),
6245 );
6246 }
6247 }
6248
6249 pub fn add_root(&self, slot: Slot) -> AccountsAddRootTiming {
6250 let mut index_time = Measure::start("index_add_root");
6251 self.accounts_index.add_root(slot);
6252 index_time.stop();
6253 let mut cache_time = Measure::start("cache_add_root");
6254 self.accounts_cache.add_root(slot);
6255 cache_time.stop();
6256
6257 AccountsAddRootTiming {
6258 index_us: index_time.as_us(),
6259 cache_us: cache_time.as_us(),
6260 }
6261 }
6262
6263 pub fn get_storages(
6265 &self,
6266 requested_slots: impl RangeBounds<Slot> + Sync,
6267 ) -> (Vec<Arc<AccountStorageEntry>>, Vec<Slot>) {
6268 let start = Instant::now();
6269 let (slots, storages) = self
6270 .storage
6271 .get_if(|slot, storage| requested_slots.contains(slot) && storage.has_accounts())
6272 .into_vec()
6273 .into_iter()
6274 .unzip();
6275 let duration = start.elapsed();
6276 debug!("get_snapshot_storages: {duration:?}");
6277 (storages, slots)
6278 }
6279
6280 pub fn latest_full_snapshot_slot(&self) -> Option<Slot> {
6282 self.latest_full_snapshot_slot.read()
6283 }
6284
6285 pub fn set_latest_full_snapshot_slot(&self, slot: Slot) {
6287 *self.latest_full_snapshot_slot.lock_write() = Some(slot);
6288 }
6289
6290 fn generate_index_for_slot<'a>(
6291 &self,
6292 reader: &mut impl RequiredLenBufFileRead<'a>,
6293 storage: &'a AccountStorageEntry,
6294 slot: Slot,
6295 store_id: AccountsFileId,
6296 storage_info: &StorageSizeAndCountMap,
6297 ) -> SlotIndexGenerationInfo {
6298 if storage.accounts.get_account_data_lens(&[0]).is_empty() {
6299 return SlotIndexGenerationInfo::default();
6300 }
6301
6302 let mut accounts_data_len = 0;
6303 let mut stored_size_alive = 0;
6304 let mut zero_lamport_pubkeys = vec![];
6305 let mut zero_lamport_offsets = vec![];
6306 let mut all_accounts_are_zero_lamports = true;
6307 let mut slot_lt_hash = SlotLtHash::default();
6308 let mut keyed_account_infos = vec![];
6309
6310 let geyser_notifier = self
6311 .accounts_update_notifier
6312 .as_ref()
6313 .filter(|notifier| notifier.snapshot_notifications_enabled());
6314
6315 let mut write_version_for_geyser = 0;
6325
6326 let obsolete_accounts: IntSet<_> = storage
6331 .obsolete_accounts_read_lock()
6332 .filter_obsolete_accounts(None)
6333 .map(|(offset, _)| offset)
6334 .collect();
6335 let mut num_obsolete_accounts_skipped = 0;
6336
6337 storage
6338 .accounts
6339 .scan_accounts(reader, |offset, account| {
6340 if obsolete_accounts.contains(&offset) {
6341 num_obsolete_accounts_skipped += 1;
6342 return;
6343 }
6344
6345 let data_len = account.data.len();
6346 stored_size_alive += storage.accounts.calculate_stored_size(data_len);
6347 let is_account_zero_lamport = account.is_zero_lamport();
6348 if !is_account_zero_lamport {
6349 accounts_data_len += data_len as u64;
6350 all_accounts_are_zero_lamports = false;
6351 } else {
6352 if self.mark_obsolete_accounts == MarkObsoleteAccounts::Enabled {
6356 zero_lamport_offsets.push(offset);
6357 }
6358 zero_lamport_pubkeys.push(*account.pubkey);
6359 }
6360 keyed_account_infos.push((
6361 *account.pubkey,
6362 AccountInfo::new(
6363 StorageLocation::AppendVec(store_id, offset), is_account_zero_lamport,
6365 ),
6366 ));
6367
6368 if !self.account_indexes.is_empty() {
6369 self.accounts_index.update_secondary_indexes(
6370 account.pubkey,
6371 &account,
6372 &self.account_indexes,
6373 );
6374 }
6375
6376 let account_lt_hash = Self::lt_hash_account(&account, account.pubkey());
6377 slot_lt_hash.0.mix_in(&account_lt_hash.0);
6378
6379 if let Some(geyser_notifier) = geyser_notifier {
6380 debug_assert!(geyser_notifier.snapshot_notifications_enabled());
6381 let account_for_geyser = AccountForGeyser {
6382 pubkey: account.pubkey(),
6383 lamports: account.lamports(),
6384 owner: account.owner(),
6385 executable: account.executable(),
6386 rent_epoch: account.rent_epoch(),
6387 data: account.data(),
6388 };
6389 geyser_notifier.notify_account_restore_from_snapshot(
6390 slot,
6391 write_version_for_geyser,
6392 &account_for_geyser,
6393 );
6394 write_version_for_geyser += 1;
6395 }
6396 })
6397 .expect("must scan accounts storage");
6398
6399 let (insert_time_us, insert_info) = self
6400 .accounts_index
6401 .insert_new_if_missing_into_primary_index(slot, keyed_account_infos);
6402
6403 {
6404 let mut info = storage_info.entry(store_id).or_default();
6406 info.stored_size += stored_size_alive;
6407 info.count += insert_info.count;
6408
6409 assert!(
6412 info.stored_size <= u64_align!(storage.accounts.len()),
6413 "Stored size ({}) is larger than the size of the accounts file ({}) for store_id: \
6414 {}",
6415 info.stored_size,
6416 storage.accounts.len(),
6417 store_id
6418 );
6419 }
6420 if !zero_lamport_pubkeys.is_empty() {
6424 let old = self
6425 .uncleaned_pubkeys
6426 .insert(slot, zero_lamport_pubkeys.clone());
6427 assert!(old.is_none());
6428 }
6429
6430 if self.mark_obsolete_accounts == MarkObsoleteAccounts::Enabled {
6435 storage.batch_insert_zero_lamport_single_ref_account_offsets(&zero_lamport_offsets);
6436 zero_lamport_pubkeys = Vec::new();
6437 }
6438 SlotIndexGenerationInfo {
6439 insert_time_us,
6440 num_accounts: insert_info.count as u64,
6441 accounts_data_len,
6442 zero_lamport_pubkeys,
6443 all_accounts_are_zero_lamports,
6444 num_did_not_exist: insert_info.num_did_not_exist,
6445 num_existed_in_mem: insert_info.num_existed_in_mem,
6446 num_existed_on_disk: insert_info.num_existed_on_disk,
6447 slot_lt_hash,
6448 num_obsolete_accounts_skipped,
6449 }
6450 }
6451
6452 pub fn generate_index(
6453 &self,
6454 limit_load_slot_count_from_snapshot: Option<usize>,
6455 verify: bool,
6456 ) -> IndexGenerationInfo {
6457 let mut total_time = Measure::start("generate_index");
6458
6459 let mut storages = self.storage.all_storages();
6460 storages.sort_unstable_by_key(|storage| storage.slot);
6461 if let Some(limit) = limit_load_slot_count_from_snapshot {
6462 storages.truncate(limit); }
6464 let num_storages = storages.len();
6465
6466 self.accounts_index
6467 .set_startup(Startup::StartupWithExtraThreads);
6468 let storage_info = StorageSizeAndCountMap::default();
6469
6470 #[derive(Debug)]
6472 struct IndexGenerationAccumulator {
6473 insert_us: u64,
6474 num_accounts: u64,
6475 accounts_data_len: u64,
6476 zero_lamport_pubkeys: Vec<Pubkey>,
6477 all_accounts_are_zero_lamports_slots: u64,
6478 all_zeros_slots: Vec<(Slot, Arc<AccountStorageEntry>)>,
6479 num_did_not_exist: u64,
6480 num_existed_in_mem: u64,
6481 num_existed_on_disk: u64,
6482 lt_hash: LtHash,
6483 num_obsolete_accounts_skipped: u64,
6484 }
6485 impl IndexGenerationAccumulator {
6486 const fn new() -> Self {
6487 Self {
6488 insert_us: 0,
6489 num_accounts: 0,
6490 accounts_data_len: 0,
6491 zero_lamport_pubkeys: Vec::new(),
6492 all_accounts_are_zero_lamports_slots: 0,
6493 all_zeros_slots: Vec::new(),
6494 num_did_not_exist: 0,
6495 num_existed_in_mem: 0,
6496 num_existed_on_disk: 0,
6497 lt_hash: LtHash::identity(),
6498 num_obsolete_accounts_skipped: 0,
6499 }
6500 }
6501 fn accumulate(&mut self, other: Self) {
6502 self.insert_us += other.insert_us;
6503 self.num_accounts += other.num_accounts;
6504 self.accounts_data_len += other.accounts_data_len;
6505 self.zero_lamport_pubkeys.extend(other.zero_lamport_pubkeys);
6506 self.all_accounts_are_zero_lamports_slots +=
6507 other.all_accounts_are_zero_lamports_slots;
6508 self.all_zeros_slots.extend(other.all_zeros_slots);
6509 self.num_did_not_exist += other.num_did_not_exist;
6510 self.num_existed_in_mem += other.num_existed_in_mem;
6511 self.num_existed_on_disk += other.num_existed_on_disk;
6512 self.lt_hash.mix_in(&other.lt_hash);
6513 self.num_obsolete_accounts_skipped += other.num_obsolete_accounts_skipped;
6514 }
6515 }
6516
6517 let mut total_accum = IndexGenerationAccumulator::new();
6518 let storages_orderer =
6519 AccountStoragesOrderer::with_random_order(&storages).into_concurrent_consumer();
6520 let exit_logger = AtomicBool::new(false);
6521 let num_processed = AtomicU64::new(0);
6522 let num_threads = num_cpus::get();
6523 let mut index_time = Measure::start("index");
6524 thread::scope(|s| {
6525 let thread_handles = (0..num_threads)
6526 .map(|i| {
6527 thread::Builder::new()
6528 .name(format!("solGenIndex{i:02}"))
6529 .spawn_scoped(s, || {
6530 let mut thread_accum = IndexGenerationAccumulator::new();
6531 let mut reader = append_vec::new_scan_accounts_reader();
6532 while let Some(next_item) = storages_orderer.next() {
6533 self.maybe_throttle_index_generation();
6534 let storage = next_item.storage;
6535 let store_id = storage.id();
6536 let slot = storage.slot();
6537 let slot_info = self.generate_index_for_slot(
6538 &mut reader,
6539 storage,
6540 slot,
6541 store_id,
6542 &storage_info,
6543 );
6544 thread_accum.insert_us += slot_info.insert_time_us;
6545 thread_accum.num_accounts += slot_info.num_accounts;
6546 thread_accum.accounts_data_len += slot_info.accounts_data_len;
6547 thread_accum
6548 .zero_lamport_pubkeys
6549 .extend(slot_info.zero_lamport_pubkeys);
6550 if slot_info.all_accounts_are_zero_lamports {
6551 thread_accum.all_accounts_are_zero_lamports_slots += 1;
6552 thread_accum.all_zeros_slots.push((
6553 slot,
6554 Arc::clone(&storages[next_item.original_index]),
6555 ));
6556 }
6557 thread_accum.num_did_not_exist += slot_info.num_did_not_exist;
6558 thread_accum.num_existed_in_mem += slot_info.num_existed_in_mem;
6559 thread_accum.num_existed_on_disk += slot_info.num_existed_on_disk;
6560 thread_accum.lt_hash.mix_in(&slot_info.slot_lt_hash.0);
6561 thread_accum.num_obsolete_accounts_skipped +=
6562 slot_info.num_obsolete_accounts_skipped;
6563 num_processed.fetch_add(1, Ordering::Relaxed);
6564 }
6565 thread_accum
6566 })
6567 })
6568 .collect::<Result<Vec<_>, _>>()
6569 .expect("spawn threads");
6570 let logger_thread_handle = thread::Builder::new()
6571 .name("solGenIndexLog".to_string())
6572 .spawn_scoped(s, || {
6573 let mut last_update = Instant::now();
6574 loop {
6575 if exit_logger.load(Ordering::Relaxed) {
6576 break;
6577 }
6578 let num_processed = num_processed.load(Ordering::Relaxed);
6579 if num_processed == num_storages as u64 {
6580 info!("generating index: processed all slots");
6581 break;
6582 }
6583 let now = Instant::now();
6584 if now - last_update > Duration::from_secs(2) {
6585 info!(
6586 "generating index: processed {num_processed}/{num_storages} \
6587 slots..."
6588 );
6589 last_update = now;
6590 }
6591 thread::sleep(Duration::from_millis(500))
6592 }
6593 })
6594 .expect("spawn thread");
6595 for thread_handle in thread_handles {
6596 let Ok(thread_accum) = thread_handle.join() else {
6597 exit_logger.store(true, Ordering::Relaxed);
6598 panic!("index generation failed");
6599 };
6600 total_accum.accumulate(thread_accum);
6601 }
6602 logger_thread_handle.join().expect("join thread");
6606 });
6607 index_time.stop();
6608
6609 {
6610 let index_stats = self.accounts_index.stats();
6612
6613 index_stats.inc_insert_count(total_accum.num_did_not_exist);
6615 index_stats.add_mem_count(total_accum.num_did_not_exist as usize);
6616
6617 index_stats
6619 .entries_from_mem
6620 .fetch_add(total_accum.num_existed_in_mem, Ordering::Relaxed);
6621 index_stats
6622 .updates_in_mem
6623 .fetch_add(total_accum.num_existed_in_mem, Ordering::Relaxed);
6624
6625 index_stats.add_mem_count(total_accum.num_existed_on_disk as usize);
6627 index_stats
6628 .entries_missing
6629 .fetch_add(total_accum.num_existed_on_disk, Ordering::Relaxed);
6630 index_stats
6631 .updates_in_mem
6632 .fetch_add(total_accum.num_existed_on_disk, Ordering::Relaxed);
6633 }
6634
6635 if let Some(geyser_notifier) = &self.accounts_update_notifier {
6636 geyser_notifier.notify_end_of_restore_from_snapshot();
6639 }
6640
6641 if verify {
6642 info!("Verifying index...");
6643 let start = Instant::now();
6644 storages.par_iter().for_each(|storage| {
6645 let store_id = storage.id();
6646 let slot = storage.slot();
6647 storage
6648 .accounts
6649 .scan_accounts_without_data(|offset, account| {
6650 let key = account.pubkey();
6651 self.accounts_index.get_and_then(key, |entry| {
6652 let index_entry = entry.unwrap();
6653 let slot_list = index_entry.slot_list_read_lock();
6654 let mut count = 0;
6655 for (slot2, account_info2) in slot_list.iter() {
6656 if *slot2 == slot {
6657 count += 1;
6658 let ai = AccountInfo::new(
6659 StorageLocation::AppendVec(store_id, offset), account.is_zero_lamport(),
6661 );
6662 assert_eq!(&ai, account_info2);
6663 }
6664 }
6665 assert_eq!(1, count);
6666 (false, ())
6667 });
6668 })
6669 .expect("must scan accounts storage");
6670 });
6671 info!("Verifying index... Done in {:?}", start.elapsed());
6672 }
6673
6674 let total_duplicate_slot_keys = AtomicU64::default();
6675 let total_num_unique_duplicate_keys = AtomicU64::default();
6676
6677 let unique_pubkeys_by_bin = Mutex::new(Vec::<Vec<Pubkey>>::default());
6680 let mut m = Measure::start("accounts_index_idle_us");
6682 self.accounts_index.set_startup(Startup::Normal);
6683 m.stop();
6684 let index_flush_us = m.as_us();
6685
6686 let populate_duplicate_keys_us = measure_us!({
6687 self.accounts_index
6690 .populate_and_retrieve_duplicate_keys_from_startup(|slot_keys| {
6691 total_duplicate_slot_keys.fetch_add(slot_keys.len() as u64, Ordering::Relaxed);
6692 let unique_keys =
6693 HashSet::<Pubkey>::from_iter(slot_keys.iter().map(|(_, key)| *key));
6694 for (slot, key) in slot_keys {
6695 self.uncleaned_pubkeys.entry(slot).or_default().push(key);
6696 }
6697 let unique_pubkeys_by_bin_inner = unique_keys.into_iter().collect::<Vec<_>>();
6698 total_num_unique_duplicate_keys
6699 .fetch_add(unique_pubkeys_by_bin_inner.len() as u64, Ordering::Relaxed);
6700 unique_pubkeys_by_bin
6702 .lock()
6703 .unwrap()
6704 .push(unique_pubkeys_by_bin_inner);
6705 });
6706 })
6707 .1;
6708 let unique_pubkeys_by_bin = unique_pubkeys_by_bin.into_inner().unwrap();
6709
6710 let mut timings = GenerateIndexTimings {
6711 index_flush_us,
6712 index_time: index_time.as_us(),
6713 insertion_time_us: total_accum.insert_us,
6714 total_duplicate_slot_keys: total_duplicate_slot_keys.load(Ordering::Relaxed),
6715 total_num_unique_duplicate_keys: total_num_unique_duplicate_keys
6716 .load(Ordering::Relaxed),
6717 populate_duplicate_keys_us,
6718 total_including_duplicates: total_accum.num_accounts,
6719 total_slots: num_storages as u64,
6720 all_accounts_are_zero_lamports_slots: total_accum.all_accounts_are_zero_lamports_slots,
6721 num_obsolete_accounts_skipped: total_accum.num_obsolete_accounts_skipped,
6722 ..GenerateIndexTimings::default()
6723 };
6724
6725 #[derive(Debug, Default)]
6726 struct DuplicatePubkeysVisitedInfo {
6727 accounts_data_len_from_duplicates: u64,
6728 num_duplicate_accounts: u64,
6729 duplicates_lt_hash: Box<DuplicatesLtHash>,
6730 }
6731 impl DuplicatePubkeysVisitedInfo {
6732 fn reduce(mut self, other: Self) -> Self {
6733 self.accounts_data_len_from_duplicates += other.accounts_data_len_from_duplicates;
6734 self.num_duplicate_accounts += other.num_duplicate_accounts;
6735 self.duplicates_lt_hash
6736 .0
6737 .mix_in(&other.duplicates_lt_hash.0);
6738 self
6739 }
6740 }
6741
6742 let (num_zero_lamport_single_refs, visit_zero_lamports_us) = measure_us!(
6743 self.visit_zero_lamport_pubkeys_during_startup(total_accum.zero_lamport_pubkeys)
6744 );
6745 timings.visit_zero_lamports_us = visit_zero_lamports_us;
6746 timings.num_zero_lamport_single_refs = num_zero_lamport_single_refs;
6747
6748 let mut visit_duplicate_accounts_timer = Measure::start("visit duplicate accounts");
6749 let DuplicatePubkeysVisitedInfo {
6750 accounts_data_len_from_duplicates,
6751 num_duplicate_accounts,
6752 duplicates_lt_hash,
6753 } = unique_pubkeys_by_bin
6754 .par_iter()
6755 .fold(
6756 DuplicatePubkeysVisitedInfo::default,
6757 |accum, pubkeys_by_bin| {
6758 let intermediate = pubkeys_by_bin
6759 .par_chunks(4096)
6760 .fold(DuplicatePubkeysVisitedInfo::default, |accum, pubkeys| {
6761 let (
6762 accounts_data_len_from_duplicates,
6763 accounts_duplicates_num,
6764 duplicates_lt_hash,
6765 ) = self.visit_duplicate_pubkeys_during_startup(pubkeys);
6766 let intermediate = DuplicatePubkeysVisitedInfo {
6767 accounts_data_len_from_duplicates,
6768 num_duplicate_accounts: accounts_duplicates_num,
6769 duplicates_lt_hash,
6770 };
6771 DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
6772 })
6773 .reduce(
6774 DuplicatePubkeysVisitedInfo::default,
6775 DuplicatePubkeysVisitedInfo::reduce,
6776 );
6777 DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
6778 },
6779 )
6780 .reduce(
6781 DuplicatePubkeysVisitedInfo::default,
6782 DuplicatePubkeysVisitedInfo::reduce,
6783 );
6784 visit_duplicate_accounts_timer.stop();
6785 timings.visit_duplicate_accounts_time_us = visit_duplicate_accounts_timer.as_us();
6786 timings.num_duplicate_accounts = num_duplicate_accounts;
6787
6788 total_accum.lt_hash.mix_out(&duplicates_lt_hash.0);
6789 total_accum.accounts_data_len -= accounts_data_len_from_duplicates;
6790 info!("accounts data len: {}", total_accum.accounts_data_len);
6791
6792 info!(
6794 "insert all zero slots to clean at startup {}",
6795 total_accum.all_zeros_slots.len()
6796 );
6797 for (slot, storage) in total_accum.all_zeros_slots {
6798 self.dirty_stores.insert(slot, storage);
6799 }
6800
6801 for storage in &storages {
6803 self.accounts_index.add_root(storage.slot());
6804 }
6805
6806 self.set_storage_count_and_alive_bytes(storage_info, &mut timings);
6807
6808 if self.mark_obsolete_accounts == MarkObsoleteAccounts::Enabled {
6809 let mut mark_obsolete_accounts_time = Measure::start("mark_obsolete_accounts_time");
6810 let slot_marked_obsolete = storages.last().unwrap().slot();
6816 let obsolete_account_stats =
6817 self.mark_obsolete_accounts_at_startup(slot_marked_obsolete, unique_pubkeys_by_bin);
6818
6819 mark_obsolete_accounts_time.stop();
6820 timings.mark_obsolete_accounts_us = mark_obsolete_accounts_time.as_us();
6821 timings.num_obsolete_accounts_marked = obsolete_account_stats.accounts_marked_obsolete;
6822 timings.num_slots_removed_as_obsolete = obsolete_account_stats.slots_removed;
6823 }
6824 total_time.stop();
6825 timings.total_time_us = total_time.as_us();
6826 timings.report(self.accounts_index.get_startup_stats());
6827
6828 self.accounts_index.log_secondary_indexes();
6829
6830 let index_capacity = self
6835 .accounts_index
6836 .account_maps
6837 .iter()
6838 .map(|bin| bin.capacity_for_startup())
6839 .sum();
6840 self.accounts_index
6841 .stats()
6842 .capacity_in_mem
6843 .store(index_capacity, Ordering::Relaxed);
6844
6845 IndexGenerationInfo {
6846 accounts_data_len: total_accum.accounts_data_len,
6847 calculated_accounts_lt_hash: AccountsLtHash(total_accum.lt_hash),
6848 }
6849 }
6850
6851 fn mark_obsolete_accounts_at_startup(
6854 &self,
6855 slot_marked_obsolete: Slot,
6856 pubkeys_with_duplicates_by_bin: Vec<Vec<Pubkey>>,
6857 ) -> ObsoleteAccountsStats {
6858 let stats: ObsoleteAccountsStats = pubkeys_with_duplicates_by_bin
6859 .par_iter()
6860 .map(|pubkeys_by_bin| {
6861 let reclaims = self
6862 .accounts_index
6863 .clean_and_unref_rooted_entries_by_bin(pubkeys_by_bin);
6864 let stats = PurgeStats::default();
6865
6866 if !reclaims.is_empty() {
6868 self.handle_reclaims(
6869 reclaims.iter(),
6870 None,
6871 &HashSet::new(),
6872 HandleReclaims::ProcessDeadSlots(&stats),
6873 MarkAccountsObsolete::Yes(slot_marked_obsolete),
6874 );
6875 }
6876 ObsoleteAccountsStats {
6877 accounts_marked_obsolete: reclaims.len() as u64,
6878 slots_removed: stats.total_removed_storage_entries.load(Ordering::Relaxed)
6879 as u64,
6880 }
6881 })
6882 .sum();
6883 stats
6884 }
6885
6886 fn maybe_throttle_index_generation(&self) {
6889 if !self.accounts_index.is_disk_index_enabled() {
6891 return;
6892 }
6893 const LIMIT: usize = 10_000_000;
6901 while self
6902 .accounts_index
6903 .get_startup_remaining_items_to_flush_estimate()
6904 > LIMIT
6905 {
6906 sleep(Duration::from_millis(10));
6909 }
6910 }
6911
6912 fn visit_zero_lamport_pubkeys_during_startup(&self, mut pubkeys: Vec<Pubkey>) -> u64 {
6916 let mut slot_offsets = HashMap::<_, Vec<_>>::default();
6917 let orig_len = pubkeys.len();
6921 pubkeys.sort_unstable();
6922 pubkeys.dedup();
6923 let uniq_len = pubkeys.len();
6924 info!(
6925 "visit_zero_lamport_pubkeys_during_startup: {orig_len} pubkeys, {uniq_len} after dedup",
6926 );
6927
6928 self.accounts_index.scan(
6929 pubkeys.iter(),
6930 |_pubkey, slots_refs| {
6931 let (slot_list, ref_count) = slots_refs.unwrap();
6932 if ref_count == 1 {
6933 assert_eq!(slot_list.len(), 1);
6934 let (slot_alive, account_info) = slot_list.first().unwrap();
6935 assert!(!account_info.is_cached());
6936 if account_info.is_zero_lamport() {
6937 slot_offsets
6938 .entry(*slot_alive)
6939 .or_default()
6940 .push(account_info.offset());
6941 }
6942 }
6943 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
6944 },
6945 None,
6946 ScanFilter::All,
6947 );
6948
6949 let mut count = 0;
6950 let mut dead_stores = 0;
6951 let mut shrink_stores = 0;
6952 let mut non_shrink_stores = 0;
6953 for (slot, offsets) in slot_offsets {
6954 if let Some(store) = self.storage.get_slot_storage_entry(slot) {
6955 count += store.batch_insert_zero_lamport_single_ref_account_offsets(&offsets);
6956 if store.num_zero_lamport_single_ref_accounts() == store.count() {
6957 self.dirty_stores.entry(slot).or_insert(store);
6959 dead_stores += 1;
6960 } else if Self::is_shrinking_productive(&store)
6961 && self.is_candidate_for_shrink(&store)
6962 {
6963 if self.shrink_candidate_slots.lock().unwrap().insert(slot) {
6965 shrink_stores += 1;
6966 }
6967 } else {
6968 non_shrink_stores += 1;
6969 }
6970 }
6971 }
6972 self.shrink_stats
6973 .num_zero_lamport_single_ref_accounts_found
6974 .fetch_add(count, Ordering::Relaxed);
6975
6976 self.shrink_stats
6977 .num_dead_slots_added_to_clean
6978 .fetch_add(dead_stores, Ordering::Relaxed);
6979
6980 self.shrink_stats
6981 .num_slots_with_zero_lamport_accounts_added_to_shrink
6982 .fetch_add(shrink_stores, Ordering::Relaxed);
6983
6984 self.shrink_stats
6985 .marking_zero_dead_accounts_in_non_shrinkable_store
6986 .fetch_add(non_shrink_stores, Ordering::Relaxed);
6987
6988 count
6989 }
6990
6991 fn visit_duplicate_pubkeys_during_startup(
7003 &self,
7004 pubkeys: &[Pubkey],
7005 ) -> (u64, u64, Box<DuplicatesLtHash>) {
7006 let mut accounts_data_len_from_duplicates = 0;
7007 let mut num_duplicate_accounts = 0_u64;
7008 let mut duplicates_lt_hash = Box::new(DuplicatesLtHash::default());
7009 self.accounts_index.scan(
7010 pubkeys.iter(),
7011 |pubkey, slots_refs| {
7012 if let Some((slot_list, _ref_count)) = slots_refs {
7013 if slot_list.len() > 1 {
7014 let max = slot_list.iter().map(|(slot, _)| slot).max().unwrap();
7020 slot_list.iter().for_each(|(slot, account_info)| {
7021 if slot == max {
7022 return;
7024 }
7025 let maybe_storage_entry = self
7026 .storage
7027 .get_account_storage_entry(*slot, account_info.store_id());
7028 let mut accessor = LoadedAccountAccessor::Stored(
7029 maybe_storage_entry.map(|entry| (entry, account_info.offset())),
7030 );
7031 accessor.check_and_get_loaded_account(|loaded_account| {
7032 let data_len = loaded_account.data_len();
7033 if loaded_account.lamports() > 0 {
7034 accounts_data_len_from_duplicates += data_len;
7035 }
7036 num_duplicate_accounts += 1;
7037 let account_lt_hash =
7038 Self::lt_hash_account(&loaded_account, pubkey);
7039 duplicates_lt_hash.0.mix_in(&account_lt_hash.0);
7040 });
7041 });
7042 }
7043 }
7044 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
7045 },
7046 None,
7047 ScanFilter::All,
7048 );
7049 (
7050 accounts_data_len_from_duplicates as u64,
7051 num_duplicate_accounts,
7052 duplicates_lt_hash,
7053 )
7054 }
7055
7056 fn set_storage_count_and_alive_bytes(
7057 &self,
7058 stored_sizes_and_counts: StorageSizeAndCountMap,
7059 timings: &mut GenerateIndexTimings,
7060 ) {
7061 let mut storage_size_storages_time = Measure::start("storage_size_storages");
7063 for (_slot, store) in self.storage.iter() {
7064 let id = store.id();
7065 assert_eq!(store.alive_bytes(), 0);
7067 if let Some(entry) = stored_sizes_and_counts.get(&id) {
7068 trace!(
7069 "id: {} setting count: {} cur: {}",
7070 id,
7071 entry.count,
7072 store.count(),
7073 );
7074 {
7075 let prev_count = store.count.swap(entry.count, Ordering::Release);
7076 assert_eq!(prev_count, 0);
7077 }
7078 store
7079 .alive_bytes
7080 .store(entry.stored_size, Ordering::Release);
7081 } else {
7082 trace!("id: {id} clearing count");
7083 store.count.store(0, Ordering::Release);
7084 }
7085 }
7086 storage_size_storages_time.stop();
7087 timings.storage_size_storages_us = storage_size_storages_time.as_us();
7088 }
7089
/// Logs the accounts index contents followed by the per-slot storage counts, each prefixed
/// with `label`.
pub fn print_accounts_stats(&self, label: &str) {
    self.print_index(label);
    self.print_count_and_status(label);
}
7094
7095 fn print_index(&self, label: &str) {
7096 let mut alive_roots: Vec<_> = self.accounts_index.all_alive_roots();
7097 #[allow(clippy::stable_sort_primitive)]
7098 alive_roots.sort();
7099 info!("{label}: accounts_index alive_roots: {alive_roots:?}");
7100 self.accounts_index.account_maps.iter().for_each(|map| {
7101 for pubkey in map.keys() {
7102 self.accounts_index.get_and_then(&pubkey, |account_entry| {
7103 if let Some(account_entry) = account_entry {
7104 let list_r = account_entry.slot_list_read_lock();
7105 info!(" key: {} ref_count: {}", pubkey, account_entry.ref_count(),);
7106 info!(" slots: {list_r:?}");
7107 }
7108 let add_to_in_mem_cache = false;
7109 (add_to_in_mem_cache, ())
7110 });
7111 }
7112 });
7113 }
7114
7115 pub fn print_count_and_status(&self, label: &str) {
7116 let mut slots: Vec<_> = self.storage.all_slots();
7117 #[allow(clippy::stable_sort_primitive)]
7118 slots.sort();
7119 info!("{}: count_and status for {} slots:", label, slots.len());
7120 for slot in &slots {
7121 let entry = self.storage.get_slot_storage_entry(*slot).unwrap();
7122 info!(
7123 " slot: {} id: {} count: {} len: {} capacity: {}",
7124 slot,
7125 entry.id(),
7126 entry.count(),
7127 entry.accounts.len(),
7128 entry.accounts.capacity(),
7129 );
7130 }
7131 }
7132}
7133
/// Option passed to `handle_reclaims`.
#[derive(Debug, Copy, Clone)]
enum HandleReclaims<'a> {
    /// Carries the `PurgeStats` to use; presumably updated while dead slots are processed —
    /// confirm against `handle_reclaims`.
    ProcessDeadSlots(&'a PurgeStats),
}
7138
/// Option passed to `handle_reclaims` selecting whether reclaimed accounts are marked obsolete.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MarkAccountsObsolete {
    /// Mark accounts obsolete; the `Slot` is the slot they are marked obsolete at (callers
    /// pass it as `slot_marked_obsolete`).
    Yes(Slot),
    /// Do not mark accounts obsolete.
    No,
}
7147
/// Selects how index updates are executed when storing accounts.
pub enum UpdateIndexThreadSelection {
    /// Presumably: update the index on the calling thread — confirm against the consumer.
    Inline,
    /// Presumably: use a thread pool once the amount of work exceeds a threshold — confirm
    /// against the consumer.
    PoolWithThreshold,
}
7154
// Only available when the `dev-context-only-utils` feature is enabled.
#[cfg(feature = "dev-context-only-utils")]
impl AccountStorageEntry {
    /// Returns the number of pubkeys visited by a full scan of this storage's accounts file.
    ///
    /// # Panics
    ///
    /// Panics if scanning the accounts file fails.
    fn accounts_count(&self) -> usize {
        let mut total = 0;
        let scan_result = self.accounts.scan_pubkeys(|_| total += 1);
        scan_result.expect("must scan accounts storage");
        total
    }
}
7168
#[cfg(test)]
impl AccountStorageEntry {
    /// Test-only access to the lock guarding this storage's obsolete accounts.
    pub(crate) fn obsolete_accounts(&self) -> &RwLock<ObsoleteAccounts> {
        &self.obsolete_accounts
    }
}
7176
// Helpers that are only available when the `dev-context-only-utils` feature is enabled.
#[cfg(feature = "dev-context-only-utils")]
impl AccountsDb {
    /// Same as [`Self::new_single_for_tests`].
    pub fn default_for_tests() -> Self {
        Self::new_single_for_tests()
    }

    /// Creates a test database with an empty list of paths.
    pub fn new_single_for_tests() -> Self {
        Self::new_for_tests(Vec::new())
    }

    /// Creates a test database with an empty list of paths, the given accounts file provider
    /// and the given config.
    pub fn new_single_for_tests_with_provider_and_config(
        file_provider: AccountsFileProvider,
        accounts_db_config: AccountsDbConfig,
    ) -> Self {
        let no_paths = Vec::new();
        Self::new_for_tests_with_provider_and_config(no_paths, file_provider, accounts_db_config)
    }

    /// Creates a test database over `paths` with the default accounts file provider and
    /// `ACCOUNTS_DB_CONFIG_FOR_TESTING`.
    pub fn new_for_tests(paths: Vec<PathBuf>) -> Self {
        let provider = AccountsFileProvider::default();
        Self::new_for_tests_with_provider_and_config(
            paths,
            provider,
            ACCOUNTS_DB_CONFIG_FOR_TESTING,
        )
    }

    /// Creates a database via `new_with_config` and then overrides its accounts file provider.
    fn new_for_tests_with_provider_and_config(
        paths: Vec<PathBuf>,
        accounts_file_provider: AccountsFileProvider,
        accounts_db_config: AccountsDbConfig,
    ) -> Self {
        let mut accounts_db =
            Self::new_with_config(paths, accounts_db_config, None, Arc::default());
        accounts_db.accounts_file_provider = accounts_file_provider;
        accounts_db
    }

    /// Returns the number of slots that currently have an entry in `uncleaned_pubkeys`.
    pub fn get_len_of_slots_with_uncleaned_pubkeys(&self) -> usize {
        self.uncleaned_pubkeys.len()
    }

    /// Returns the configured storage access mode.
    #[cfg(test)]
    pub fn storage_access(&self) -> StorageAccess {
        self.storage_access
    }

    /// Runs `clean_accounts` with no max root, `false` for its second argument, and the
    /// default epoch schedule.
    pub fn clean_accounts_for_tests(&self) {
        self.clean_accounts(None, false, &EpochSchedule::default())
    }

    /// Flushes the accounts cache of `slot`.
    pub fn flush_accounts_cache_slot_for_tests(&self, slot: Slot) {
        self.flush_slot_cache(slot);
    }

    /// Adds `slot` as a root and then flushes the write cache for it.
    pub fn add_root_and_flush_write_cache(&self, slot: Slot) {
        self.add_root(slot);
        self.flush_root_write_cache(slot);
    }

    /// Loads `pubkey` as seen from `ancestors` with no max root and an unspecified load hint,
    /// using `LoadZeroLamports::SomeWithZeroLamportAccountForTests`.
    pub fn load_without_fixed_root(
        &self,
        ancestors: &Ancestors,
        pubkey: &Pubkey,
    ) -> Option<(AccountSharedData, Slot)> {
        let max_root = None;
        self.do_load(
            ancestors,
            pubkey,
            max_root,
            LoadHint::Unspecified,
            LoadZeroLamports::SomeWithZeroLamportAccountForTests,
        )
    }

    /// Asserts that `pubkey` loads from `slot` and holds `expected_lamports`.
    ///
    /// # Panics
    ///
    /// Panics if the account cannot be loaded or its lamports differ.
    pub fn assert_load_account(&self, slot: Slot, pubkey: Pubkey, expected_lamports: u64) {
        let ancestors: Ancestors = vec![(slot, 0)].into_iter().collect();
        // `slot` is rebound to the slot the account was loaded from, so only the lamports
        // half of the comparison below can actually differ.
        let (account, slot) = self.load_without_fixed_root(&ancestors, &pubkey).unwrap();
        assert_eq!((account.lamports(), slot), (expected_lamports, slot));
    }

    /// Asserts that `pubkey` does not load from `slot`.
    pub fn assert_not_load_account(&self, slot: Slot, pubkey: Pubkey) {
        let ancestors: Ancestors = vec![(slot, 0)].into_iter().collect();
        let load = self.load_without_fixed_root(&ancestors, &pubkey);
        assert!(load.is_none(), "{load:?}");
    }

    /// Performs `num` random spot checks: picks an index in `0..num` and asserts that
    /// `pubkeys[index]` loads from `slot` as an account with `index + count` lamports, no data
    /// and the default owner.
    pub fn check_accounts(&self, pubkeys: &[Pubkey], slot: Slot, num: usize, count: usize) {
        let ancestors: Ancestors = vec![(slot, 0)].into_iter().collect();
        for _ in 0..num {
            let idx = thread_rng().gen_range(0..num);
            let loaded = self.load_without_fixed_root(&ancestors, &pubkeys[idx]);
            let expected_account = AccountSharedData::new(
                (idx + count) as u64,
                0,
                AccountSharedData::default().owner(),
            );
            let expected = Some((expected_account, slot));
            assert_eq!(loaded, expected);
        }
    }

    /// Invokes `callback` with the offset and stored account info of every account in
    /// `storages`, scanning the storages in order with a single shared reader.
    ///
    /// # Panics
    ///
    /// Panics if scanning any storage fails.
    pub fn scan_accounts_from_storages(
        storages: &[Arc<AccountStorageEntry>],
        mut callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>),
    ) {
        let mut reader = append_vec::new_scan_accounts_reader();
        storages.iter().for_each(|storage| {
            storage
                .accounts
                .scan_accounts(&mut reader, &mut callback)
                .expect("must scan accounts storage");
        });
    }

    /// Stores `accounts` via `store_accounts_unfrozen` with no transactions and
    /// `UpdateIndexThreadSelection::PoolWithThreshold`.
    pub fn store_for_tests<'a>(&self, accounts: impl StorableAccounts<'a>) {
        let thread_selection = UpdateIndexThreadSelection::PoolWithThreshold;
        self.store_accounts_unfrozen(accounts, None, thread_selection);
    }

    /// Overwrites the first `num` pubkeys in `slot`: `pubkeys[i]` is stored as an account with
    /// `i + count` lamports, no data and the default owner.
    // Indexing is kept (rather than iterating) so an out-of-range `num` panics at the same
    // point as before.
    #[allow(clippy::needless_range_loop)]
    pub fn modify_accounts(&self, pubkeys: &[Pubkey], slot: Slot, num: usize, count: usize) {
        for idx in 0..num {
            let lamports = (idx + count) as u64;
            let account =
                AccountSharedData::new(lamports, 0, AccountSharedData::default().owner());
            self.store_for_tests((slot, [(&pubkeys[idx], &account)].as_slice()));
        }
    }

    /// Asserts that the storage for `slot` has `alive_count` as its count and `total_count`
    /// accounts in its accounts file.
    ///
    /// # Panics
    ///
    /// Panics if `slot` has no storage entry or either count differs.
    pub fn check_storage(&self, slot: Slot, alive_count: usize, total_count: usize) {
        let store = self.storage.get_slot_storage_entry(slot).unwrap();
        assert_eq!(store.count(), alive_count);
        assert_eq!(store.accounts_count(), total_count);
    }

    /// Creates and stores `num` accounts owned by the default owner followed by `num_vote`
    /// accounts owned by the vote program, all in `slot` with `space` bytes of data.
    ///
    /// Lamports run from `1` to `num + num_vote` in creation order. Each new pubkey is
    /// appended to `pubkeys` and asserted not to be loadable before it is stored.
    pub fn create_account(
        &self,
        pubkeys: &mut Vec<Pubkey>,
        slot: Slot,
        num: usize,
        space: usize,
        num_vote: usize,
    ) {
        let ancestors: Ancestors = vec![(slot, 0)].into_iter().collect();
        // Shared by both loops below: create, record, check absence, then store.
        let mut store_fresh_account = |lamports: u64, owner: &Pubkey| {
            let pubkey = solana_pubkey::new_rand();
            let account = AccountSharedData::new(lamports, space, owner);
            pubkeys.push(pubkey);
            assert!(self.load_without_fixed_root(&ancestors, &pubkey).is_none());
            self.store_for_tests((slot, [(&pubkey, &account)].as_slice()));
        };

        for t in 0..num {
            store_fresh_account((t + 1) as u64, AccountSharedData::default().owner());
        }
        for t in 0..num_vote {
            store_fresh_account((num + t + 1) as u64, &solana_vote_program::id());
        }
    }

    /// Asserts the storage ref count of `pubkey`.
    ///
    /// When `mark_obsolete_accounts` is enabled, `expected_ref_count` is capped at one before
    /// comparing.
    pub fn assert_ref_count(&self, pubkey: &Pubkey, expected_ref_count: RefCount) {
        let expected_ref_count =
            if self.mark_obsolete_accounts == MarkObsoleteAccounts::Enabled {
                expected_ref_count.min(1)
            } else {
                expected_ref_count
            };

        let actual_ref_count = self.accounts_index.ref_count_from_storage(pubkey);
        assert_eq!(expected_ref_count, actual_ref_count);
    }

    /// Returns the storage count for `slot` plus the number of entries in its slot cache;
    /// a missing storage or cache contributes zero.
    pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize {
        let in_storage = self
            .storage
            .get_slot_storage_entry(slot)
            .map_or(0, |storage| storage.count());
        let in_cache = self
            .accounts_cache
            .slot_cache(slot)
            .map_or(0, |slot_cache| slot_cache.len());
        in_storage.saturating_add(in_cache)
    }

    /// Flushes the accounts cache up to `root`.
    ///
    /// # Panics
    ///
    /// Panics if `root` is not among the index's alive roots.
    pub fn flush_root_write_cache(&self, root: Slot) {
        let is_alive_root = self
            .accounts_index
            .roots_tracker
            .read()
            .unwrap()
            .alive_roots
            .contains(&root);
        assert!(is_alive_root, "slot: {root}");
        self.flush_accounts_cache(true, Some(root));
    }

    /// Returns the number of accounts in the accounts file of `slot`, or zero when the slot
    /// has no storage entry.
    pub fn all_account_count_in_accounts_file(&self, slot: Slot) -> usize {
        self.storage
            .get_slot_storage_entry(slot)
            .map_or(0, |store| store.accounts_count())
    }

    /// Returns the map from slot to the pubkeys recorded as uncleaned for that slot.
    pub fn uncleaned_pubkeys(&self) -> &DashMap<Slot, Vec<Pubkey>, BuildNoHashHasher<Slot>> {
        &self.uncleaned_pubkeys
    }
}