1mod geyser_plugin_utils;
22pub mod stats;
23pub mod tests;
24
25#[cfg(test)]
26use crate::append_vec::StoredAccountMeta;
27#[cfg(feature = "dev-context-only-utils")]
28use qualifier_attr::qualifiers;
29use {
30 crate::{
31 account_info::{AccountInfo, Offset, StorageLocation},
32 account_storage::{
33 stored_account_info::{StoredAccountInfo, StoredAccountInfoWithoutData},
34 AccountStorage, AccountStorageStatus, AccountStoragesOrderer, ShrinkInProgress,
35 },
36 accounts_cache::{AccountsCache, CachedAccount, SlotCache},
37 accounts_db::stats::{
38 AccountsStats, CleanAccountsStats, FlushStats, ObsoleteAccountsStats, PurgeStats,
39 ShrinkAncientStats, ShrinkStats, ShrinkStatsSub, StoreAccountsTiming,
40 },
41 accounts_file::{AccountsFile, AccountsFileError, AccountsFileProvider, StorageAccess},
42 accounts_hash::{AccountLtHash, AccountsLtHash, ZERO_LAMPORT_ACCOUNT_LT_HASH},
43 accounts_index::{
44 in_mem_accounts_index::StartupStats, AccountSecondaryIndexes, AccountsIndex,
45 AccountsIndexConfig, AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue,
46 IndexKey, IndexValue, IsCached, RefCount, ScanConfig, ScanFilter, ScanResult, SlotList,
47 UpsertReclaim, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING,
48 },
49 accounts_index_storage::Startup,
50 accounts_update_notifier_interface::AccountsUpdateNotifier,
51 active_stats::{ActiveStatItem, ActiveStats},
52 ancestors::Ancestors,
53 append_vec::{self, aligned_stored_size, IndexInfo, IndexInfoInner, STORE_META_OVERHEAD},
54 buffered_reader::RequiredLenBufFileRead,
55 contains::Contains,
56 is_zero_lamport::IsZeroLamport,
57 partitioned_rewards::{
58 PartitionedEpochRewardsConfig, DEFAULT_PARTITIONED_EPOCH_REWARDS_CONFIG,
59 },
60 read_only_accounts_cache::ReadOnlyAccountsCache,
61 storable_accounts::{StorableAccounts, StorableAccountsBySlot},
62 u64_align, utils,
63 verify_accounts_hash_in_background::VerifyAccountsHashInBackground,
64 },
65 dashmap::{DashMap, DashSet},
66 log::*,
67 rand::{thread_rng, Rng},
68 rayon::{prelude::*, ThreadPool},
69 seqlock::SeqLock,
70 smallvec::SmallVec,
71 solana_account::{Account, AccountSharedData, ReadableAccount},
72 solana_clock::{BankId, Epoch, Slot},
73 solana_epoch_schedule::EpochSchedule,
74 solana_lattice_hash::lt_hash::LtHash,
75 solana_measure::{meas_dur, measure::Measure, measure_us},
76 solana_nohash_hasher::{BuildNoHashHasher, IntMap, IntSet},
77 solana_pubkey::Pubkey,
78 solana_rayon_threadlimit::get_thread_count,
79 solana_transaction::sanitized::SanitizedTransaction,
80 std::{
81 borrow::Cow,
82 boxed::Box,
83 collections::{BTreeSet, HashMap, HashSet, VecDeque},
84 io, iter, mem,
85 num::{NonZeroUsize, Saturating},
86 ops::RangeBounds,
87 path::{Path, PathBuf},
88 sync::{
89 atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering},
90 Arc, Condvar, Mutex, RwLock,
91 },
92 thread::{self, sleep},
93 time::{Duration, Instant},
94 },
95 tempfile::TempDir,
96};
97
/// Default write-cache limit in bytes (15 * 10^9).
///
/// NOTE(review): presumably the fallback when
/// `AccountsDbConfig::write_cache_limit_bytes` is `None` — confirm at the use site.
const WRITE_CACHE_LIMIT_BYTES_DEFAULT: u64 = 15_000_000_000;
/// NOTE(review): judging by the name, the size at which scanning a slot switches to a
/// parallel iterator — confirm at the use site.
const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000;

/// NOTE(review): judging by the name, the number of accounts unref'd per batch — confirm.
const UNREF_ACCOUNTS_BATCH_SIZE: usize = 10_000;

/// Default storage file size: 4 MiB.
const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
/// Default number of directories.
const DEFAULT_NUM_DIRS: u32 = 4;

/// NOTE(review): judging by the name, the chunk size used while collecting accounts for
/// shrink — confirm at the use site.
const SHRINK_COLLECT_CHUNK_SIZE: usize = 50;

/// NOTE(review): judging by the name, a threshold that controls when a slot is inserted
/// into the ancient-shrink candidates — confirm at the use site.
const SHRINK_INSERT_ANCIENT_THRESHOLD: usize = 10;
116
/// Selects whether a scan of account storage should expose account data.
///
/// NOTE(review): semantics are inferred from the variant names — confirm against the
/// scan implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ScanAccountStorageData {
    /// Presumably: the scan callback does not need account data.
    /// Only constructed in tests, hence the conditional `dead_code` allowance.
    #[cfg_attr(not(test), allow(dead_code))]
    NoData,
    /// Presumably: the scan callback receives a reference to the data held by the storage.
    DataRefForStorage,
}
127
/// A flat collection of alive accounts, all associated with one slot.
#[derive(Default, Debug)]
pub(crate) struct AliveAccounts<'a> {
    /// Slot this collection was created for (see `with_capacity`).
    pub(crate) slot: Slot,
    /// References to the collected accounts.
    pub(crate) accounts: Vec<&'a AccountFromStorage>,
    /// Running (saturating) sum of `stored_size()` over `accounts`.
    pub(crate) bytes: usize,
}
137
/// Alive accounts partitioned into three buckets by ref count and by position in the
/// account's slot list. The partitioning rule lives in this type's
/// `ShrinkCollectRefs::add` implementation.
#[derive(Debug)]
pub(crate) struct ShrinkCollectAliveSeparatedByRefs<'a> {
    /// Accounts whose ref count is exactly 1.
    pub(crate) one_ref: AliveAccounts<'a>,
    /// Accounts with ref count != 1 where no slot-list entry is newer than this slot
    /// (or the slot list has a single entry).
    pub(crate) many_refs_this_is_newest_alive: AliveAccounts<'a>,
    /// Accounts with ref count != 1 where at least one slot-list entry is newer than this
    /// slot.
    pub(crate) many_refs_old_alive: AliveAccounts<'a>,
}
148
/// Accumulator interface for collecting the alive accounts of a slot.
pub(crate) trait ShrinkCollectRefs<'a>: Sync + Send {
    /// Creates an empty collector for `slot`, pre-sized for `capacity` accounts.
    fn with_capacity(capacity: usize, slot: Slot) -> Self;
    /// Merges everything accumulated in `other` into `self`.
    fn collect(&mut self, other: Self);
    /// Records one alive `account`.
    ///
    /// `ref_count` and `slot_list` describe the account's index entry; implementations may
    /// use them to decide where the account is filed, or ignore them.
    fn add(
        &mut self,
        ref_count: u64,
        account: &'a AccountFromStorage,
        slot_list: &[(Slot, AccountInfo)],
    );
    /// Number of accounts collected so far.
    fn len(&self) -> usize;
    /// Total stored size, in bytes, of the accounts collected so far.
    fn alive_bytes(&self) -> usize;
    /// The collected accounts as one flat list. Not every implementation supports this;
    /// `ShrinkCollectAliveSeparatedByRefs` panics with `unimplemented!`.
    fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage>;
}
162
163impl<'a> ShrinkCollectRefs<'a> for AliveAccounts<'a> {
164 fn collect(&mut self, mut other: Self) {
165 self.bytes = self.bytes.saturating_add(other.bytes);
166 self.accounts.append(&mut other.accounts);
167 }
168 fn with_capacity(capacity: usize, slot: Slot) -> Self {
169 Self {
170 accounts: Vec::with_capacity(capacity),
171 bytes: 0,
172 slot,
173 }
174 }
175 fn add(
176 &mut self,
177 _ref_count: u64,
178 account: &'a AccountFromStorage,
179 _slot_list: &[(Slot, AccountInfo)],
180 ) {
181 self.accounts.push(account);
182 self.bytes = self.bytes.saturating_add(account.stored_size());
183 }
184 fn len(&self) -> usize {
185 self.accounts.len()
186 }
187 fn alive_bytes(&self) -> usize {
188 self.bytes
189 }
190 fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage> {
191 &self.accounts
192 }
193}
194
195impl<'a> ShrinkCollectRefs<'a> for ShrinkCollectAliveSeparatedByRefs<'a> {
196 fn collect(&mut self, other: Self) {
197 self.one_ref.collect(other.one_ref);
198 self.many_refs_this_is_newest_alive
199 .collect(other.many_refs_this_is_newest_alive);
200 self.many_refs_old_alive.collect(other.many_refs_old_alive);
201 }
202 fn with_capacity(capacity: usize, slot: Slot) -> Self {
203 Self {
204 one_ref: AliveAccounts::with_capacity(capacity, slot),
205 many_refs_this_is_newest_alive: AliveAccounts::with_capacity(0, slot),
206 many_refs_old_alive: AliveAccounts::with_capacity(0, slot),
207 }
208 }
209 fn add(
210 &mut self,
211 ref_count: u64,
212 account: &'a AccountFromStorage,
213 slot_list: &[(Slot, AccountInfo)],
214 ) {
215 let other = if ref_count == 1 {
216 &mut self.one_ref
217 } else if slot_list.len() == 1
218 || !slot_list
219 .iter()
220 .any(|(slot_list_slot, _info)| slot_list_slot > &self.many_refs_old_alive.slot)
221 {
222 &mut self.many_refs_this_is_newest_alive
224 } else {
225 &mut self.many_refs_old_alive
228 };
229 other.add(ref_count, account, slot_list);
230 }
231 fn len(&self) -> usize {
232 self.one_ref
233 .len()
234 .saturating_add(self.many_refs_old_alive.len())
235 .saturating_add(self.many_refs_this_is_newest_alive.len())
236 }
237 fn alive_bytes(&self) -> usize {
238 self.one_ref
239 .alive_bytes()
240 .saturating_add(self.many_refs_old_alive.alive_bytes())
241 .saturating_add(self.many_refs_this_is_newest_alive.alive_bytes())
242 }
243 fn alive_accounts(&self) -> &Vec<&'a AccountFromStorage> {
244 unimplemented!("illegal use");
245 }
246}
247
/// Selects how reclaims are treated when storing accounts.
///
/// NOTE(review): semantics are inferred from the names — confirm at the store call sites.
pub enum StoreReclaims {
    /// Presumably: normal reclaim handling.
    Default,
    /// Presumably: reclaims produced by the store are not processed.
    Ignore,
}
254
/// Selects how a load treats zero-lamport accounts.
///
/// NOTE(review): semantics are inferred from the names — confirm at the load call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LoadZeroLamports {
    /// Presumably: a zero-lamport account is reported as `None` (not found).
    None,
    /// Presumably: a zero-lamport account is returned as `Some`. Only available with the
    /// `dev-context-only-utils` feature and, per its name, intended for tests.
    #[cfg(feature = "dev-context-only-utils")]
    SomeWithZeroLamportAccountForTests,
}
268
/// Result of collecting the contents of one slot's storage for a shrink.
///
/// NOTE(review): field descriptions are derived from the names and types visible here —
/// confirm against the code that populates this struct.
#[derive(Debug)]
pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
    /// Slot being shrunk.
    pub(crate) slot: Slot,
    /// Capacity, in bytes, of the storage being shrunk (presumably).
    pub(crate) capacity: u64,
    /// Pubkeys whose index ref count should be decremented.
    pub(crate) pubkeys_to_unref: Vec<&'a Pubkey>,
    /// Pubkeys of zero-lamport accounts that have a single ref.
    pub(crate) zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
    /// The alive accounts, in whatever shape `T` collects them.
    pub(crate) alive_accounts: T,
    /// Total bytes of the alive accounts.
    pub(crate) alive_total_bytes: usize,
    /// Number of accounts in the storage before the shrink (presumably).
    pub(crate) total_starting_accounts: usize,
    /// True when every account considered has zero lamports.
    pub(crate) all_are_zero_lamports: bool,
}
282
/// `AccountsDbConfig` preset for tests.
///
/// Uses `ACCOUNTS_INDEX_CONFIG_FOR_TESTING` for the index and
/// `ScanFilter::OnlyAbnormalTest` for shrinking. Every optional setting is left as `None`
/// and every boolean flag is `false`. File-backed storage access is selected explicitly.
pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
    index: Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
    account_indexes: None,
    base_working_path: None,
    shrink_paths: None,
    shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION,
    read_cache_limit_bytes: None,
    read_cache_evict_sample_size: None,
    write_cache_limit_bytes: None,
    ancient_append_vec_offset: None,
    ancient_storage_ideal_size: None,
    max_ancient_storages: None,
    skip_initial_hash_calc: false,
    exhaustively_verify_refcounts: false,
    partitioned_epoch_rewards_config: DEFAULT_PARTITIONED_EPOCH_REWARDS_CONFIG,
    storage_access: StorageAccess::File,
    scan_filter_for_shrinking: ScanFilter::OnlyAbnormalTest,
    mark_obsolete_accounts: false,
    num_background_threads: None,
    num_foreground_threads: None,
    num_hash_threads: None,
};
/// `AccountsDbConfig` preset for benchmarks.
///
/// Uses `ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS` for the index and
/// `ScanFilter::OnlyAbnormal` for shrinking. Every optional setting is left as `None` and
/// every boolean flag is `false`. File-backed storage access is selected explicitly.
pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
    index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
    account_indexes: None,
    base_working_path: None,
    shrink_paths: None,
    shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION,
    read_cache_limit_bytes: None,
    read_cache_evict_sample_size: None,
    write_cache_limit_bytes: None,
    ancient_append_vec_offset: None,
    ancient_storage_ideal_size: None,
    max_ancient_storages: None,
    skip_initial_hash_calc: false,
    exhaustively_verify_refcounts: false,
    partitioned_epoch_rewards_config: DEFAULT_PARTITIONED_EPOCH_REWARDS_CONFIG,
    storage_access: StorageAccess::File,
    scan_filter_for_shrinking: ScanFilter::OnlyAbnormal,
    mark_obsolete_accounts: false,
    num_background_threads: None,
    num_foreground_threads: None,
    num_hash_threads: None,
};
327
/// Intermediate result of looking up a batch of accounts in the index for a shrink.
///
/// NOTE(review): field descriptions are derived from the names and types — confirm
/// against the code that fills this struct in.
struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
    /// Accounts found to be alive.
    alive_accounts: T,
    /// Pubkeys whose index ref count should be decremented.
    pubkeys_to_unref: Vec<&'a Pubkey>,
    /// Pubkeys of zero-lamport accounts that have a single ref.
    zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
    /// True when every account looked at has zero lamports.
    all_are_zero_lamports: bool,
}
339
/// Compact description of an account found in a storage: where it lives, how long its
/// data is, and its pubkey. The account data itself is not held.
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct AccountFromStorage {
    /// Index information for the account (storage location and zero-lamport flag).
    pub index_info: AccountInfo,
    /// Length of the account's data in bytes.
    pub data_len: u64,
    /// The account's address.
    pub pubkey: Pubkey,
}
348
impl IsZeroLamport for AccountFromStorage {
    /// Delegates to the zero-lamport flag recorded in `index_info`.
    fn is_zero_lamport(&self) -> bool {
        self.index_info.is_zero_lamport()
    }
}
354
355impl AccountFromStorage {
356 pub fn pubkey(&self) -> &Pubkey {
357 &self.pubkey
358 }
359 pub fn stored_size(&self) -> usize {
360 aligned_stored_size(self.data_len as usize)
361 }
362 pub fn data_len(&self) -> usize {
363 self.data_len as usize
364 }
365 #[cfg(test)]
366 pub fn new(account: &StoredAccountMeta) -> Self {
367 let storage_id = 0;
371 AccountFromStorage {
372 index_info: AccountInfo::new(
373 StorageLocation::AppendVec(storage_id, account.offset()),
374 account.is_zero_lamport(),
375 ),
376 pubkey: *account.pubkey(),
377 data_len: account.data_len() as u64,
378 }
379 }
380}
381
/// Result of extracting the unique accounts from a storage.
///
/// NOTE(review): field descriptions are derived from the names — confirm against the
/// producer.
pub struct GetUniqueAccountsResult {
    /// One entry per unique account.
    pub stored_accounts: Vec<AccountFromStorage>,
    /// Capacity of the storage the accounts came from (presumably in bytes).
    pub capacity: u64,
    /// How many duplicate entries were dropped to produce `stored_accounts`.
    pub num_duplicated_accounts: usize,
}
387
/// Timings, in microseconds (per the `_us` suffix), for the phases of adding a root.
pub struct AccountsAddRootTiming {
    /// Time spent in the index.
    pub index_us: u64,
    /// Time spent in the cache.
    pub cache_us: u64,
    /// Time spent in the store.
    pub store_us: u64,
}
393
/// Default for `AccountsDbConfig::ancient_append_vec_offset`.
///
/// NOTE(review): the unit (presumably slots) and sign convention are not visible here —
/// confirm at the use site.
const ANCIENT_APPEND_VEC_DEFAULT_OFFSET: Option<i64> = Some(100_000);
/// Default for `AccountsDbConfig::ancient_storage_ideal_size` (presumably bytes — confirm).
const DEFAULT_ANCIENT_STORAGE_IDEAL_SIZE: u64 = 100_000;
/// Default for `AccountsDbConfig::max_ancient_storages`.
pub const DEFAULT_MAX_ANCIENT_STORAGES: usize = 100_000;
419
/// User-facing configuration for `AccountsDb`.
///
/// `Option` fields left as `None` presumably fall back to built-in defaults.
///
/// NOTE(review): field descriptions are derived from the names and types visible here —
/// confirm against the constructor that consumes this config.
#[derive(Debug, Default, Clone)]
pub struct AccountsDbConfig {
    /// Accounts index configuration.
    pub index: Option<AccountsIndexConfig>,
    /// Secondary indexes to maintain.
    pub account_indexes: Option<AccountSecondaryIndexes>,
    /// Base directory for working files.
    pub base_working_path: Option<PathBuf>,
    /// Directories to place shrunk storages in.
    pub shrink_paths: Option<Vec<PathBuf>>,
    /// Threshold policy deciding when storages are shrunk.
    pub shrink_ratio: AccountShrinkThreshold,
    /// Read cache size limits in bytes, as a pair (presumably low/high watermarks).
    pub read_cache_limit_bytes: Option<(usize, usize)>,
    /// Sample size used by read cache eviction.
    pub read_cache_evict_sample_size: Option<usize>,
    /// Write cache size limit in bytes.
    pub write_cache_limit_bytes: Option<u64>,
    /// Offset that determines which storages are treated as ancient.
    pub ancient_append_vec_offset: Option<i64>,
    /// Ideal size of an ancient storage.
    pub ancient_storage_ideal_size: Option<u64>,
    /// Maximum number of ancient storages.
    pub max_ancient_storages: Option<usize>,
    /// Skip the initial hash calculation.
    pub skip_initial_hash_calc: bool,
    /// Enable exhaustive ref count verification.
    pub exhaustively_verify_refcounts: bool,
    /// Partitioned epoch rewards configuration.
    pub partitioned_epoch_rewards_config: PartitionedEpochRewardsConfig,
    /// How storage files are accessed.
    pub storage_access: StorageAccess,
    /// Scan filter applied when shrinking.
    pub scan_filter_for_shrinking: ScanFilter,
    /// Enable marking accounts obsolete.
    pub mark_obsolete_accounts: bool,
    /// Number of threads for background work.
    pub num_background_threads: Option<NonZeroUsize>,
    /// Number of threads for foreground work.
    pub num_foreground_threads: Option<NonZeroUsize>,
    /// Number of threads for hashing.
    pub num_hash_threads: Option<NonZeroUsize>,
}
453
/// Only compiled outside of tests.
///
/// NOTE(review): judging by the name, the number of consecutive failed iterations of some
/// retry loop that is considered absurd — confirm at the use site.
#[cfg(not(test))]
const ABSURD_CONSECUTIVE_FAILED_ITERATIONS: usize = 100;
456
/// Policy for deciding when account storages should be shrunk.
///
/// NOTE(review): how each ratio is applied is not visible here — confirm at the shrink
/// selection code.
#[derive(Debug, Clone, Copy)]
pub enum AccountShrinkThreshold {
    /// Presumably: `shrink_ratio` is evaluated against the total space of all storages.
    TotalSpace { shrink_ratio: f64 },
    /// Presumably: `shrink_ratio` is evaluated against each storage on its own.
    IndividualStore { shrink_ratio: f64 },
}

/// Whether shrinking optimizes for total space by default.
pub const DEFAULT_ACCOUNTS_SHRINK_OPTIMIZE_TOTAL_SPACE: bool = true;

/// Default shrink ratio.
pub const DEFAULT_ACCOUNTS_SHRINK_RATIO: f64 = 0.80;

/// Default threshold: total-space policy with the default ratio.
const DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION: AccountShrinkThreshold = {
    let shrink_ratio = DEFAULT_ACCOUNTS_SHRINK_RATIO;
    AccountShrinkThreshold::TotalSpace { shrink_ratio }
};

impl Default for AccountShrinkThreshold {
    /// Returns `TotalSpace` with `DEFAULT_ACCOUNTS_SHRINK_RATIO`.
    fn default() -> Self {
        DEFAULT_ACCOUNTS_SHRINK_THRESHOLD_OPTION
    }
}
481
/// Result of scanning a slot, shaped by where the slot's accounts were found.
///
/// NOTE(review): inferred from the variant names — confirm against the scan function.
pub enum ScanStorageResult<R, B> {
    /// Results gathered from the cache: one `R` per item.
    Cached(Vec<R>),
    /// Result gathered from storage, accumulated into a caller-chosen `B`.
    Stored(B),
}
486
/// Summary produced by index generation.
#[derive(Debug, Default)]
pub struct IndexGenerationInfo {
    /// Total account data length (presumably in bytes, summed over indexed accounts).
    pub accounts_data_len: u64,
    /// Lattice hash of the duplicate accounts, if it was computed. Boxed, presumably to
    /// keep this struct small.
    pub duplicates_lt_hash: Option<Box<DuplicatesLtHash>>,
}
494
/// Per-slot summary produced while generating the index.
///
/// NOTE(review): field descriptions are derived from the names — confirm against the
/// producer.
#[derive(Debug, Default)]
struct SlotIndexGenerationInfo {
    /// Time spent inserting into the index, in microseconds.
    insert_time_us: u64,
    /// Number of accounts in the slot.
    num_accounts: u64,
    /// Total account data length for the slot.
    accounts_data_len: u64,
    /// Pubkeys of the zero-lamport accounts seen in the slot.
    zero_lamport_pubkeys: Vec<Pubkey>,
    /// True when every account in the slot has zero lamports.
    all_accounts_are_zero_lamports: bool,
    /// Number of accounts that were not already in the index.
    num_did_not_exist: u64,
    /// Number of accounts that already existed in the in-memory index.
    num_existed_in_mem: u64,
    /// Number of accounts that already existed in the on-disk index.
    num_existed_on_disk: u64,
}
509
510#[derive(Debug, Clone, Eq, PartialEq)]
517pub struct DuplicatesLtHash(pub LtHash);
518
519impl Default for DuplicatesLtHash {
520 fn default() -> Self {
521 Self(LtHash::identity())
522 }
523}
524
/// Timings and counters gathered while generating the index; published by `report`.
///
/// Fields ending in `_us` are microseconds. `index_time` and `scan_time` are reported
/// under `*_us` datapoint names, so they are presumably microseconds too.
#[derive(Default, Debug)]
struct GenerateIndexTimings {
    /// Reported as `overall_us`.
    pub total_time_us: u64,
    /// Reported as `total_us`.
    pub index_time: u64,
    /// Reported as `scan_stores_us`.
    pub scan_time: u64,
    pub insertion_time_us: u64,
    pub storage_size_storages_us: u64,
    pub index_flush_us: u64,
    /// Reported as `total_items_including_duplicates`.
    pub total_including_duplicates: u64,
    pub accounts_data_len_dedup_time_us: u64,
    pub total_duplicate_slot_keys: u64,
    pub total_num_unique_duplicate_keys: u64,
    pub num_duplicate_accounts: u64,
    pub populate_duplicate_keys_us: u64,
    pub total_slots: u64,
    /// Atomic, unlike the other counters.
    /// NOTE(review): presumably because it is accumulated from parallel workers — confirm.
    pub par_duplicates_lt_hash_us: AtomicU64,
    pub visit_zero_lamports_us: u64,
    pub num_zero_lamport_single_refs: u64,
    pub all_accounts_are_zero_lamports_slots: u64,
    pub mark_obsolete_accounts_us: u64,
    pub num_obsolete_accounts_marked: u64,
    pub num_slots_removed_as_obsolete: u64,
}
548
/// Size and account count recorded for one storage.
#[derive(Default, Debug, PartialEq, Eq)]
struct StorageSizeAndCount {
    /// Total stored size (presumably in bytes) attributed to the storage.
    pub stored_size: usize,
    /// Number of accounts attributed to the storage.
    pub count: usize,
}
/// Concurrent map from storage id to its `StorageSizeAndCount`. Uses a no-hash hasher,
/// so the `AccountsFileId` key is used directly as the hash.
type StorageSizeAndCountMap =
    DashMap<AccountsFileId, StorageSizeAndCount, BuildNoHashHasher<AccountsFileId>>;
558
impl GenerateIndexTimings {
    /// Publishes all timings and counters as a single `generate_index` datapoint.
    ///
    /// `copy_data_us` is taken from `startup_stats` and reset to 0 as it is read (`swap`).
    /// Every other value is read from `self` without being modified.
    pub fn report(&self, startup_stats: &StartupStats) {
        datapoint_info!(
            "generate_index",
            ("overall_us", self.total_time_us, i64),
            // note the naming mismatch: the `index_time` field is published as "total_us"
            ("total_us", self.index_time, i64),
            ("scan_stores_us", self.scan_time, i64),
            ("insertion_time_us", self.insertion_time_us, i64),
            (
                "storage_size_storages_us",
                self.storage_size_storages_us,
                i64
            ),
            ("index_flush_us", self.index_flush_us, i64),
            (
                "total_items_including_duplicates",
                self.total_including_duplicates,
                i64
            ),
            (
                "accounts_data_len_dedup_time_us",
                self.accounts_data_len_dedup_time_us,
                i64
            ),
            (
                "total_duplicate_slot_keys",
                self.total_duplicate_slot_keys,
                i64
            ),
            (
                "total_num_unique_duplicate_keys",
                self.total_num_unique_duplicate_keys,
                i64
            ),
            ("num_duplicate_accounts", self.num_duplicate_accounts, i64),
            (
                "populate_duplicate_keys_us",
                self.populate_duplicate_keys_us,
                i64
            ),
            ("total_slots", self.total_slots, i64),
            (
                "copy_data_us",
                // read-and-reset, so the next report starts from zero
                startup_stats.copy_data_us.swap(0, Ordering::Relaxed),
                i64
            ),
            (
                "par_duplicates_lt_hash_us",
                self.par_duplicates_lt_hash_us.load(Ordering::Relaxed),
                i64
            ),
            (
                "num_zero_lamport_single_refs",
                self.num_zero_lamport_single_refs,
                i64
            ),
            ("visit_zero_lamports_us", self.visit_zero_lamports_us, i64),
            (
                "all_accounts_are_zero_lamports_slots",
                self.all_accounts_are_zero_lamports_slots,
                i64
            ),
            (
                "mark_obsolete_accounts_us",
                self.mark_obsolete_accounts_us,
                i64
            ),
            (
                "num_obsolete_accounts_marked",
                self.num_obsolete_accounts_marked,
                i64
            ),
            (
                "num_slots_removed_as_obsolete",
                self.num_slots_removed_as_obsolete,
                i64
            ),
        );
    }
}
640
// Marker impls with empty bodies: they opt `AccountInfo` into the index value traits, as
// needed by `AccountsIndex<AccountInfo, AccountInfo>`.
impl IndexValue for AccountInfo {}
impl DiskIndexValue for AccountInfo {}
643
impl IsZeroLamport for AccountSharedData {
    /// True when the account's lamport balance is exactly 0.
    fn is_zero_lamport(&self) -> bool {
        self.lamports() == 0
    }
}
649
impl IsZeroLamport for Account {
    /// True when the account's lamport balance is exactly 0.
    fn is_zero_lamport(&self) -> bool {
        self.lamports() == 0
    }
}
655
/// Per-thread helper for progress logging when several threads share one total.
///
/// Each thread owns one instance and periodically adds its own progress to the shared
/// `total_count`. Only an instance that observed the shared total at 0 logs.
struct MultiThreadProgress<'a> {
    /// When this instance last contributed to the shared total.
    last_update: Instant,
    /// This instance's count at its last contribution.
    my_last_report_count: u64,
    /// Counter shared by all threads.
    total_count: &'a AtomicU64,
    /// Minimum number of whole seconds between contributions.
    report_delay_secs: u64,
    /// Set once this instance sees the shared total at 0; from then on it is the one that
    /// logs.
    first_caller: bool,
    /// Expected final total, printed as the denominator in the log line.
    ultimate_count: u64,
    /// When this instance was created; used for the rate in the log line.
    start_time: Instant,
}
665
666impl<'a> MultiThreadProgress<'a> {
667 fn new(total_count: &'a AtomicU64, report_delay_secs: u64, ultimate_count: u64) -> Self {
668 Self {
669 last_update: Instant::now(),
670 my_last_report_count: 0,
671 total_count,
672 report_delay_secs,
673 first_caller: false,
674 ultimate_count,
675 start_time: Instant::now(),
676 }
677 }
678 fn report(&mut self, my_current_count: u64) {
679 let now = Instant::now();
680 if now.duration_since(self.last_update).as_secs() >= self.report_delay_secs {
681 let my_total_newly_processed_slots_since_last_report =
682 my_current_count - self.my_last_report_count;
683
684 self.my_last_report_count = my_current_count;
685 let previous_total_processed_slots_across_all_threads = self.total_count.fetch_add(
686 my_total_newly_processed_slots_since_last_report,
687 Ordering::Relaxed,
688 );
689 self.first_caller =
690 self.first_caller || 0 == previous_total_processed_slots_across_all_threads;
691 if self.first_caller {
692 let total = previous_total_processed_slots_across_all_threads
693 + my_total_newly_processed_slots_since_last_report;
694 info!(
695 "generating index: {}/{} slots... ({}/s)",
696 total,
697 self.ultimate_count,
698 total / self.start_time.elapsed().as_secs().max(1),
699 );
700 }
701 self.last_update = now;
702 }
703 }
704}
705
/// Atomic counterpart of `AccountsFileId`.
pub type AtomicAccountsFileId = AtomicU32;
/// Identifier of an accounts file (storage).
pub type AccountsFileId = u32;

/// For each pubkey, a set of slots.
type AccountSlots = HashMap<Pubkey, IntSet<Slot>>;
/// For each slot, a set of offsets.
type SlotOffsets = IntMap<Slot, IntSet<Offset>>;
/// Pair of `AccountSlots` and `SlotOffsets`.
/// NOTE(review): presumably produced while handling reclaims — confirm.
type ReclaimResult = (AccountSlots, SlotOffsets);
/// Set of pubkeys that were removed from the accounts index.
type PubkeysRemovedFromAccountsIndex = HashSet<Pubkey>;
/// Set of slots that are candidates for shrinking.
type ShrinkCandidates = IntSet<Slot>;
715
/// Hint a caller gives when loading an account.
///
/// NOTE(review): semantics are inferred from the variant names — confirm against the
/// load path.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadHint {
    /// Presumably: the caller guarantees the max root will not change during the load.
    FixedMaxRoot,
    /// As `FixedMaxRoot`, and the loaded account should not be added to the read cache.
    FixedMaxRootDoNotPopulateReadCache,
    /// No guarantee is given.
    Unspecified,
}
737
/// Handle to an account that has been located but not yet read.
///
/// In both variants `None` means the lookup came up empty; the `check_and_get_*` methods
/// panic on it.
#[derive(Debug)]
pub enum LoadedAccountAccessor<'a> {
    /// The account lives in a storage: the storage entry and the account's offset in it.
    Stored(Option<(Arc<AccountStorageEntry>, usize)>),
    /// The account lives in the cache. `get_loaded_account` takes the value out, so a
    /// cached accessor can be consumed only once.
    Cached(Option<Cow<'a, Arc<CachedAccount>>>),
}
746
747impl LoadedAccountAccessor<'_> {
748 fn check_and_get_loaded_account_shared_data(&mut self) -> AccountSharedData {
749 match self {
753 LoadedAccountAccessor::Stored(Some((maybe_storage_entry, offset))) => {
754 maybe_storage_entry
760 .accounts
761 .get_account_shared_data(*offset)
762 .expect(
763 "If a storage entry was found in the storage map, it must not have been \
764 reset yet",
765 )
766 }
767 _ => self.check_and_get_loaded_account(|loaded_account| loaded_account.take_account()),
768 }
769 }
770
771 fn check_and_get_loaded_account<T>(
772 &mut self,
773 callback: impl for<'local> FnMut(LoadedAccount<'local>) -> T,
774 ) -> T {
775 match self {
779 LoadedAccountAccessor::Cached(None) | LoadedAccountAccessor::Stored(None) => {
780 panic!(
781 "Should have already been taken care of when creating this \
782 LoadedAccountAccessor"
783 );
784 }
785 LoadedAccountAccessor::Cached(Some(_cached_account)) => {
786 self.get_loaded_account(callback).unwrap()
789 }
790 LoadedAccountAccessor::Stored(Some(_maybe_storage_entry)) => {
791 self.get_loaded_account(callback).expect(
797 "If a storage entry was found in the storage map, it must not have been reset \
798 yet",
799 )
800 }
801 }
802 }
803
804 fn get_loaded_account<T>(
805 &mut self,
806 mut callback: impl for<'local> FnMut(LoadedAccount<'local>) -> T,
807 ) -> Option<T> {
808 match self {
809 LoadedAccountAccessor::Cached(cached_account) => {
810 let cached_account = cached_account.take().expect(
811 "Cache flushed/purged should be handled before trying to fetch account",
812 );
813 Some(callback(LoadedAccount::Cached(cached_account)))
814 }
815 LoadedAccountAccessor::Stored(maybe_storage_entry) => {
816 maybe_storage_entry
820 .as_ref()
821 .and_then(|(storage_entry, offset)| {
822 storage_entry
823 .accounts
824 .get_stored_account_callback(*offset, |account| {
825 callback(LoadedAccount::Stored(account))
826 })
827 })
828 }
829 }
830 }
831}
832
/// An account that has been read, together with where it came from.
pub enum LoadedAccount<'a> {
    /// Read from a storage.
    Stored(StoredAccountInfo<'a>),
    /// Read from the cache; either borrowed or owned (`Cow`).
    Cached(Cow<'a, Arc<CachedAccount>>),
}
837
838impl LoadedAccount<'_> {
839 pub fn pubkey(&self) -> &Pubkey {
840 match self {
841 LoadedAccount::Stored(stored_account) => stored_account.pubkey(),
842 LoadedAccount::Cached(cached_account) => cached_account.pubkey(),
843 }
844 }
845
846 pub fn take_account(&self) -> AccountSharedData {
847 match self {
848 LoadedAccount::Stored(stored_account) => stored_account.to_account_shared_data(),
849 LoadedAccount::Cached(cached_account) => match cached_account {
850 Cow::Owned(cached_account) => cached_account.account.clone(),
851 Cow::Borrowed(cached_account) => cached_account.account.clone(),
852 },
853 }
854 }
855
856 pub fn is_cached(&self) -> bool {
857 match self {
858 LoadedAccount::Stored(_) => false,
859 LoadedAccount::Cached(_) => true,
860 }
861 }
862
863 pub fn data_len(&self) -> usize {
865 self.data().len()
866 }
867}
868
869impl ReadableAccount for LoadedAccount<'_> {
870 fn lamports(&self) -> u64 {
871 match self {
872 LoadedAccount::Stored(stored_account) => stored_account.lamports(),
873 LoadedAccount::Cached(cached_account) => cached_account.account.lamports(),
874 }
875 }
876 fn data(&self) -> &[u8] {
877 match self {
878 LoadedAccount::Stored(stored_account) => stored_account.data(),
879 LoadedAccount::Cached(cached_account) => cached_account.account.data(),
880 }
881 }
882 fn owner(&self) -> &Pubkey {
883 match self {
884 LoadedAccount::Stored(stored_account) => stored_account.owner(),
885 LoadedAccount::Cached(cached_account) => cached_account.account.owner(),
886 }
887 }
888 fn executable(&self) -> bool {
889 match self {
890 LoadedAccount::Stored(stored_account) => stored_account.executable(),
891 LoadedAccount::Cached(cached_account) => cached_account.account.executable(),
892 }
893 }
894 fn rent_epoch(&self) -> Epoch {
895 match self {
896 LoadedAccount::Stored(stored_account) => stored_account.rent_epoch(),
897 LoadedAccount::Cached(cached_account) => cached_account.account.rent_epoch(),
898 }
899 }
900 fn to_account_shared_data(&self) -> AccountSharedData {
901 self.take_account()
902 }
903}
904
/// Timings (`_us` = microseconds) and counters gathered while building the set of keys
/// to clean.
///
/// NOTE(review): field descriptions are derived from the names — confirm against the
/// clean code.
#[derive(Default)]
struct CleanKeyTimings {
    /// Time spent collecting delta keys.
    collect_delta_keys_us: u64,
    /// Time spent inserting delta keys.
    delta_insert_us: u64,
    /// Time spent processing dirty stores.
    dirty_store_processing_us: u64,
    /// Number of delta keys.
    delta_key_count: u64,
    /// Number of dirty pubkeys.
    dirty_pubkeys_count: u64,
    /// Oldest slot among the dirty stores.
    oldest_dirty_slot: Slot,
    /// Number of dirty stores that are ancient.
    dirty_ancient_stores: usize,
}
916
/// One accounts file plus the bookkeeping kept for it in memory.
#[derive(Debug)]
pub struct AccountStorageEntry {
    /// Identifier of this storage.
    pub(crate) id: AccountsFileId,

    /// Slot this storage belongs to.
    pub(crate) slot: Slot,

    /// The underlying accounts file.
    pub accounts: AccountsFile,

    /// Account count and status, kept in one lock so they are always updated together
    /// (see `set_status`, `add_accounts`, `remove_accounts`).
    count_and_status: SeqLock<(usize, AccountStorageStatus)>,

    /// Bytes of alive accounts: raised by `add_accounts`, lowered by `remove_accounts`.
    alive_bytes: AtomicUsize,

    /// Offsets recorded through the `*_zero_lamport_single_ref_account_offset(s)`
    /// methods; the set rejects duplicates.
    zero_lamport_single_ref_offsets: RwLock<IntSet<Offset>>,

    /// Accounts marked obsolete, as `(offset, data_len, slot)`. The slot is the one
    /// passed to `mark_accounts_obsolete`, and the getters filter on it.
    obsolete_accounts: RwLock<Vec<(Offset, usize, Slot)>>,
}
958
959impl AccountStorageEntry {
960 pub fn new(
961 path: &Path,
962 slot: Slot,
963 id: AccountsFileId,
964 file_size: u64,
965 provider: AccountsFileProvider,
966 ) -> Self {
967 let tail = AccountsFile::file_name(slot, id);
968 let path = Path::new(path).join(tail);
969 let accounts = provider.new_writable(path, file_size);
970
971 Self {
972 id,
973 slot,
974 accounts,
975 count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
976 alive_bytes: AtomicUsize::new(0),
977 zero_lamport_single_ref_offsets: RwLock::default(),
978 obsolete_accounts: RwLock::default(),
979 }
980 }
981
982 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
984 fn reopen_as_readonly(&self, storage_access: StorageAccess) -> Option<Self> {
985 if storage_access != StorageAccess::File {
986 return None;
988 }
989
990 let count_and_status = self.count_and_status.lock_write();
991 self.accounts.reopen_as_readonly().map(|accounts| Self {
992 id: self.id,
993 slot: self.slot,
994 count_and_status: SeqLock::new(*count_and_status),
995 alive_bytes: AtomicUsize::new(self.alive_bytes()),
996 accounts,
997 zero_lamport_single_ref_offsets: RwLock::new(
998 self.zero_lamport_single_ref_offsets.read().unwrap().clone(),
999 ),
1000 obsolete_accounts: RwLock::new(self.obsolete_accounts.read().unwrap().clone()),
1001 })
1002 }
1003
1004 pub fn new_existing(slot: Slot, id: AccountsFileId, accounts: AccountsFile) -> Self {
1005 Self {
1006 id,
1007 slot,
1008 accounts,
1009 count_and_status: SeqLock::new((0, AccountStorageStatus::Available)),
1010 alive_bytes: AtomicUsize::new(0),
1011 zero_lamport_single_ref_offsets: RwLock::default(),
1012 obsolete_accounts: RwLock::default(),
1013 }
1014 }
1015
1016 pub fn set_status(&self, mut status: AccountStorageStatus) {
1017 let mut count_and_status = self.count_and_status.lock_write();
1018
1019 let count = count_and_status.0;
1020
1021 if status == AccountStorageStatus::Full && count == 0 {
1022 self.accounts.reset();
1031 status = AccountStorageStatus::Available;
1032 }
1033
1034 *count_and_status = (count, status);
1035 }
1036
1037 pub fn status(&self) -> AccountStorageStatus {
1038 self.count_and_status.read().1
1039 }
1040
1041 pub fn count(&self) -> usize {
1042 self.count_and_status.read().0
1043 }
1044
1045 pub fn alive_bytes(&self) -> usize {
1046 self.alive_bytes.load(Ordering::Acquire)
1047 }
1048
1049 pub fn mark_accounts_obsolete(
1051 &self,
1052 newly_obsolete_accounts: impl ExactSizeIterator<Item = (Offset, usize)>,
1053 slot: Slot,
1054 ) {
1055 let mut obsolete_accounts_list = self.obsolete_accounts.write().unwrap();
1056 obsolete_accounts_list.reserve(newly_obsolete_accounts.len());
1057
1058 for (offset, data_len) in newly_obsolete_accounts {
1059 obsolete_accounts_list.push((offset, data_len, slot));
1060 }
1061 }
1062
1063 pub fn get_obsolete_accounts(&self, slot: Option<Slot>) -> Vec<(Offset, usize)> {
1067 self.obsolete_accounts
1068 .read()
1069 .unwrap()
1070 .iter()
1071 .filter(|(_, _, obsolete_slot)| slot.is_none_or(|s| *obsolete_slot <= s))
1072 .map(|(offset, data_len, _)| (*offset, *data_len))
1073 .collect()
1074 }
1075
1076 pub fn get_obsolete_bytes(&self, slot: Option<Slot>) -> usize {
1080 let obsolete_accounts = self.obsolete_accounts.read().unwrap();
1081 let obsolete_bytes = obsolete_accounts
1082 .iter()
1083 .filter(|(_, _, obsolete_slot)| slot.is_none_or(|s| *obsolete_slot <= s))
1084 .map(|(offset, data_len, _)| {
1085 self.accounts
1086 .calculate_stored_size(*data_len)
1087 .min(self.accounts.len() - offset)
1088 })
1089 .sum();
1090 obsolete_bytes
1091 }
1092
1093 fn insert_zero_lamport_single_ref_account_offset(&self, offset: usize) -> bool {
1096 let mut zero_lamport_single_ref_offsets =
1097 self.zero_lamport_single_ref_offsets.write().unwrap();
1098 zero_lamport_single_ref_offsets.insert(offset)
1099 }
1100
1101 fn batch_insert_zero_lamport_single_ref_account_offsets(&self, offsets: &[Offset]) -> u64 {
1104 let mut zero_lamport_single_ref_offsets =
1105 self.zero_lamport_single_ref_offsets.write().unwrap();
1106 let mut count = 0;
1107 for offset in offsets {
1108 if zero_lamport_single_ref_offsets.insert(*offset) {
1109 count += 1;
1110 }
1111 }
1112 count
1113 }
1114
1115 fn num_zero_lamport_single_ref_accounts(&self) -> usize {
1117 self.zero_lamport_single_ref_offsets.read().unwrap().len()
1118 }
1119
1120 fn alive_bytes_exclude_zero_lamport_single_ref_accounts(&self) -> usize {
1122 let zero_lamport_dead_bytes = self
1123 .accounts
1124 .dead_bytes_due_to_zero_lamport_single_ref(self.num_zero_lamport_single_ref_accounts());
1125 self.alive_bytes().saturating_sub(zero_lamport_dead_bytes)
1126 }
1127
1128 pub fn written_bytes(&self) -> u64 {
1129 self.accounts.len() as u64
1130 }
1131
1132 pub fn capacity(&self) -> u64 {
1133 self.accounts.capacity()
1134 }
1135
1136 pub fn has_accounts(&self) -> bool {
1137 self.count() > 0
1138 }
1139
1140 pub fn slot(&self) -> Slot {
1141 self.slot
1142 }
1143
1144 pub fn id(&self) -> AccountsFileId {
1145 self.id
1146 }
1147
1148 pub fn flush(&self) -> Result<(), AccountsFileError> {
1149 self.accounts.flush()
1150 }
1151
1152 fn add_accounts(&self, num_accounts: usize, num_bytes: usize) {
1153 let mut count_and_status = self.count_and_status.lock_write();
1154 *count_and_status = (count_and_status.0 + num_accounts, count_and_status.1);
1155 self.alive_bytes.fetch_add(num_bytes, Ordering::Release);
1156 }
1157
1158 fn remove_accounts(&self, num_bytes: usize, num_accounts: usize) -> usize {
1160 let mut count_and_status = self.count_and_status.lock_write();
1161 let (mut count, mut status) = *count_and_status;
1162
1163 if count == num_accounts && status == AccountStorageStatus::Full {
1164 self.accounts.reset();
1176 status = AccountStorageStatus::Available;
1177 }
1178
1179 assert!(
1182 count >= num_accounts,
1183 "double remove of account in slot: {}/store: {}!!",
1184 self.slot(),
1185 self.id(),
1186 );
1187
1188 self.alive_bytes.fetch_sub(num_bytes, Ordering::Release);
1189 count = count.saturating_sub(num_accounts);
1190 *count_and_status = (count, status);
1191 count
1192 }
1193
1194 pub fn path(&self) -> &Path {
1196 self.accounts.path()
1197 }
1198}
1199
1200pub fn get_temp_accounts_paths(count: u32) -> io::Result<(Vec<TempDir>, Vec<PathBuf>)> {
1201 let temp_dirs: io::Result<Vec<TempDir>> = (0..count).map(|_| TempDir::new()).collect();
1202 let temp_dirs = temp_dirs?;
1203
1204 let paths: io::Result<Vec<_>> = temp_dirs
1205 .iter()
1206 .map(|temp_dir| {
1207 utils::create_accounts_run_and_snapshot_dirs(temp_dir)
1208 .map(|(run_dir, _snapshot_dir)| run_dir)
1209 })
1210 .collect();
1211 let paths = paths?;
1212 Ok((temp_dirs, paths))
1213}
1214
/// Index information gathered for one pubkey that is a cleaning candidate.
///
/// NOTE(review): field descriptions are derived from the names and types — confirm
/// against the clean code.
#[derive(Default, Debug)]
struct CleaningInfo {
    /// The pubkey's slot list from the index.
    slot_list: SlotList<AccountInfo>,
    /// The pubkey's ref count from the index.
    ref_count: u64,
    /// True when the slot list might contain a zero-lamport entry. Per the name this is a
    /// conservative flag, not a guarantee.
    might_contain_zero_lamport_entry: bool,
}
1224
/// Cleaning candidates split into independently locked maps (pubkey to `CleaningInfo`),
/// paired with an optional slot.
/// NOTE(review): the meaning of the slot is not visible here — confirm at the producer.
type CleaningCandidates = (Box<[RwLock<HashMap<Pubkey, CleaningInfo>>]>, Option<Slot>);
1231
/// Mutex and condition variable pair for coordinating removal of unrooted slots.
///
/// NOTE(review): the exact protocol lives at the use sites — confirm there.
#[derive(Debug, Default)]
struct RemoveUnrootedSlotsSynchronization {
    /// Slots currently being worked on.
    slots_under_contention: Mutex<IntSet<Slot>>,
    /// Presumably notified when `slots_under_contention` changes.
    signal: Condvar,
}
1241
/// The accounts index specialised to `AccountInfo` for both of its value type parameters.
type AccountInfoAccountsIndex = AccountsIndex<AccountInfo, AccountInfo>;
1243
/// The accounts database: index, storages, caches, thread pools, statistics and
/// configuration.
///
/// NOTE(review): field descriptions below are derived from the names and types visible in
/// this declaration — confirm against the code that uses each field.
#[derive(Debug)]
pub struct AccountsDb {
    /// Index from pubkey to the locations of the account's versions.
    pub accounts_index: AccountInfoAccountsIndex,

    /// Offset that determines which storages are treated as ancient.
    pub ancient_append_vec_offset: Option<i64>,
    /// Ideal size of an ancient storage.
    pub ancient_storage_ideal_size: u64,
    /// Maximum number of ancient storages.
    pub max_ancient_storages: usize,
    /// Skip the initial hash calculation.
    pub skip_initial_hash_calc: bool,

    /// The account storages.
    pub storage: AccountStorage,

    /// Write cache of accounts.
    pub accounts_cache: AccountsCache,

    /// Write cache size limit in bytes, if configured.
    write_cache_limit_bytes: Option<u64>,

    /// Read-only cache of accounts.
    read_only_accounts_cache: ReadOnlyAccountsCache,

    /// Source of the next `AccountsFileId`.
    pub next_id: AtomicAccountsFileId,

    /// Slots that are candidates for shrinking.
    pub shrink_candidate_slots: Mutex<ShrinkCandidates>,

    /// Write version counter.
    pub write_version: AtomicU64,

    /// Directories holding the account storages.
    pub paths: Vec<PathBuf>,

    /// Base directory for working files.
    base_working_path: PathBuf,
    /// Temporary directory backing `base_working_path`, when one was created. Never read;
    /// presumably held only to keep the directory alive.
    #[allow(dead_code)]
    base_working_temp_dir: Option<TempDir>,

    /// Directories to place shrunk storages in.
    shrink_paths: Vec<PathBuf>,

    /// Temporary directories backing `paths`, when they were created. Presumably held
    /// only to keep the directories alive.
    #[allow(dead_code)]
    pub temp_paths: Option<Vec<TempDir>>,

    /// Size used when creating storage files.
    file_size: u64,

    /// Thread pool for foreground work.
    pub thread_pool_foreground: ThreadPool,
    /// Thread pool for background work.
    pub thread_pool_background: ThreadPool,
    /// Number of threads for hashing, if configured.
    pub num_hash_threads: Option<NonZeroUsize>,

    /// General statistics.
    pub stats: AccountsStats,

    /// Statistics for clean.
    clean_accounts_stats: CleanAccountsStats,

    /// Statistics for externally requested slot purges.
    external_purge_slots_stats: PurgeStats,

    /// Statistics for shrink.
    pub shrink_stats: ShrinkStats,

    /// Statistics for shrinking ancient storages.
    pub(crate) shrink_ancient_stats: ShrinkAncientStats,

    /// Secondary indexes to maintain.
    pub account_indexes: AccountSecondaryIndexes,

    /// Per slot, the pubkeys that have not been cleaned yet.
    uncleaned_pubkeys: DashMap<Slot, Vec<Pubkey>, BuildNoHashHasher<Slot>>,

    /// Test-only: artificial delay applied to loads.
    #[cfg(test)]
    load_delay: u64,

    /// Test-only: limit applied to loads.
    #[cfg(test)]
    load_limit: AtomicU64,

    /// Whether the bank drop callback is enabled.
    is_bank_drop_callback_enabled: AtomicBool,

    /// Synchronization state for removing unrooted slots.
    remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization,

    /// Threshold policy deciding when storages are shrunk.
    shrink_ratio: AccountShrinkThreshold,

    /// Per slot, a storage that is considered dirty.
    dirty_stores: DashMap<Slot, Arc<AccountStorageEntry>, BuildNoHashHasher<Slot>>,

    /// Zero-lamport accounts, as `(slot, pubkey)`, to purge once a full snapshot exists.
    zero_lamport_accounts_to_purge_after_full_snapshot: DashSet<(Slot, Pubkey)>,

    /// Notifier for account updates, if any.
    accounts_update_notifier: Option<AccountsUpdateNotifier>,

    /// Tracks which activities are currently running.
    pub(crate) active_stats: ActiveStats,

    /// State of background accounts hash verification.
    pub verify_accounts_hash_in_bg: VerifyAccountsHashInBackground,

    /// Whether dead slots are logged.
    pub log_dead_slots: AtomicBool,

    /// Enable exhaustive ref count verification.
    exhaustively_verify_refcounts: bool,

    /// Factory for accounts files.
    accounts_file_provider: AccountsFileProvider,

    /// How storage files are accessed.
    storage_access: StorageAccess,

    /// Scan filter applied when shrinking.
    scan_filter_for_shrinking: ScanFilter,

    /// Partitioned epoch rewards configuration.
    pub partitioned_epoch_rewards_config: PartitionedEpochRewardsConfig,

    /// Slot of the latest full snapshot, if there is one.
    latest_full_snapshot_slot: SeqLock<Option<Slot>>,

    /// Queue of `(slot, u64)` pairs for the ancient slots judged best to shrink. The
    /// meaning of the `u64` is not visible here.
    pub(crate) best_ancient_slots_to_shrink: RwLock<VecDeque<(Slot, u64)>>,

    /// Enable marking accounts obsolete.
    pub mark_obsolete_accounts: bool,
}
1385
1386pub fn quarter_thread_count() -> usize {
1387 std::cmp::max(2, num_cpus::get() / 4)
1388}
1389
1390pub fn default_num_hash_threads() -> NonZeroUsize {
1392 let num_threads = (num_cpus::get() / 8).clamp(2, 6);
1394 NonZeroUsize::new(num_threads).unwrap()
1395}
/// Returns the default number of threads for the foreground thread pool, which is whatever
/// `get_thread_count()` reports.
pub fn default_num_foreground_threads() -> usize {
    get_thread_count()
}
1399
#[cfg(feature = "frozen-abi")]
impl solana_frozen_abi::abi_example::AbiExample for AccountsDb {
    /// Builds a small example db: a single-path test db holding one account (1 lamport, 5 bytes
    /// of data, keyed and owned by the default pubkey) stored in slot 0, with slot 0 rooted and
    /// the write cache flushed.
    fn example() -> Self {
        let db = AccountsDb::new_single_for_tests();

        let pubkey = Pubkey::default();
        let data_len = 5;
        let example_account = AccountSharedData::new(1, data_len, &pubkey);

        let slot: Slot = 0;
        let accounts_to_store = [(&pubkey, &example_account)];
        db.store_for_tests((slot, accounts_to_store.as_slice()));
        db.add_root_and_flush_write_cache(0);
        db
    }
}
1413
1414impl AccountsDb {
    // Default (low, high) size pair for the read-only accounts cache. `new_with_config()` uses
    // the pair when the config does not set `read_cache_limit_bytes` and hands it to
    // `ReadOnlyAccountsCache::new()`.
    // NOTE(review): presumably the cache evicts once it exceeds HI until it is back under LO —
    // confirm against `ReadOnlyAccountsCache`.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_LO: usize = 3_000_000_000;
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_HI: usize = 3_100_000_000;

    // Default eviction sample size for the read-only accounts cache; used when the config does
    // not set `read_cache_evict_sample_size`.
    #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
    const DEFAULT_READ_ONLY_CACHE_EVICT_SAMPLE_SIZE: usize = 8;
1429
    /// Creates a db for tests; identical to `new_single_for_tests()`.
    pub fn default_for_tests() -> Self {
        Self::new_single_for_tests()
    }
1433
    /// Creates a db for tests without supplying any paths, so `new_with_config()` falls back to
    /// temporary directories.
    pub fn new_single_for_tests() -> Self {
        AccountsDb::new_for_tests(Vec::new())
    }
1437
    /// Like `new_single_for_tests()`, but the db uses `file_provider` as its accounts file
    /// provider.
    pub fn new_single_for_tests_with_provider(file_provider: AccountsFileProvider) -> Self {
        AccountsDb::new_for_tests_with_provider(Vec::new(), file_provider)
    }
1441
    /// Creates a db for tests over `paths`, using the default accounts file provider.
    pub fn new_for_tests(paths: Vec<PathBuf>) -> Self {
        Self::new_for_tests_with_provider(paths, AccountsFileProvider::default())
    }
1445
1446 fn new_for_tests_with_provider(
1447 paths: Vec<PathBuf>,
1448 accounts_file_provider: AccountsFileProvider,
1449 ) -> Self {
1450 let mut db = AccountsDb::new_with_config(
1451 paths,
1452 Some(ACCOUNTS_DB_CONFIG_FOR_TESTING),
1453 None,
1454 Arc::default(),
1455 );
1456 db.accounts_file_provider = accounts_file_provider;
1457 db
1458 }
1459
    /// Creates a new `AccountsDb`.
    ///
    /// * `paths` - directories for account storage. If empty, `DEFAULT_NUM_DIRS` temporary
    ///   directories are created and kept in the returned db.
    /// * `accounts_db_config` - configuration; `None` means the default config.
    /// * `accounts_update_notifier` - optional notifier stored in the db.
    /// * `exit` - flag handed to the accounts index.
    ///
    /// # Panics
    ///
    /// Panics if a temporary directory, a thread pool, or any directory in `paths` cannot be
    /// created.
    pub fn new_with_config(
        paths: Vec<PathBuf>,
        accounts_db_config: Option<AccountsDbConfig>,
        accounts_update_notifier: Option<AccountsUpdateNotifier>,
        exit: Arc<AtomicBool>,
    ) -> Self {
        // Fall back to the default configs for anything the caller did not provide.
        let accounts_db_config = accounts_db_config.unwrap_or_default();
        let accounts_index_config = accounts_db_config.index.unwrap_or_default();
        let accounts_index = AccountsIndex::new(&accounts_index_config, exit);

        // Use the configured base working path, or create a temporary directory for it. The
        // `TempDir` handle is stored in the db next to the path.
        let base_working_path = accounts_db_config.base_working_path.clone();
        let (base_working_path, base_working_temp_dir) =
            if let Some(base_working_path) = base_working_path {
                (base_working_path, None)
            } else {
                let base_working_temp_dir = TempDir::new().unwrap();
                let base_working_path = base_working_temp_dir.path().to_path_buf();
                (base_working_path, Some(base_working_temp_dir))
            };

        // With no paths supplied, create temporary account directories and keep their handles.
        let (paths, temp_paths) = if paths.is_empty() {
            let (temp_dirs, temp_paths) = get_temp_accounts_paths(DEFAULT_NUM_DIRS).unwrap();
            (temp_paths, Some(temp_dirs))
        } else {
            (paths, None)
        };

        // Shrinking uses the same directories as regular storage unless configured otherwise.
        let shrink_paths = accounts_db_config
            .shrink_paths
            .clone()
            .unwrap_or_else(|| paths.clone());

        // (low, high) size pair for the read-only cache.
        let read_cache_size = accounts_db_config.read_cache_limit_bytes.unwrap_or((
            Self::DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_LO,
            Self::DEFAULT_MAX_READ_ONLY_CACHE_DATA_SIZE_HI,
        ));
        let read_cache_evict_sample_size = accounts_db_config
            .read_cache_evict_sample_size
            .unwrap_or(Self::DEFAULT_READ_ONLY_CACHE_EVICT_SAMPLE_SIZE);

        // Stack size (8 MiB) for the foreground pool's threads only; the background pool below
        // keeps rayon's default.
        const ACCOUNTS_STACK_SIZE: usize = 8 * 1024 * 1024;
        let num_foreground_threads = accounts_db_config
            .num_foreground_threads
            .map(Into::into)
            .unwrap_or_else(default_num_foreground_threads);
        let thread_pool_foreground = rayon::ThreadPoolBuilder::new()
            .num_threads(num_foreground_threads)
            .thread_name(|i| format!("solAcctsDbFg{i:02}"))
            .stack_size(ACCOUNTS_STACK_SIZE)
            .build()
            .expect("new rayon threadpool");

        let num_background_threads = accounts_db_config
            .num_background_threads
            .map(Into::into)
            .unwrap_or_else(quarter_thread_count);
        let thread_pool_background = rayon::ThreadPoolBuilder::new()
            .thread_name(|i| format!("solAcctsDbBg{i:02}"))
            .num_threads(num_background_threads)
            .build()
            .expect("new rayon threadpool");

        let new = Self {
            accounts_index,
            paths,
            base_working_path,
            base_working_temp_dir,
            temp_paths,
            shrink_paths,
            skip_initial_hash_calc: accounts_db_config.skip_initial_hash_calc,
            ancient_append_vec_offset: accounts_db_config
                .ancient_append_vec_offset
                .or(ANCIENT_APPEND_VEC_DEFAULT_OFFSET),
            ancient_storage_ideal_size: accounts_db_config
                .ancient_storage_ideal_size
                .unwrap_or(DEFAULT_ANCIENT_STORAGE_IDEAL_SIZE),
            max_ancient_storages: accounts_db_config
                .max_ancient_storages
                .unwrap_or(DEFAULT_MAX_ANCIENT_STORAGES),
            account_indexes: accounts_db_config.account_indexes.unwrap_or_default(),
            shrink_ratio: accounts_db_config.shrink_ratio,
            accounts_update_notifier,
            read_only_accounts_cache: ReadOnlyAccountsCache::new(
                read_cache_size.0,
                read_cache_size.1,
                read_cache_evict_sample_size,
            ),
            write_cache_limit_bytes: accounts_db_config.write_cache_limit_bytes,
            partitioned_epoch_rewards_config: accounts_db_config.partitioned_epoch_rewards_config,
            exhaustively_verify_refcounts: accounts_db_config.exhaustively_verify_refcounts,
            storage_access: accounts_db_config.storage_access,
            scan_filter_for_shrinking: accounts_db_config.scan_filter_for_shrinking,
            thread_pool_foreground,
            thread_pool_background,
            num_hash_threads: accounts_db_config.num_hash_threads,
            // Everything below starts out empty/default regardless of the config.
            verify_accounts_hash_in_bg: VerifyAccountsHashInBackground::default(),
            active_stats: ActiveStats::default(),
            storage: AccountStorage::default(),
            accounts_cache: AccountsCache::default(),
            uncleaned_pubkeys: DashMap::default(),
            next_id: AtomicAccountsFileId::new(0),
            shrink_candidate_slots: Mutex::new(ShrinkCandidates::default()),
            write_version: AtomicU64::new(0),
            file_size: DEFAULT_FILE_SIZE,
            external_purge_slots_stats: PurgeStats::default(),
            clean_accounts_stats: CleanAccountsStats::default(),
            shrink_stats: ShrinkStats::default(),
            shrink_ancient_stats: ShrinkAncientStats::default(),
            stats: AccountsStats::default(),
            #[cfg(test)]
            load_delay: u64::default(),
            #[cfg(test)]
            load_limit: AtomicU64::default(),
            is_bank_drop_callback_enabled: AtomicBool::default(),
            remove_unrooted_slots_synchronization: RemoveUnrootedSlotsSynchronization::default(),
            dirty_stores: DashMap::default(),
            zero_lamport_accounts_to_purge_after_full_snapshot: DashSet::default(),
            log_dead_slots: AtomicBool::new(true),
            accounts_file_provider: AccountsFileProvider::default(),
            latest_full_snapshot_slot: SeqLock::new(None),
            best_ancient_slots_to_shrink: RwLock::default(),
            mark_obsolete_accounts: accounts_db_config.mark_obsolete_accounts,
        };

        // Make sure every storage directory exists before the db is handed out.
        {
            for path in new.paths.iter() {
                std::fs::create_dir_all(path).expect("Create directory failed.");
            }
        }
        new
    }
1595
    /// Returns the db's `file_size` setting (initialized to `DEFAULT_FILE_SIZE`).
    pub fn file_size(&self) -> u64 {
        self.file_size
    }
1599
    /// Returns a copy of the base working path.
    pub fn get_base_working_path(&self) -> PathBuf {
        self.base_working_path.clone()
    }
1604
    /// Returns true if an accounts update notifier was supplied when the db was created.
    pub fn has_accounts_update_notifier(&self) -> bool {
        self.accounts_update_notifier.is_some()
    }
1609
1610 fn next_id(&self) -> AccountsFileId {
1611 let next_id = self.next_id.fetch_add(1, Ordering::AcqRel);
1612 assert!(
1613 next_id != AccountsFileId::MAX,
1614 "We've run out of storage ids!"
1615 );
1616 next_id
1617 }
1618
1619 fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry {
1620 AccountStorageEntry::new(
1621 path,
1622 slot,
1623 self.next_id(),
1624 size,
1625 self.accounts_file_provider,
1626 )
1627 }
1628
1629 fn collect_reclaims(
1633 &self,
1634 pubkey: &Pubkey,
1635 max_clean_root_inclusive: Option<Slot>,
1636 ancient_account_cleans: &AtomicU64,
1637 epoch_schedule: &EpochSchedule,
1638 pubkeys_removed_from_accounts_index: &Mutex<PubkeysRemovedFromAccountsIndex>,
1639 ) -> SlotList<AccountInfo> {
1640 let one_epoch_old = self.get_oldest_non_ancient_slot(epoch_schedule);
1641 let mut clean_rooted = Measure::start("clean_old_root-ms");
1642 let mut reclaims = Vec::new();
1643 let removed_from_index = self.accounts_index.clean_rooted_entries(
1644 pubkey,
1645 &mut reclaims,
1646 max_clean_root_inclusive,
1647 );
1648 if removed_from_index {
1649 pubkeys_removed_from_accounts_index
1650 .lock()
1651 .unwrap()
1652 .insert(*pubkey);
1653 }
1654 if !reclaims.is_empty() {
1655 let old_reclaims = reclaims
1657 .iter()
1658 .filter_map(|(slot, _)| (slot < &one_epoch_old).then_some(1))
1659 .sum();
1660 ancient_account_cleans.fetch_add(old_reclaims, Ordering::Relaxed);
1661 }
1662 clean_rooted.stop();
1663 self.clean_accounts_stats
1664 .clean_old_root_us
1665 .fetch_add(clean_rooted.as_us(), Ordering::Relaxed);
1666 reclaims
1667 }
1668
1669 fn clean_accounts_older_than_root(
1673 &self,
1674 reclaims: &SlotList<AccountInfo>,
1675 pubkeys_removed_from_accounts_index: &HashSet<Pubkey>,
1676 ) -> ReclaimResult {
1677 let mut measure = Measure::start("clean_old_root_reclaims");
1678
1679 let reclaim_result = self.handle_reclaims(
1680 (!reclaims.is_empty()).then(|| reclaims.iter()),
1681 None,
1682 pubkeys_removed_from_accounts_index,
1683 HandleReclaims::ProcessDeadSlots(&self.clean_accounts_stats.purge_stats),
1684 MarkAccountsObsolete::No,
1685 );
1686 measure.stop();
1687 debug!("{measure}");
1688 self.clean_accounts_stats
1689 .clean_old_root_reclaim_us
1690 .fetch_add(measure.as_us(), Ordering::Relaxed);
1691 reclaim_result
1692 }
1693
    /// Removes from `store_counts` every store (slot) that cannot be deleted.
    ///
    /// A candidate pubkey can be dropped from all of its stores only if its ref count equals the
    /// length of its slot list and every slot in that list has a count of 0 in `store_counts`.
    /// For any other candidate, each slot in its slot list is removed from `store_counts`, and
    /// the same is done transitively for the slot lists of all pubkeys recorded for those slots.
    ///
    /// * `candidates` - the per-bin candidate maps.
    /// * `store_counts` - slot -> (count, pubkeys recorded for that slot); entries are removed in
    ///   place.
    /// * `min_slot` - if a store for this slot turns out not to be deletable, the reason is
    ///   logged at info level.
    ///
    /// # Panics
    ///
    /// Panics if a pubkey recorded in `store_counts` is missing from its bin in `candidates`.
    fn calc_delete_dependencies(
        &self,
        candidates: &[HashMap<Pubkey, CleaningInfo>],
        store_counts: &mut HashMap<Slot, (usize, HashSet<Pubkey>)>,
        min_slot: Option<Slot>,
    ) {
        // Slots that have already been handled by the worklist below.
        let mut already_counted = IntSet::default();
        for (bin_index, bin) in candidates.iter().enumerate() {
            for (pubkey, cleaning_info) in bin.iter() {
                let slot_list = &cleaning_info.slot_list;
                let ref_count = &cleaning_info.ref_count;
                let mut failed_slot = None;
                let all_stores_being_deleted = slot_list.len() as RefCount == *ref_count;
                if all_stores_being_deleted {
                    let mut delete = true;
                    for (slot, _account_info) in slot_list {
                        if let Some(count) = store_counts.get(slot).map(|s| s.0) {
                            debug!("calc_delete_dependencies() slot: {slot}, count len: {count}");
                            if count == 0 {
                                // This store's count is zero; check the next slot.
                                continue;
                            }
                        }
                        // Either the store's count is non-zero or the store is not in
                        // `store_counts` at all; remember the slot for the log message below.
                        failed_slot = Some(*slot);
                        delete = false;
                        break;
                    }
                    if delete {
                        // Every store holding this pubkey has a zero count; nothing to do.
                        continue;
                    }
                } else {
                    // The slot list does not account for every reference to this pubkey.
                    debug!(
                        "calc_delete_dependencies(), pubkey: {pubkey}, slot list len: {}, ref \
                         count: {ref_count}, slot list: {slot_list:?}",
                        slot_list.len(),
                    );
                }

                // Worklist of slots whose stores must be kept, seeded with this pubkey's slots.
                let mut pending_stores = IntSet::default();
                for (slot, _account_info) in slot_list {
                    if !already_counted.contains(slot) {
                        pending_stores.insert(*slot);
                    }
                }
                while !pending_stores.is_empty() {
                    let slot = pending_stores.iter().next().cloned().unwrap();
                    if Some(slot) == min_slot {
                        if let Some(failed_slot) = failed_slot.take() {
                            info!(
                                "calc_delete_dependencies, oldest slot is not able to be deleted \
                                 because of {pubkey} in slot {failed_slot}"
                            );
                        } else {
                            info!(
                                "calc_delete_dependencies, oldest slot is not able to be deleted \
                                 because of {pubkey}, slot list len: {}, ref count: {ref_count}",
                                slot_list.len()
                            );
                        }
                    }

                    pending_stores.remove(&slot);
                    if !already_counted.insert(slot) {
                        continue;
                    }
                    // Drop the entry for this store so it is no longer considered deletable.
                    if let Some(store_count) = store_counts.remove(&slot) {
                        // Every pubkey recorded for this store is affected too: queue all of the
                        // slots in their slot lists that have not been handled yet.
                        let affected_pubkeys = &store_count.1;
                        for key in affected_pubkeys {
                            let candidates_bin_index =
                                self.accounts_index.bin_calculator.bin_from_pubkey(key);
                            let mut update_pending_stores =
                                |bin: &HashMap<Pubkey, CleaningInfo>| {
                                    for (slot, _account_info) in &bin.get(key).unwrap().slot_list {
                                        if !already_counted.contains(slot) {
                                            pending_stores.insert(*slot);
                                        }
                                    }
                                };
                            if candidates_bin_index == bin_index {
                                update_pending_stores(bin);
                            } else {
                                update_pending_stores(&candidates[candidates_bin_index]);
                            }
                        }
                    }
                }
            }
        }
    }
1797
1798 #[must_use]
1799 pub fn purge_keys_exact<'a, C>(
1800 &'a self,
1801 pubkey_to_slot_set: impl Iterator<Item = &'a (Pubkey, C)>,
1802 ) -> (Vec<(Slot, AccountInfo)>, PubkeysRemovedFromAccountsIndex)
1803 where
1804 C: Contains<'a, Slot> + 'a,
1805 {
1806 let mut reclaims = Vec::new();
1807 let mut dead_keys = Vec::new();
1808
1809 let mut purge_exact_count = 0;
1810 let (_, purge_exact_us) = measure_us!(for (pubkey, slots_set) in pubkey_to_slot_set {
1811 purge_exact_count += 1;
1812 let is_empty = self
1813 .accounts_index
1814 .purge_exact(pubkey, slots_set, &mut reclaims);
1815 if is_empty {
1816 dead_keys.push(pubkey);
1817 }
1818 });
1819
1820 let (pubkeys_removed_from_accounts_index, handle_dead_keys_us) = measure_us!(self
1821 .accounts_index
1822 .handle_dead_keys(&dead_keys, &self.account_indexes));
1823
1824 self.stats
1825 .purge_exact_count
1826 .fetch_add(purge_exact_count, Ordering::Relaxed);
1827 self.stats
1828 .handle_dead_keys_us
1829 .fetch_add(handle_dead_keys_us, Ordering::Relaxed);
1830 self.stats
1831 .purge_exact_us
1832 .fetch_add(purge_exact_us, Ordering::Relaxed);
1833 (reclaims, pubkeys_removed_from_accounts_index)
1834 }
1835
1836 fn max_clean_root(&self, proposed_clean_root: Option<Slot>) -> Option<Slot> {
1837 match (
1838 self.accounts_index.min_ongoing_scan_root(),
1839 proposed_clean_root,
1840 ) {
1841 (None, None) => None,
1842 (Some(min_scan_root), None) => Some(min_scan_root),
1843 (None, Some(proposed_clean_root)) => Some(proposed_clean_root),
1844 (Some(min_scan_root), Some(proposed_clean_root)) => {
1845 Some(std::cmp::min(min_scan_root, proposed_clean_root))
1846 }
1847 }
1848 }
1849
1850 fn get_oldest_non_ancient_slot(&self, epoch_schedule: &EpochSchedule) -> Slot {
1853 self.get_oldest_non_ancient_slot_from_slot(
1854 epoch_schedule,
1855 self.accounts_index.max_root_inclusive(),
1856 )
1857 }
1858
1859 fn get_oldest_non_ancient_slot_from_slot(
1862 &self,
1863 epoch_schedule: &EpochSchedule,
1864 max_root_inclusive: Slot,
1865 ) -> Slot {
1866 let mut result = max_root_inclusive;
1867 if let Some(offset) = self.ancient_append_vec_offset {
1868 result = Self::apply_offset_to_slot(result, offset);
1869 }
1870 result = Self::apply_offset_to_slot(
1871 result,
1872 -((epoch_schedule.slots_per_epoch as i64).saturating_sub(1)),
1873 );
1874 result.min(max_root_inclusive)
1875 }
1876
1877 fn collect_uncleaned_slots_up_to_slot(&self, max_slot_inclusive: Slot) -> Vec<Slot> {
1881 self.uncleaned_pubkeys
1882 .iter()
1883 .filter_map(|entry| {
1884 let slot = *entry.key();
1885 (slot <= max_slot_inclusive).then_some(slot)
1886 })
1887 .collect()
1888 }
1889
    /// Removes every slot `<= max_slot_inclusive` from `uncleaned_pubkeys` and inserts the
    /// pubkeys of those slots into `candidates`, each in the bin the accounts index assigns to it.
    ///
    /// Every inserted pubkey gets a fresh `CleaningInfo` with `might_contain_zero_lamport_entry`
    /// set; an existing entry for the same pubkey is overwritten.
    fn remove_uncleaned_slots_up_to_slot_and_move_pubkeys(
        &self,
        max_slot_inclusive: Slot,
        candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>],
    ) {
        let uncleaned_slots = self.collect_uncleaned_slots_up_to_slot(max_slot_inclusive);
        for uncleaned_slot in uncleaned_slots.into_iter() {
            if let Some((_removed_slot, mut removed_pubkeys)) =
                self.uncleaned_pubkeys.remove(&uncleaned_slot)
            {
                // Sort the pubkeys by bin so that each bin's lock is taken once per run of
                // pubkeys rather than once per pubkey.
                removed_pubkeys.sort_by(|a, b| {
                    self.accounts_index
                        .bin_calculator
                        .bin_from_pubkey(a)
                        .cmp(&self.accounts_index.bin_calculator.bin_from_pubkey(b))
                });
                if let Some(first_removed_pubkey) = removed_pubkeys.first() {
                    let mut prev_bin = self
                        .accounts_index
                        .bin_calculator
                        .bin_from_pubkey(first_removed_pubkey);
                    let mut candidates_bin = candidates[prev_bin].write().unwrap();
                    for removed_pubkey in removed_pubkeys {
                        let curr_bin = self
                            .accounts_index
                            .bin_calculator
                            .bin_from_pubkey(&removed_pubkey);
                        if curr_bin != prev_bin {
                            // Moved on to the next bin: switch to that bin's write lock.
                            candidates_bin = candidates[curr_bin].write().unwrap();
                            prev_bin = curr_bin;
                        }
                        // Flag the candidate as possibly zero lamport, which makes the clean scan
                        // use `ScanFilter::All` for it.
                        candidates_bin.insert(
                            removed_pubkey,
                            CleaningInfo {
                                might_contain_zero_lamport_entry: true,
                                ..Default::default()
                            },
                        );
                    }
                }
            }
        }
    }
1943
1944 fn count_pubkeys(candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>]) -> u64 {
1945 candidates
1946 .iter()
1947 .map(|x| x.read().unwrap().len())
1948 .sum::<usize>() as u64
1949 }
1950
    /// Builds the set of pubkeys that clean should examine.
    ///
    /// Candidates come from three sources, all limited to slots `<= max_clean_root_inclusive`
    /// (or the accounts index's max root when that is `None`):
    /// 1. every account in the dirty stores, which are removed from `dirty_stores`;
    /// 2. the pubkeys recorded in `uncleaned_pubkeys`, which are removed as well;
    /// 3. entries of `zero_lamport_accounts_to_purge_after_full_snapshot` whose slot is also at
    ///    or below the latest full snapshot slot.
    ///
    /// * `is_startup` - when true the dirty stores are scanned on the calling thread's rayon
    ///   context; otherwise the scan runs inside `thread_pool_background`.
    /// * `timings` - receives counts and timings for the individual steps.
    ///
    /// Returns the per-bin candidate maps and the lowest slot among the collected dirty stores
    /// (`None` if there were none).
    ///
    /// # Panics
    ///
    /// Panics if a storage cannot be scanned, or if there is no latest full snapshot slot while
    /// `zero_lamport_accounts_to_purge_after_full_snapshot` is not empty.
    fn construct_candidate_clean_keys(
        &self,
        max_clean_root_inclusive: Option<Slot>,
        is_startup: bool,
        timings: &mut CleanKeyTimings,
        epoch_schedule: &EpochSchedule,
    ) -> CleaningCandidates {
        let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
        let mut dirty_store_processing_time = Measure::start("dirty_store_processing");
        let max_root_inclusive = self.accounts_index.max_root_inclusive();
        let max_slot_inclusive = max_clean_root_inclusive.unwrap_or(max_root_inclusive);
        let mut dirty_stores = Vec::with_capacity(self.dirty_stores.len());
        // Lowest slot among the dirty stores collected below; returned to the caller.
        let mut min_dirty_slot = None::<u64>;
        // Move every dirty store at or below the max slot out of `self.dirty_stores`; newer
        // stores are kept for a later clean.
        self.dirty_stores.retain(|slot, store| {
            if *slot > max_slot_inclusive {
                true
            } else {
                min_dirty_slot = min_dirty_slot.map(|min| min.min(*slot)).or(Some(*slot));
                dirty_stores.push((*slot, store.clone()));
                false
            }
        });
        let dirty_stores_len = dirty_stores.len();
        // One candidate map per accounts index bin.
        let num_bins = self.accounts_index.bins();
        let candidates: Box<_> =
            std::iter::repeat_with(|| RwLock::new(HashMap::<Pubkey, CleaningInfo>::new()))
                .take(num_bins)
                .collect();

        // Adds `pubkey` to its bin, or-ing the zero lamport flag into an existing entry.
        let insert_candidate = |pubkey, is_zero_lamport| {
            let index = self.accounts_index.bin_calculator.bin_from_pubkey(&pubkey);
            let mut candidates_bin = candidates[index].write().unwrap();
            candidates_bin
                .entry(pubkey)
                .or_default()
                .might_contain_zero_lamport_entry |= is_zero_lamport;
        };

        let dirty_ancient_stores = AtomicUsize::default();
        // Scans all collected dirty stores in parallel chunks, inserting every account found as
        // a candidate, and records the oldest dirty slot in `timings`.
        let mut dirty_store_routine = || {
            let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads()));
            let oldest_dirty_slots: Vec<u64> = dirty_stores
                .par_chunks(chunk_size)
                .map(|dirty_store_chunk| {
                    let mut oldest_dirty_slot = max_slot_inclusive.saturating_add(1);
                    dirty_store_chunk.iter().for_each(|(slot, store)| {
                        if *slot < oldest_non_ancient_slot {
                            dirty_ancient_stores.fetch_add(1, Ordering::Relaxed);
                        }
                        oldest_dirty_slot = oldest_dirty_slot.min(*slot);

                        store
                            .accounts
                            .scan_accounts_without_data(|_offset, account| {
                                let pubkey = *account.pubkey();
                                let is_zero_lamport = account.is_zero_lamport();
                                insert_candidate(pubkey, is_zero_lamport);
                            })
                            .expect("must scan accounts storage");
                    });
                    oldest_dirty_slot
                })
                .collect();
            // With no dirty stores at all, report `max_slot_inclusive + 1`.
            timings.oldest_dirty_slot = *oldest_dirty_slots
                .iter()
                .min()
                .unwrap_or(&max_slot_inclusive.saturating_add(1));
        };

        if is_startup {
            // Not wrapped in the background pool at startup.
            dirty_store_routine();
        } else {
            self.thread_pool_background.install(|| {
                dirty_store_routine();
            });
        }
        timings.dirty_pubkeys_count = Self::count_pubkeys(&candidates);
        trace!(
            "dirty_stores.len: {} pubkeys.len: {}",
            dirty_stores_len,
            timings.dirty_pubkeys_count,
        );
        dirty_store_processing_time.stop();
        timings.dirty_store_processing_us += dirty_store_processing_time.as_us();
        timings.dirty_ancient_stores = dirty_ancient_stores.load(Ordering::Relaxed);

        // Second source: pubkeys recorded per slot in `uncleaned_pubkeys`.
        let mut collect_delta_keys = Measure::start("key_create");
        self.remove_uncleaned_slots_up_to_slot_and_move_pubkeys(max_slot_inclusive, &candidates);
        collect_delta_keys.stop();
        timings.collect_delta_keys_us += collect_delta_keys.as_us();

        timings.delta_key_count = Self::count_pubkeys(&candidates);

        // Third source: zero lamport accounts that were held back until a full snapshot.
        let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
        assert!(
            latest_full_snapshot_slot.is_some()
                || self
                    .zero_lamport_accounts_to_purge_after_full_snapshot
                    .is_empty(),
            "if snapshots are disabled, then zero_lamport_accounts_to_purge_later should always \
             be empty"
        );
        if let Some(latest_full_snapshot_slot) = latest_full_snapshot_slot {
            self.zero_lamport_accounts_to_purge_after_full_snapshot
                .retain(|(slot, pubkey)| {
                    let is_candidate_for_clean =
                        max_slot_inclusive >= *slot && latest_full_snapshot_slot >= *slot;
                    if is_candidate_for_clean {
                        insert_candidate(*pubkey, true);
                    }
                    // Entries that became candidates are dropped from the set.
                    !is_candidate_for_clean
                });
        }

        (candidates, min_dirty_slot)
    }
2078
    /// Test helper: runs `clean_accounts()` with no max clean root, `is_startup` set to false,
    /// and the default `EpochSchedule`.
    pub fn clean_accounts_for_tests(&self) {
        self.clean_accounts(None, false, &EpochSchedule::default())
    }
2083
2084 fn exhaustively_verify_refcounts(&self, max_slot_inclusive: Option<Slot>) {
2089 let max_slot_inclusive =
2090 max_slot_inclusive.unwrap_or_else(|| self.accounts_index.max_root_inclusive());
2091 info!("exhaustively verifying refcounts as of slot: {max_slot_inclusive}");
2092 let pubkey_refcount = DashMap::<Pubkey, Vec<Slot>>::default();
2093 let mut storages = self.storage.all_storages();
2094 storages.retain(|s| s.slot() <= max_slot_inclusive);
2095 storages.par_iter().for_each_init(
2097 || Box::new(append_vec::new_scan_accounts_reader()),
2098 |reader, storage| {
2099 let slot = storage.slot();
2100 storage
2101 .accounts
2102 .scan_accounts(reader.as_mut(), |_offset, account| {
2103 let pk = account.pubkey();
2104 match pubkey_refcount.entry(*pk) {
2105 dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => {
2106 if !occupied_entry.get().iter().any(|s| s == &slot) {
2107 occupied_entry.get_mut().push(slot);
2108 }
2109 }
2110 dashmap::mapref::entry::Entry::Vacant(vacant_entry) => {
2111 vacant_entry.insert(vec![slot]);
2112 }
2113 }
2114 })
2115 .expect("must scan accounts storage")
2116 },
2117 );
2118 let total = pubkey_refcount.len();
2119 let failed = AtomicBool::default();
2120 let threads = quarter_thread_count();
2121 let per_batch = total / threads;
2122 (0..=threads).into_par_iter().for_each(|attempt| {
2123 pubkey_refcount
2124 .iter()
2125 .skip(attempt * per_batch)
2126 .take(per_batch)
2127 .for_each(|entry| {
2128 if failed.load(Ordering::Relaxed) {
2129 return;
2130 }
2131
2132 self.accounts_index
2133 .get_and_then(entry.key(), |index_entry| {
2134 if let Some(index_entry) = index_entry {
2135 match (index_entry.ref_count() as usize).cmp(&entry.value().len()) {
2136 std::cmp::Ordering::Equal => {
2137 }
2139 std::cmp::Ordering::Greater => {
2140 let slot_list = index_entry.slot_list.read().unwrap();
2141 let num_too_new = slot_list
2142 .iter()
2143 .filter(|(slot, _)| slot > &max_slot_inclusive)
2144 .count();
2145
2146 if ((index_entry.ref_count() as usize) - num_too_new)
2147 > entry.value().len()
2148 {
2149 failed.store(true, Ordering::Relaxed);
2150 error!(
2151 "exhaustively_verify_refcounts: {} refcount too \
2152 large: {}, should be: {}, {:?}, {:?}, too_new: \
2153 {num_too_new}",
2154 entry.key(),
2155 index_entry.ref_count(),
2156 entry.value().len(),
2157 *entry.value(),
2158 slot_list
2159 );
2160 }
2161 }
2162 std::cmp::Ordering::Less => {
2163 error!(
2164 "exhaustively_verify_refcounts: {} refcount too \
2165 small: {}, should be: {}, {:?}, {:?}",
2166 entry.key(),
2167 index_entry.ref_count(),
2168 entry.value().len(),
2169 *entry.value(),
2170 index_entry.slot_list.read().unwrap()
2171 );
2172 }
2173 }
2174 };
2175 (false, ())
2176 });
2177 });
2178 });
2179 if failed.load(Ordering::Relaxed) {
2180 panic!("exhaustively_verify_refcounts failed");
2181 }
2182 }
2183
2184 pub fn clean_accounts(
2189 &self,
2190 max_clean_root_inclusive: Option<Slot>,
2191 is_startup: bool,
2192 epoch_schedule: &EpochSchedule,
2193 ) {
2194 if self.exhaustively_verify_refcounts {
2195 if is_startup {
2197 self.exhaustively_verify_refcounts(max_clean_root_inclusive);
2198 } else {
2199 self.thread_pool_background
2201 .install(|| self.exhaustively_verify_refcounts(max_clean_root_inclusive));
2202 }
2203 }
2204
2205 let _guard = self.active_stats.activate(ActiveStatItem::Clean);
2206
2207 let ancient_account_cleans = AtomicU64::default();
2208 let purges_old_accounts_count = AtomicU64::default();
2209
2210 let mut measure_all = Measure::start("clean_accounts");
2211 let max_clean_root_inclusive = self.max_clean_root(max_clean_root_inclusive);
2212
2213 self.report_store_stats();
2214
2215 let active_guard = self
2216 .active_stats
2217 .activate(ActiveStatItem::CleanConstructCandidates);
2218 let mut measure_construct_candidates = Measure::start("construct_candidates");
2219 let mut key_timings = CleanKeyTimings::default();
2220 let (mut candidates, min_dirty_slot) = self.construct_candidate_clean_keys(
2221 max_clean_root_inclusive,
2222 is_startup,
2223 &mut key_timings,
2224 epoch_schedule,
2225 );
2226 measure_construct_candidates.stop();
2227 drop(active_guard);
2228
2229 let num_candidates = Self::count_pubkeys(&candidates);
2230 let found_not_zero_accum = AtomicU64::new(0);
2231 let not_found_on_fork_accum = AtomicU64::new(0);
2232 let missing_accum = AtomicU64::new(0);
2233 let useful_accum = AtomicU64::new(0);
2234 let reclaims: SlotList<AccountInfo> = Vec::with_capacity(num_candidates as usize);
2235 let reclaims = Mutex::new(reclaims);
2236 let pubkeys_removed_from_accounts_index: PubkeysRemovedFromAccountsIndex = HashSet::new();
2237 let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index);
2238 let do_clean_scan = || {
2240 candidates.par_iter().for_each(|candidates_bin| {
2241 let mut found_not_zero = 0;
2242 let mut not_found_on_fork = 0;
2243 let mut missing = 0;
2244 let mut useful = 0;
2245 let mut purges_old_accounts_local = 0;
2246 let mut candidates_bin = candidates_bin.write().unwrap();
2247 candidates_bin.retain(|candidate_pubkey, candidate_info| {
2252 let mut should_collect_reclaims = false;
2253 self.accounts_index.scan(
2254 iter::once(candidate_pubkey),
2255 |_candidate_pubkey, slot_list_and_ref_count, _entry| {
2256 let mut useless = true;
2257 if let Some((slot_list, ref_count)) = slot_list_and_ref_count {
2258 let index_in_slot_list = self.accounts_index.latest_slot(
2260 None,
2261 slot_list,
2262 max_clean_root_inclusive,
2263 );
2264
2265 match index_in_slot_list {
2266 Some(index_in_slot_list) => {
2267 let (slot, account_info) = &slot_list[index_in_slot_list];
2269 if account_info.is_zero_lamport() {
2270 useless = false;
2271 candidate_info.slot_list =
2275 self.accounts_index.get_rooted_entries(
2276 slot_list,
2277 max_clean_root_inclusive,
2278 );
2279 candidate_info.ref_count = ref_count;
2280 } else {
2281 found_not_zero += 1;
2282 }
2283
2284 if slot_list.len() > 1
2287 && *slot
2288 <= max_clean_root_inclusive.unwrap_or(Slot::MAX)
2289 {
2290 should_collect_reclaims = true;
2291 purges_old_accounts_local += 1;
2292 useless = false;
2293 }
2294 }
2295 None => {
2296 not_found_on_fork += 1;
2303 should_collect_reclaims = true;
2304 purges_old_accounts_local += 1;
2305 useless = false;
2306 }
2307 }
2308 } else {
2309 missing += 1;
2310 }
2311 if !useless {
2312 useful += 1;
2313 }
2314 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
2315 },
2316 None,
2317 false,
2318 if candidate_info.might_contain_zero_lamport_entry {
2319 ScanFilter::All
2320 } else {
2321 self.scan_filter_for_shrinking
2322 },
2323 );
2324 if should_collect_reclaims {
2325 let reclaims_new = self.collect_reclaims(
2326 candidate_pubkey,
2327 max_clean_root_inclusive,
2328 &ancient_account_cleans,
2329 epoch_schedule,
2330 &pubkeys_removed_from_accounts_index,
2331 );
2332 if !reclaims_new.is_empty() {
2333 reclaims.lock().unwrap().extend(reclaims_new);
2334 }
2335 }
2336 !candidate_info.slot_list.is_empty()
2337 });
2338 found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed);
2339 not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed);
2340 missing_accum.fetch_add(missing, Ordering::Relaxed);
2341 useful_accum.fetch_add(useful, Ordering::Relaxed);
2342 purges_old_accounts_count.fetch_add(purges_old_accounts_local, Ordering::Relaxed);
2343 });
2344 };
2345 let active_guard = self
2346 .active_stats
2347 .activate(ActiveStatItem::CleanScanCandidates);
2348 let mut accounts_scan = Measure::start("accounts_scan");
2349 if is_startup {
2350 do_clean_scan();
2351 } else {
2352 self.thread_pool_background.install(do_clean_scan);
2353 }
2354 accounts_scan.stop();
2355 drop(active_guard);
2356
2357 let mut candidates: Box<_> = candidates
2359 .iter_mut()
2360 .map(|candidates_bin| mem::take(candidates_bin.get_mut().unwrap()))
2361 .collect();
2362
2363 let retained_keys_count: usize = candidates.iter().map(HashMap::len).sum();
2364 let reclaims = reclaims.into_inner().unwrap();
2365 let mut pubkeys_removed_from_accounts_index =
2366 pubkeys_removed_from_accounts_index.into_inner().unwrap();
2367
2368 let active_guard = self.active_stats.activate(ActiveStatItem::CleanOldAccounts);
2369 let mut clean_old_rooted = Measure::start("clean_old_roots");
2370 let (purged_account_slots, removed_accounts) =
2371 self.clean_accounts_older_than_root(&reclaims, &pubkeys_removed_from_accounts_index);
2372 clean_old_rooted.stop();
2373 drop(active_guard);
2374
2375 let active_guard = self
2378 .active_stats
2379 .activate(ActiveStatItem::CleanCollectStoreCounts);
2380 let mut store_counts_time = Measure::start("store_counts");
2381 let mut store_counts: HashMap<Slot, (usize, HashSet<Pubkey>)> = HashMap::new();
2382 for candidates_bin in candidates.iter_mut() {
2383 for (pubkey, cleaning_info) in candidates_bin.iter_mut() {
2384 let slot_list = &mut cleaning_info.slot_list;
2385 let ref_count = &mut cleaning_info.ref_count;
2386 debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty");
2387 if purged_account_slots.contains_key(pubkey) {
2388 *ref_count = self.accounts_index.ref_count_from_storage(pubkey);
2389 }
2390 slot_list.retain(|(slot, account_info)| {
2391 let was_slot_purged = purged_account_slots
2392 .get(pubkey)
2393 .map(|slots_removed| slots_removed.contains(slot))
2394 .unwrap_or(false);
2395 if was_slot_purged {
2396 return false;
2399 }
2400 let was_reclaimed = removed_accounts
2403 .get(slot)
2404 .map(|store_removed| store_removed.contains(&account_info.offset()))
2405 .unwrap_or(false);
2406 if was_reclaimed {
2407 return false;
2408 }
2409 if let Some(store_count) = store_counts.get_mut(slot) {
2410 store_count.0 -= 1;
2411 store_count.1.insert(*pubkey);
2412 } else {
2413 let mut key_set = HashSet::new();
2414 key_set.insert(*pubkey);
2415 assert!(
2416 !account_info.is_cached(),
2417 "The Accounts Cache must be flushed first for this account info. \
2418 pubkey: {}, slot: {}",
2419 *pubkey,
2420 *slot
2421 );
2422 let count = self
2423 .storage
2424 .get_account_storage_entry(*slot, account_info.store_id())
2425 .map(|store| store.count())
2426 .unwrap()
2427 - 1;
2428 debug!(
2429 "store_counts, inserting slot: {}, store id: {}, count: {}",
2430 slot,
2431 account_info.store_id(),
2432 count
2433 );
2434 store_counts.insert(*slot, (count, key_set));
2435 }
2436 true
2437 });
2438 }
2439 }
2440 store_counts_time.stop();
2441 drop(active_guard);
2442
2443 let active_guard = self
2444 .active_stats
2445 .activate(ActiveStatItem::CleanCalcDeleteDeps);
2446 let mut calc_deps_time = Measure::start("calc_deps");
2447 self.calc_delete_dependencies(&candidates, &mut store_counts, min_dirty_slot);
2448 calc_deps_time.stop();
2449 drop(active_guard);
2450
2451 let active_guard = self
2452 .active_stats
2453 .activate(ActiveStatItem::CleanFilterZeroLamport);
2454 let mut purge_filter = Measure::start("purge_filter");
2455 self.filter_zero_lamport_clean_for_incremental_snapshots(
2456 max_clean_root_inclusive,
2457 &store_counts,
2458 &mut candidates,
2459 );
2460 purge_filter.stop();
2461 drop(active_guard);
2462
2463 let active_guard = self.active_stats.activate(ActiveStatItem::CleanReclaims);
2464 let mut reclaims_time = Measure::start("reclaims");
2465 let mut pubkey_to_slot_set = Vec::new();
2467 for candidates_bin in candidates.iter() {
2468 let mut bin_set = candidates_bin
2469 .iter()
2470 .filter_map(|(pubkey, cleaning_info)| {
2471 let slot_list = &cleaning_info.slot_list;
2472 (!slot_list.is_empty()).then_some((
2473 *pubkey,
2474 slot_list
2475 .iter()
2476 .map(|(slot, _)| *slot)
2477 .collect::<HashSet<Slot>>(),
2478 ))
2479 })
2480 .collect::<Vec<_>>();
2481 pubkey_to_slot_set.append(&mut bin_set);
2482 }
2483
2484 let (reclaims, pubkeys_removed_from_accounts_index2) =
2485 self.purge_keys_exact(pubkey_to_slot_set.iter());
2486 pubkeys_removed_from_accounts_index.extend(pubkeys_removed_from_accounts_index2);
2487
2488 self.handle_reclaims(
2489 (!reclaims.is_empty()).then(|| reclaims.iter()),
2490 None,
2491 &pubkeys_removed_from_accounts_index,
2492 HandleReclaims::ProcessDeadSlots(&self.clean_accounts_stats.purge_stats),
2493 MarkAccountsObsolete::No,
2494 );
2495
2496 reclaims_time.stop();
2497 drop(active_guard);
2498
2499 measure_all.stop();
2500
2501 self.clean_accounts_stats.report();
2502 datapoint_info!(
2503 "clean_accounts",
2504 ("max_clean_root", max_clean_root_inclusive, Option<i64>),
2505 ("total_us", measure_all.as_us(), i64),
2506 (
2507 "collect_delta_keys_us",
2508 key_timings.collect_delta_keys_us,
2509 i64
2510 ),
2511 ("oldest_dirty_slot", key_timings.oldest_dirty_slot, i64),
2512 (
2513 "pubkeys_removed_from_accounts_index",
2514 pubkeys_removed_from_accounts_index.len(),
2515 i64
2516 ),
2517 (
2518 "dirty_ancient_stores",
2519 key_timings.dirty_ancient_stores,
2520 i64
2521 ),
2522 (
2523 "dirty_store_processing_us",
2524 key_timings.dirty_store_processing_us,
2525 i64
2526 ),
2527 ("construct_candidates_us", measure_construct_candidates.as_us(), i64),
2528 ("accounts_scan", accounts_scan.as_us(), i64),
2529 ("clean_old_rooted", clean_old_rooted.as_us(), i64),
2530 ("store_counts", store_counts_time.as_us(), i64),
2531 ("purge_filter", purge_filter.as_us(), i64),
2532 ("calc_deps", calc_deps_time.as_us(), i64),
2533 ("reclaims", reclaims_time.as_us(), i64),
2534 ("delta_insert_us", key_timings.delta_insert_us, i64),
2535 ("delta_key_count", key_timings.delta_key_count, i64),
2536 ("dirty_pubkeys_count", key_timings.dirty_pubkeys_count, i64),
2537 ("useful_keys", useful_accum.load(Ordering::Relaxed), i64),
2538 ("total_keys_count", num_candidates, i64),
2539 ("retained_keys_count", retained_keys_count, i64),
2540 (
2541 "scan_found_not_zero",
2542 found_not_zero_accum.load(Ordering::Relaxed),
2543 i64
2544 ),
2545 (
2546 "scan_not_found_on_fork",
2547 not_found_on_fork_accum.load(Ordering::Relaxed),
2548 i64
2549 ),
2550 ("scan_missing", missing_accum.load(Ordering::Relaxed), i64),
2551 (
2552 "get_account_sizes_us",
2553 self.clean_accounts_stats
2554 .get_account_sizes_us
2555 .swap(0, Ordering::Relaxed),
2556 i64
2557 ),
2558 (
2559 "slots_cleaned",
2560 self.clean_accounts_stats
2561 .slots_cleaned
2562 .swap(0, Ordering::Relaxed),
2563 i64
2564 ),
2565 (
2566 "clean_old_root_us",
2567 self.clean_accounts_stats
2568 .clean_old_root_us
2569 .swap(0, Ordering::Relaxed),
2570 i64
2571 ),
2572 (
2573 "clean_old_root_reclaim_us",
2574 self.clean_accounts_stats
2575 .clean_old_root_reclaim_us
2576 .swap(0, Ordering::Relaxed),
2577 i64
2578 ),
2579 (
2580 "remove_dead_accounts_remove_us",
2581 self.clean_accounts_stats
2582 .remove_dead_accounts_remove_us
2583 .swap(0, Ordering::Relaxed),
2584 i64
2585 ),
2586 (
2587 "remove_dead_accounts_shrink_us",
2588 self.clean_accounts_stats
2589 .remove_dead_accounts_shrink_us
2590 .swap(0, Ordering::Relaxed),
2591 i64
2592 ),
2593 (
2594 "clean_stored_dead_slots_us",
2595 self.clean_accounts_stats
2596 .clean_stored_dead_slots_us
2597 .swap(0, Ordering::Relaxed),
2598 i64
2599 ),
2600 (
2601 "roots_added",
2602 self.accounts_index.roots_added.swap(0, Ordering::Relaxed),
2603 i64
2604 ),
2605 (
2606 "purge_older_root_entries_one_slot_list",
2607 self.accounts_index
2608 .purge_older_root_entries_one_slot_list
2609 .swap(0, Ordering::Relaxed),
2610 i64
2611 ),
2612 (
2613 "roots_removed",
2614 self.accounts_index.roots_removed.swap(0, Ordering::Relaxed),
2615 i64
2616 ),
2617 (
2618 "active_scans",
2619 self.accounts_index.active_scans.load(Ordering::Relaxed),
2620 i64
2621 ),
2622 (
2623 "max_distance_to_min_scan_slot",
2624 self.accounts_index
2625 .max_distance_to_min_scan_slot
2626 .swap(0, Ordering::Relaxed),
2627 i64
2628 ),
2629 (
2630 "ancient_account_cleans",
2631 ancient_account_cleans.load(Ordering::Relaxed),
2632 i64
2633 ),
2634 (
2635 "purges_old_accounts_count",
2636 purges_old_accounts_count.load(Ordering::Relaxed),
2637 i64
2638 ),
2639 ("next_store_id", self.next_id.load(Ordering::Relaxed), i64),
2640 );
2641 }
2642
2643 fn handle_reclaims<'a, I>(
2674 &'a self,
2675 reclaims: Option<I>,
2676 expected_single_dead_slot: Option<Slot>,
2677 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
2678 handle_reclaims: HandleReclaims<'a>,
2679 mark_accounts_obsolete: MarkAccountsObsolete,
2680 ) -> ReclaimResult
2681 where
2682 I: Iterator<Item = &'a (Slot, AccountInfo)>,
2683 {
2684 let mut reclaim_result = ReclaimResult::default();
2685 if let Some(reclaims) = reclaims {
2686 let (dead_slots, reclaimed_offsets) = self.remove_dead_accounts(
2687 reclaims,
2688 expected_single_dead_slot,
2689 mark_accounts_obsolete,
2690 );
2691 reclaim_result.1 = reclaimed_offsets;
2692 let HandleReclaims::ProcessDeadSlots(purge_stats) = handle_reclaims;
2693 if let Some(expected_single_dead_slot) = expected_single_dead_slot {
2694 assert!(dead_slots.len() <= 1);
2695 if dead_slots.len() == 1 {
2696 assert!(dead_slots.contains(&expected_single_dead_slot));
2697 }
2698 }
2699 let clean_stored_dead_slots =
2701 !matches!(mark_accounts_obsolete, MarkAccountsObsolete::Yes(_));
2702
2703 self.process_dead_slots(
2704 &dead_slots,
2705 Some(&mut reclaim_result.0),
2706 purge_stats,
2707 pubkeys_removed_from_accounts_index,
2708 clean_stored_dead_slots,
2709 );
2710 }
2711 reclaim_result
2712 }
2713
2714 fn filter_zero_lamport_clean_for_incremental_snapshots(
2738 &self,
2739 max_clean_root_inclusive: Option<Slot>,
2740 store_counts: &HashMap<Slot, (usize, HashSet<Pubkey>)>,
2741 candidates: &mut [HashMap<Pubkey, CleaningInfo>],
2742 ) {
2743 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
2744 let should_filter_for_incremental_snapshots = max_clean_root_inclusive.unwrap_or(Slot::MAX)
2745 > latest_full_snapshot_slot.unwrap_or(Slot::MAX);
2746 assert!(
2747 latest_full_snapshot_slot.is_some() || !should_filter_for_incremental_snapshots,
2748 "if filtering for incremental snapshots, then snapshots should be enabled",
2749 );
2750
2751 for bin in candidates {
2752 bin.retain(|pubkey, cleaning_info| {
2753 let slot_list = &cleaning_info.slot_list;
2754 debug_assert!(!slot_list.is_empty(), "candidate slot_list can't be empty");
2755 for (slot, _account_info) in slot_list.iter() {
2758 if let Some(store_count) = store_counts.get(slot) {
2759 if store_count.0 != 0 {
2760 return false;
2762 }
2763 } else {
2764 return false;
2766 }
2767 }
2768
2769 if !should_filter_for_incremental_snapshots {
2771 return true;
2772 }
2773
2774 let (slot, account_info) = slot_list
2777 .iter()
2778 .max_by_key(|(slot, _account_info)| slot)
2779 .unwrap();
2780
2781 assert!(account_info.is_zero_lamport());
2787 let cannot_purge = *slot > latest_full_snapshot_slot.unwrap();
2788 if cannot_purge {
2789 self.zero_lamport_accounts_to_purge_after_full_snapshot
2790 .insert((*slot, *pubkey));
2791 }
2792 !cannot_purge
2793 });
2794 }
2795 }
2796
2797 fn process_dead_slots(
2806 &self,
2807 dead_slots: &IntSet<Slot>,
2808 purged_account_slots: Option<&mut AccountSlots>,
2809 purge_stats: &PurgeStats,
2810 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
2811 clean_stored_dead_slots: bool,
2812 ) {
2813 if dead_slots.is_empty() {
2814 return;
2815 }
2816 let mut clean_dead_slots = Measure::start("reclaims::clean_dead_slots");
2817
2818 if clean_stored_dead_slots {
2819 self.clean_stored_dead_slots(
2820 dead_slots,
2821 purged_account_slots,
2822 pubkeys_removed_from_accounts_index,
2823 );
2824 }
2825
2826 self.remove_dead_slots_metadata(dead_slots.iter());
2828
2829 clean_dead_slots.stop();
2830
2831 let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots");
2832 self.purge_dead_slots_from_storage(dead_slots.iter(), purge_stats);
2833 purge_removed_slots.stop();
2834
2835 {
2838 let mut list = self.shrink_candidate_slots.lock().unwrap();
2839 for slot in dead_slots {
2840 list.remove(slot);
2841 }
2842 }
2843
2844 debug!(
2845 "process_dead_slots({}): {} {} {:?}",
2846 dead_slots.len(),
2847 clean_dead_slots,
2848 purge_removed_slots,
2849 dead_slots,
2850 );
2851 }
2852
    /// Looks up every account of `accounts` in the accounts index and classifies it for the
    /// shrink of `slot_to_shrink`.
    ///
    /// Each account ends up in exactly one of:
    /// - `alive_accounts`: still referenced at `slot_to_shrink` (or not returned by the scan),
    /// - `pubkeys_to_unref`: the index has an entry, but none at `slot_to_shrink`,
    /// - `zero_lamport_single_ref_pubkeys`: zero-lamport, ref count 1, and
    ///   `slot_to_shrink` is not newer than the latest full snapshot slot (or there is none).
    ///
    /// `all_are_zero_lamports` in the result is `true` only if every account added to
    /// `alive_accounts` is zero-lamport. Scan and alive/dead counters are added to `stats`.
    fn load_accounts_index_for_shrink<'a, T: ShrinkCollectRefs<'a>>(
        &self,
        accounts: &'a [AccountFromStorage],
        stats: &ShrinkStats,
        slot_to_shrink: Slot,
    ) -> LoadAccountsIndexForShrink<'a, T> {
        let count = accounts.len();
        let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
        let mut pubkeys_to_unref = Vec::with_capacity(count);
        let mut zero_lamport_single_ref_pubkeys = Vec::with_capacity(count);

        let mut alive = 0;
        let mut dead = 0;
        // Position in `accounts` of the account the scan callback is currently handling.
        // NOTE(review): this relies on the scan invoking the callback once per pubkey, in the
        // order the pubkeys were supplied — confirm against `AccountsIndex::scan`.
        let mut index = 0;
        let mut index_scan_returned_some_count = 0;
        let mut index_scan_returned_none_count = 0;
        let mut all_are_zero_lamports = true;
        let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
        self.accounts_index.scan(
            accounts.iter().map(|account| account.pubkey()),
            |pubkey, slots_refs, _entry| {
                let stored_account = &accounts[index];
                // Shared handling for an account that is present at `slot_to_shrink`.
                let mut do_populate_accounts_for_shrink = |ref_count, slot_list| {
                    if stored_account.is_zero_lamport()
                        && ref_count == 1
                        && latest_full_snapshot_slot
                            .map(|latest_full_snapshot_slot| {
                                latest_full_snapshot_slot >= slot_to_shrink
                            })
                            .unwrap_or(true)
                    {
                        // Zero-lamport account with a single reference: it is not carried
                        // over as alive; it is recorded separately and also registered as an
                        // uncleaned pubkey for this slot.
                        zero_lamport_single_ref_pubkeys.push(pubkey);
                        self.add_uncleaned_pubkeys_after_shrink(
                            slot_to_shrink,
                            [*pubkey].into_iter(),
                        );
                    } else {
                        all_are_zero_lamports &= stored_account.is_zero_lamport();
                        alive_accounts.add(ref_count, stored_account, slot_list);
                        alive += 1;
                    }
                };
                if let Some((slot_list, ref_count)) = slots_refs {
                    index_scan_returned_some_count += 1;
                    // Alive here means the index still has an entry for this pubkey at the
                    // slot being shrunk.
                    let is_alive = slot_list.iter().any(|(slot, _acct_info)| {
                        *slot == slot_to_shrink
                    });

                    if !is_alive {
                        // The storage holds this pubkey but the index has no entry for it at
                        // this slot; the caller is expected to unref it.
                        pubkeys_to_unref.push(pubkey);
                        dead += 1;
                    } else {
                        do_populate_accounts_for_shrink(ref_count, slot_list);
                    }
                } else {
                    index_scan_returned_none_count += 1;
                    // The scan returned no slot list for this pubkey. The account is treated
                    // as alive with a ref count of 1 and a placeholder slot-list entry.
                    // NOTE(review): presumably the scan filter skipped loading this entry —
                    // confirm why `None` implies a single-ref alive account.
                    let ref_count = 1;
                    let slot_list = [(slot_to_shrink, AccountInfo::default())];
                    do_populate_accounts_for_shrink(ref_count, &slot_list);
                }
                index += 1;
                AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
            },
            None,
            false,
            self.scan_filter_for_shrinking,
        );
        // Every account must have been visited exactly once by the scan callback.
        assert_eq!(index, std::cmp::min(accounts.len(), count));
        stats
            .index_scan_returned_some
            .fetch_add(index_scan_returned_some_count, Ordering::Relaxed);
        stats
            .index_scan_returned_none
            .fetch_add(index_scan_returned_none_count, Ordering::Relaxed);
        stats.alive_accounts.fetch_add(alive, Ordering::Relaxed);
        stats.dead_accounts.fetch_add(dead, Ordering::Relaxed);

        LoadAccountsIndexForShrink {
            alive_accounts,
            pubkeys_to_unref,
            zero_lamport_single_ref_pubkeys,
            all_are_zero_lamports,
        }
    }
2953
2954 pub fn get_unique_accounts_from_storage(
2957 &self,
2958 store: &AccountStorageEntry,
2959 ) -> GetUniqueAccountsResult {
2960 let capacity = store.capacity();
2961 let mut stored_accounts = Vec::with_capacity(store.count());
2962 store
2963 .accounts
2964 .scan_accounts_without_data(|offset, account| {
2965 let file_id = 0;
2967 stored_accounts.push(AccountFromStorage {
2968 index_info: AccountInfo::new(
2969 StorageLocation::AppendVec(file_id, offset),
2970 account.is_zero_lamport(),
2971 ),
2972 pubkey: *account.pubkey(),
2973 data_len: account.data_len as u64,
2974 });
2975 })
2976 .expect("must scan accounts storage");
2977
2978 let num_duplicated_accounts = Self::sort_and_remove_dups(&mut stored_accounts);
2980
2981 GetUniqueAccountsResult {
2982 stored_accounts,
2983 capacity,
2984 num_duplicated_accounts,
2985 }
2986 }
2987
    /// Replaces the `storage_access` setting of this instance with `storage_access`.
    ///
    /// Only compiled when the `dev-context-only-utils` feature is enabled.
    #[cfg(feature = "dev-context-only-utils")]
    pub fn set_storage_access(&mut self, storage_access: StorageAccess) {
        self.storage_access = storage_access;
    }
2992
2993 #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
2998 fn sort_and_remove_dups(accounts: &mut Vec<AccountFromStorage>) -> usize {
2999 accounts.sort_by(|a, b| a.pubkey().cmp(b.pubkey()));
3001 let len0 = accounts.len();
3002 if accounts.len() > 1 {
3003 let mut last = 0;
3004 let mut curr = 1;
3005
3006 while curr < accounts.len() {
3007 if accounts[curr].pubkey() != accounts[last].pubkey() {
3008 last += 1;
3009 }
3010 accounts[last] = accounts[curr];
3011 curr += 1;
3012 }
3013 accounts.truncate(last + 1);
3014 }
3015 len0 - accounts.len()
3016 }
3017
3018 pub(crate) fn get_unique_accounts_from_storage_for_shrink(
3019 &self,
3020 store: &AccountStorageEntry,
3021 stats: &ShrinkStats,
3022 ) -> GetUniqueAccountsResult {
3023 let (result, storage_read_elapsed_us) =
3024 measure_us!(self.get_unique_accounts_from_storage(store));
3025 stats
3026 .storage_read_elapsed
3027 .fetch_add(storage_read_elapsed_us, Ordering::Relaxed);
3028 stats
3029 .num_duplicated_accounts
3030 .fetch_add(result.num_duplicated_accounts as u64, Ordering::Relaxed);
3031 result
3032 }
3033
    /// Classifies the accounts of `store` for shrinking and gathers the outcome into a
    /// `ShrinkCollect`.
    ///
    /// Accounts whose offset is reported by `store.get_obsolete_accounts(None)` are removed
    /// from `unique_accounts.stored_accounts` first. The remaining accounts are processed in
    /// chunks of `SHRINK_COLLECT_CHUNK_SIZE` on `thread_pool_background`, each chunk going
    /// through `load_accounts_index_for_shrink`; the per-chunk results are merged under
    /// mutexes.
    ///
    /// Several counters of `stats` are updated along the way.
    pub(crate) fn shrink_collect<'a: 'b, 'b, T: ShrinkCollectRefs<'b>>(
        &self,
        store: &'a AccountStorageEntry,
        unique_accounts: &'b mut GetUniqueAccountsResult,
        stats: &ShrinkStats,
    ) -> ShrinkCollect<'b, T> {
        let slot = store.slot();

        let GetUniqueAccountsResult {
            stored_accounts,
            capacity,
            num_duplicated_accounts,
        } = unique_accounts;

        let mut index_read_elapsed = Measure::start("index_read_elapsed");

        // Note: `len` is taken BEFORE obsolete accounts are filtered out below, so every use
        // of `len` in this function (capacities, `accounts_loaded`, `accounts_removed`,
        // `total_starting_accounts`) includes the obsolete accounts.
        let len = stored_accounts.len();
        let alive_accounts_collect = Mutex::new(T::with_capacity(len, slot));
        let pubkeys_to_unref_collect = Mutex::new(Vec::with_capacity(len));
        let zero_lamport_single_ref_pubkeys_collect = Mutex::new(Vec::with_capacity(len));

        // Offsets of all accounts the store reports as obsolete.
        let obsolete_offsets: IntSet<_> = store
            .get_obsolete_accounts(None)
            .into_iter()
            .map(|(offset, _)| offset)
            .collect();

        // Drop obsolete accounts so they are never looked up in the index.
        let initial_len = stored_accounts.len();
        stored_accounts.retain(|account| !obsolete_offsets.contains(&account.index_info.offset()));
        let obsolete_accounts_filtered = initial_len - stored_accounts.len();

        stats
            .accounts_loaded
            .fetch_add(len as u64, Ordering::Relaxed);
        // NOTE(review): `get_unique_accounts_from_storage_for_shrink` also adds this same
        // value to `num_duplicated_accounts`; when both are called with the same `stats`
        // (as `shrink_storage` does) the duplicates appear to be counted twice — confirm
        // whether that is intended.
        stats
            .num_duplicated_accounts
            .fetch_add(*num_duplicated_accounts as u64, Ordering::Relaxed);
        let all_are_zero_lamports_collect = Mutex::new(true);
        self.thread_pool_background.install(|| {
            stored_accounts
                .par_chunks(SHRINK_COLLECT_CHUNK_SIZE)
                .for_each(|stored_accounts| {
                    let LoadAccountsIndexForShrink {
                        alive_accounts,
                        mut pubkeys_to_unref,
                        all_are_zero_lamports,
                        mut zero_lamport_single_ref_pubkeys,
                    } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);

                    // Merge this chunk's results into the shared collections.
                    alive_accounts_collect
                        .lock()
                        .unwrap()
                        .collect(alive_accounts);
                    pubkeys_to_unref_collect
                        .lock()
                        .unwrap()
                        .append(&mut pubkeys_to_unref);
                    zero_lamport_single_ref_pubkeys_collect
                        .lock()
                        .unwrap()
                        .append(&mut zero_lamport_single_ref_pubkeys);
                    // A single chunk with a non-zero-lamport alive account clears the flag.
                    if !all_are_zero_lamports {
                        *all_are_zero_lamports_collect.lock().unwrap() = false;
                    }
                });
        });

        let alive_accounts = alive_accounts_collect.into_inner().unwrap();
        let pubkeys_to_unref = pubkeys_to_unref_collect.into_inner().unwrap();
        let zero_lamport_single_ref_pubkeys = zero_lamport_single_ref_pubkeys_collect
            .into_inner()
            .unwrap();

        index_read_elapsed.stop();

        stats
            .obsolete_accounts_filtered
            .fetch_add(obsolete_accounts_filtered as u64, Ordering::Relaxed);

        stats
            .index_read_elapsed
            .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);

        let alive_total_bytes = alive_accounts.alive_bytes();

        // Removed = everything loaded (including obsolete accounts) that is not alive.
        stats
            .accounts_removed
            .fetch_add(len - alive_accounts.len(), Ordering::Relaxed);
        stats.bytes_removed.fetch_add(
            capacity.saturating_sub(alive_total_bytes as u64),
            Ordering::Relaxed,
        );
        stats
            .bytes_written
            .fetch_add(alive_total_bytes as u64, Ordering::Relaxed);

        ShrinkCollect {
            slot,
            capacity: *capacity,
            pubkeys_to_unref,
            zero_lamport_single_ref_pubkeys,
            alive_accounts,
            alive_total_bytes,
            total_starting_accounts: len,
            all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(),
        }
    }
3148
3149 fn remove_zero_lamport_single_ref_accounts_after_shrink(
3160 &self,
3161 zero_lamport_single_ref_pubkeys: &[&Pubkey],
3162 slot: Slot,
3163 stats: &ShrinkStats,
3164 do_assert: bool,
3165 ) {
3166 stats.purged_zero_lamports.fetch_add(
3167 zero_lamport_single_ref_pubkeys.len() as u64,
3168 Ordering::Relaxed,
3169 );
3170
3171 self.accounts_index.scan(
3175 zero_lamport_single_ref_pubkeys.iter().cloned(),
3176 |_pubkey, _slots_refs, _entry| AccountsIndexScanResult::Unref,
3177 if do_assert {
3178 Some(AccountsIndexScanResult::UnrefAssert0)
3179 } else {
3180 Some(AccountsIndexScanResult::UnrefLog0)
3181 },
3182 false,
3183 ScanFilter::All,
3184 );
3185
3186 zero_lamport_single_ref_pubkeys.iter().for_each(|k| {
3187 _ = self.purge_keys_exact([&(**k, slot)].into_iter());
3188 });
3189 }
3190
3191 pub(crate) fn remove_old_stores_shrink<'a, T: ShrinkCollectRefs<'a>>(
3194 &self,
3195 shrink_collect: &ShrinkCollect<'a, T>,
3196 stats: &ShrinkStats,
3197 shrink_in_progress: Option<ShrinkInProgress>,
3198 shrink_can_be_active: bool,
3199 ) {
3200 let mut time = Measure::start("remove_old_stores_shrink");
3201
3202 self.remove_zero_lamport_single_ref_accounts_after_shrink(
3206 &shrink_collect.zero_lamport_single_ref_pubkeys,
3207 shrink_collect.slot,
3208 stats,
3209 false,
3210 );
3211
3212 let dead_storages = self.mark_dirty_dead_stores(
3216 shrink_collect.slot,
3217 shrink_collect.all_are_zero_lamports,
3220 shrink_in_progress,
3221 shrink_can_be_active,
3222 );
3223 let dead_storages_len = dead_storages.len();
3224
3225 if !shrink_collect.all_are_zero_lamports {
3226 self.add_uncleaned_pubkeys_after_shrink(
3227 shrink_collect.slot,
3228 shrink_collect.pubkeys_to_unref.iter().cloned().cloned(),
3229 );
3230 }
3231
3232 let (_, drop_storage_entries_elapsed) = measure_us!(drop(dead_storages));
3233 time.stop();
3234
3235 self.stats
3236 .dropped_stores
3237 .fetch_add(dead_storages_len as u64, Ordering::Relaxed);
3238 stats
3239 .drop_storage_entries_elapsed
3240 .fetch_add(drop_storage_entries_elapsed, Ordering::Relaxed);
3241 stats
3242 .remove_old_stores_shrink_us
3243 .fetch_add(time.as_us(), Ordering::Relaxed);
3244 }
3245
3246 pub(crate) fn unref_shrunk_dead_accounts<'a>(
3247 &self,
3248 pubkeys: impl Iterator<Item = &'a Pubkey>,
3249 slot: Slot,
3250 ) {
3251 self.accounts_index.scan(
3252 pubkeys,
3253 |pubkey, slot_refs, _entry| {
3254 match slot_refs {
3255 Some((slot_list, ref_count)) => {
3256 if slot_list.len() == 1 && ref_count == 2 {
3258 if let Some((slot_alive, acct_info)) = slot_list.first() {
3259 if acct_info.is_zero_lamport() && !acct_info.is_cached() {
3260 self.zero_lamport_single_ref_found(
3261 *slot_alive,
3262 acct_info.offset(),
3263 );
3264 }
3265 }
3266 }
3267 }
3268 None => {
3269 warn!(
3273 "pubkey {pubkey} in slot {slot} was NOT found in accounts index \
3274 during shrink"
3275 );
3276 datapoint_warn!(
3277 "accounts_db-shink_pubkey_missing_from_index",
3278 ("store_slot", slot, i64),
3279 ("pubkey", pubkey.to_string(), String),
3280 );
3281 }
3282 }
3283 AccountsIndexScanResult::Unref
3284 },
3285 None,
3286 false,
3287 ScanFilter::All,
3288 );
3289 }
3290
3291 pub(crate) fn zero_lamport_single_ref_found(&self, slot: Slot, offset: Offset) {
3293 if let Some(store) = self
3306 .storage
3307 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3308 {
3309 if store.insert_zero_lamport_single_ref_account_offset(offset) {
3310 self.shrink_stats
3312 .num_zero_lamport_single_ref_accounts_found
3313 .fetch_add(1, Ordering::Relaxed);
3314
3315 if store.num_zero_lamport_single_ref_accounts() == store.count() {
3316 self.dirty_stores.entry(slot).or_insert(store);
3318 self.shrink_stats
3319 .num_dead_slots_added_to_clean
3320 .fetch_add(1, Ordering::Relaxed);
3321 } else if Self::is_shrinking_productive(&store)
3322 && self.is_candidate_for_shrink(&store)
3323 {
3324 let is_new = self.shrink_candidate_slots.lock().unwrap().insert(slot);
3326 if is_new {
3327 self.shrink_stats
3328 .num_slots_with_zero_lamport_accounts_added_to_shrink
3329 .fetch_add(1, Ordering::Relaxed);
3330 }
3331 } else {
3332 self.shrink_stats
3333 .marking_zero_dead_accounts_in_non_shrinkable_store
3334 .fetch_add(1, Ordering::Relaxed);
3335 }
3336 }
3337 }
3338 }
3339
    /// Shrinks `store` by rewriting its alive accounts into a newly created storage for the
    /// same slot and retiring the old one.
    ///
    /// Returns without doing anything when the slot is still present in the accounts cache.
    /// The shrink is also skipped (counted in `skipped_shrink`) when `should_not_shrink`
    /// says so or when no alive bytes remain; in the latter case the store is added to
    /// `dirty_stores`.
    fn shrink_storage(&self, store: Arc<AccountStorageEntry>) {
        let slot = store.slot();
        // Do not shrink a slot that is still in the accounts cache.
        if self.accounts_cache.contains(slot) {
            return;
        }
        let mut unique_accounts =
            self.get_unique_accounts_from_storage_for_shrink(&store, &self.shrink_stats);
        debug!("do_shrink_slot_store: slot: {slot}");
        let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
            &store,
            &mut unique_accounts,
            &self.shrink_stats,
        );

        // Bail out if shrinking is not worthwhile, or if nothing alive is left to write.
        if Self::should_not_shrink(
            shrink_collect.alive_total_bytes as u64,
            shrink_collect.capacity,
        ) || shrink_collect.alive_total_bytes == 0
        {
            if shrink_collect.alive_total_bytes == 0 {
                // Nothing alive remains: mark the store dirty instead of rewriting it.
                self.dirty_stores.insert(slot, store.clone());
            }

            // The message is skipped when every alive account is zero-lamport.
            if !shrink_collect.all_are_zero_lamports {
                info!(
                    "Unexpected shrink for slot {} alive {} capacity {}, likely caused by a bug \
                     for calculating alive bytes.",
                    slot, shrink_collect.alive_total_bytes, shrink_collect.capacity
                );
            }

            self.shrink_stats
                .skipped_shrink
                .fetch_add(1, Ordering::Relaxed);
            return;
        }

        // Unref the accounts that are in the old storage but no longer in the index at this
        // slot; they will not be written to the new storage.
        self.unref_shrunk_dead_accounts(shrink_collect.pubkeys_to_unref.iter().cloned(), slot);

        let total_accounts_after_shrink = shrink_collect.alive_accounts.len();
        debug!(
            "shrinking: slot: {}, accounts: ({} => {}) bytes: {} original: {}",
            slot,
            shrink_collect.total_starting_accounts,
            total_accounts_after_shrink,
            shrink_collect.alive_total_bytes,
            shrink_collect.capacity,
        );

        let mut stats_sub = ShrinkStatsSub::default();
        let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
        // Create the new storage, sized for exactly the alive bytes.
        let (shrink_in_progress, time_us) =
            measure_us!(self.get_store_for_shrink(slot, shrink_collect.alive_total_bytes as u64));
        stats_sub.create_and_insert_store_elapsed_us = Saturating(time_us);

        // Write the alive accounts into the new storage.
        let accounts = [(slot, &shrink_collect.alive_accounts.alive_accounts()[..])];
        let storable_accounts = StorableAccountsBySlot::new(slot, &accounts, self);
        stats_sub.store_accounts_timing = self.store_accounts_frozen(
            storable_accounts,
            shrink_in_progress.new_storage(),
            UpdateIndexThreadSelection::PoolWithThreshold,
        );

        rewrite_elapsed.stop();
        stats_sub.rewrite_elapsed_us = Saturating(rewrite_elapsed.as_us());

        // The slot has just been shrunk, so it is removed from the shrink candidates.
        // NOTE(review): presumably the store above can re-add this slot as a candidate —
        // confirm.
        self.shrink_candidate_slots.lock().unwrap().remove(&slot);

        // Retire the old storage; `shrink_in_progress` is handed over (and consumed) here.
        self.remove_old_stores_shrink(
            &shrink_collect,
            &self.shrink_stats,
            Some(shrink_in_progress),
            false,
        );

        self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);

        Self::update_shrink_stats(&self.shrink_stats, stats_sub, true);
        self.shrink_stats.report();
    }
3444
3445 pub(crate) fn update_shrink_stats(
3446 shrink_stats: &ShrinkStats,
3447 stats_sub: ShrinkStatsSub,
3448 increment_count: bool,
3449 ) {
3450 if increment_count {
3451 shrink_stats
3452 .num_slots_shrunk
3453 .fetch_add(1, Ordering::Relaxed);
3454 }
3455 shrink_stats.create_and_insert_store_elapsed.fetch_add(
3456 stats_sub.create_and_insert_store_elapsed_us.0,
3457 Ordering::Relaxed,
3458 );
3459 shrink_stats.store_accounts_elapsed.fetch_add(
3460 stats_sub.store_accounts_timing.store_accounts_elapsed,
3461 Ordering::Relaxed,
3462 );
3463 shrink_stats.update_index_elapsed.fetch_add(
3464 stats_sub.store_accounts_timing.update_index_elapsed,
3465 Ordering::Relaxed,
3466 );
3467 shrink_stats.handle_reclaims_elapsed.fetch_add(
3468 stats_sub.store_accounts_timing.handle_reclaims_elapsed,
3469 Ordering::Relaxed,
3470 );
3471 shrink_stats
3472 .rewrite_elapsed
3473 .fetch_add(stats_sub.rewrite_elapsed_us.0, Ordering::Relaxed);
3474 shrink_stats
3475 .unpackable_slots_count
3476 .fetch_add(stats_sub.unpackable_slots_count.0 as u64, Ordering::Relaxed);
3477 shrink_stats.newest_alive_packed_count.fetch_add(
3478 stats_sub.newest_alive_packed_count.0 as u64,
3479 Ordering::Relaxed,
3480 );
3481 }
3482
3483 pub fn mark_dirty_dead_stores(
3488 &self,
3489 slot: Slot,
3490 add_dirty_stores: bool,
3491 shrink_in_progress: Option<ShrinkInProgress>,
3492 shrink_can_be_active: bool,
3493 ) -> Vec<Arc<AccountStorageEntry>> {
3494 let mut dead_storages = Vec::default();
3495
3496 let mut not_retaining_store = |store: &Arc<AccountStorageEntry>| {
3497 if add_dirty_stores {
3498 self.dirty_stores.insert(slot, store.clone());
3499 }
3500 dead_storages.push(store.clone());
3501 };
3502
3503 if let Some(shrink_in_progress) = shrink_in_progress {
3504 not_retaining_store(shrink_in_progress.old_storage());
3506 } else if let Some(store) = self.storage.remove(&slot, shrink_can_be_active) {
3508 not_retaining_store(&store);
3510 }
3511
3512 dead_storages
3513 }
3514
3515 pub(crate) fn reopen_storage_as_readonly_shrinking_in_progress_ok(&self, slot: Slot) {
3518 if let Some(storage) = self
3519 .storage
3520 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3521 {
3522 if let Some(new_storage) = storage.reopen_as_readonly(self.storage_access) {
3523 assert_eq!(storage.id(), new_storage.id());
3528 assert_eq!(storage.accounts.len(), new_storage.accounts.len());
3529 self.storage
3530 .replace_storage_with_equivalent(slot, Arc::new(new_storage));
3531 }
3532 }
3533 }
3534
3535 pub fn get_store_for_shrink(&self, slot: Slot, size: u64) -> ShrinkInProgress<'_> {
3537 let shrunken_store = self.create_store(slot, size, "shrink", self.shrink_paths.as_slice());
3538 self.storage.shrinking_in_progress(slot, shrunken_store)
3539 }
3540
3541 fn shrink_slot_forced(&self, slot: Slot) {
3544 debug!("shrink_slot_forced: slot: {slot}");
3545
3546 if let Some(store) = self
3547 .storage
3548 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
3549 {
3550 if Self::is_shrinking_productive(&store) {
3551 self.shrink_storage(store)
3552 }
3553 }
3554 }
3555
    /// Returns the slots reported by `self.storage.all_slots()`.
    fn all_slots_in_storage(&self) -> Vec<Slot> {
        self.storage.all_slots()
    }
3559
3560 fn select_candidates_by_total_usage(
3568 &self,
3569 shrink_slots: &ShrinkCandidates,
3570 shrink_ratio: f64,
3571 ) -> (IntMap<Slot, Arc<AccountStorageEntry>>, ShrinkCandidates) {
3572 struct StoreUsageInfo {
3573 slot: Slot,
3574 alive_ratio: f64,
3575 store: Arc<AccountStorageEntry>,
3576 }
3577 let mut store_usage: Vec<StoreUsageInfo> = Vec::with_capacity(shrink_slots.len());
3578 let mut total_alive_bytes: u64 = 0;
3579 let mut total_bytes: u64 = 0;
3580 for slot in shrink_slots {
3581 let Some(store) = self.storage.get_slot_storage_entry(*slot) else {
3582 continue;
3583 };
3584 let alive_bytes = store.alive_bytes();
3585 total_alive_bytes += alive_bytes as u64;
3586 total_bytes += store.capacity();
3587 let alive_ratio = alive_bytes as f64 / store.capacity() as f64;
3588 store_usage.push(StoreUsageInfo {
3589 slot: *slot,
3590 alive_ratio,
3591 store: store.clone(),
3592 });
3593 }
3594 store_usage.sort_by(|a, b| {
3595 a.alive_ratio
3596 .partial_cmp(&b.alive_ratio)
3597 .unwrap_or(std::cmp::Ordering::Equal)
3598 });
3599
3600 let mut shrink_slots = IntMap::default();
3603 let mut shrink_slots_next_batch = ShrinkCandidates::default();
3604 for usage in &store_usage {
3605 let store = &usage.store;
3606 let alive_ratio = (total_alive_bytes as f64) / (total_bytes as f64);
3607 debug!(
3608 "alive_ratio: {:?} store_id: {:?}, store_ratio: {:?} requirement: {:?}, \
3609 total_bytes: {:?} total_alive_bytes: {:?}",
3610 alive_ratio,
3611 usage.store.id(),
3612 usage.alive_ratio,
3613 shrink_ratio,
3614 total_bytes,
3615 total_alive_bytes
3616 );
3617 if alive_ratio > shrink_ratio {
3618 debug!(
3620 "Shrinking goal can be achieved at slot {:?}, total_alive_bytes: {:?} \
3621 total_bytes: {:?}, alive_ratio: {:}, shrink_ratio: {:?}",
3622 usage.slot, total_alive_bytes, total_bytes, alive_ratio, shrink_ratio
3623 );
3624 if usage.alive_ratio < shrink_ratio {
3625 shrink_slots_next_batch.insert(usage.slot);
3626 } else {
3627 break;
3628 }
3629 } else {
3630 let current_store_size = store.capacity();
3631 let after_shrink_size = store.alive_bytes() as u64;
3632 let bytes_saved = current_store_size.saturating_sub(after_shrink_size);
3633 total_bytes -= bytes_saved;
3634 shrink_slots.insert(usage.slot, Arc::clone(store));
3635 }
3636 }
3637 (shrink_slots, shrink_slots_next_batch)
3638 }
3639
3640 fn get_roots_less_than(&self, slot: Slot) -> Vec<Slot> {
3641 self.accounts_index
3642 .roots_tracker
3643 .read()
3644 .unwrap()
3645 .alive_roots
3646 .get_all_less_than(slot)
3647 }
3648
3649 fn get_sorted_potential_ancient_slots(&self, oldest_non_ancient_slot: Slot) -> Vec<Slot> {
3653 let mut ancient_slots = self.get_roots_less_than(oldest_non_ancient_slot);
3654 ancient_slots.sort_unstable();
3655 ancient_slots
3656 }
3657
3658 pub fn shrink_ancient_slots(&self, epoch_schedule: &EpochSchedule) {
3661 if self.ancient_append_vec_offset.is_none() {
3662 return;
3663 }
3664
3665 let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
3666 let can_randomly_shrink = true;
3667 let sorted_slots = self.get_sorted_potential_ancient_slots(oldest_non_ancient_slot);
3668 self.combine_ancient_slots_packed(sorted_slots, can_randomly_shrink);
3669 }
3670
3671 pub(crate) fn handle_dropped_roots_for_ancient(
3674 &self,
3675 dropped_roots: impl Iterator<Item = Slot>,
3676 ) {
3677 dropped_roots.for_each(|slot| {
3678 self.accounts_index.clean_dead_slot(slot);
3679 assert!(self.storage.remove(&slot, false).is_none());
3681 debug_assert!(
3682 !self
3683 .accounts_index
3684 .roots_tracker
3685 .read()
3686 .unwrap()
3687 .alive_roots
3688 .contains(&slot),
3689 "slot: {slot}"
3690 );
3691 });
3692 }
3693
3694 fn add_uncleaned_pubkeys_after_shrink(
3697 &self,
3698 slot: Slot,
3699 pubkeys: impl Iterator<Item = Pubkey>,
3700 ) {
3701 let mut uncleaned_pubkeys = self.uncleaned_pubkeys.entry(slot).or_default();
3721 uncleaned_pubkeys.extend(pubkeys);
3722 }
3723
3724 pub fn shrink_candidate_slots(&self, epoch_schedule: &EpochSchedule) -> usize {
3725 let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
3726
3727 let shrink_candidates_slots =
3728 std::mem::take(&mut *self.shrink_candidate_slots.lock().unwrap());
3729 self.shrink_stats
3730 .initial_candidates_count
3731 .store(shrink_candidates_slots.len() as u64, Ordering::Relaxed);
3732
3733 let candidates_count = shrink_candidates_slots.len();
3734 let ((mut shrink_slots, shrink_slots_next_batch), select_time_us) = measure_us!({
3735 if let AccountShrinkThreshold::TotalSpace { shrink_ratio } = self.shrink_ratio {
3736 let (shrink_slots, shrink_slots_next_batch) =
3737 self.select_candidates_by_total_usage(&shrink_candidates_slots, shrink_ratio);
3738 (shrink_slots, Some(shrink_slots_next_batch))
3739 } else {
3740 (
3741 shrink_candidates_slots
3743 .into_iter()
3744 .filter_map(|slot| {
3745 self.storage
3746 .get_slot_storage_entry(slot)
3747 .map(|storage| (slot, storage))
3748 })
3749 .collect(),
3750 None,
3751 )
3752 }
3753 });
3754
3755 if shrink_slots.len() < SHRINK_INSERT_ANCIENT_THRESHOLD {
3758 let mut ancients = self.best_ancient_slots_to_shrink.write().unwrap();
3759 while let Some((slot, capacity)) = ancients.pop_front() {
3760 if let Some(store) = self.storage.get_slot_storage_entry(slot) {
3761 if !shrink_slots.contains(&slot)
3762 && capacity == store.capacity()
3763 && Self::is_candidate_for_shrink(self, &store)
3764 {
3765 let ancient_bytes_added_to_shrink = store.alive_bytes() as u64;
3766 shrink_slots.insert(slot, store);
3767 self.shrink_stats
3768 .ancient_bytes_added_to_shrink
3769 .fetch_add(ancient_bytes_added_to_shrink, Ordering::Relaxed);
3770 self.shrink_stats
3771 .ancient_slots_added_to_shrink
3772 .fetch_add(1, Ordering::Relaxed);
3773 break;
3774 }
3775 }
3776 }
3777 }
3778 if shrink_slots.is_empty()
3779 && shrink_slots_next_batch
3780 .as_ref()
3781 .map(|s| s.is_empty())
3782 .unwrap_or(true)
3783 {
3784 return 0;
3785 }
3786
3787 let _guard = (!shrink_slots.is_empty())
3788 .then_some(|| self.active_stats.activate(ActiveStatItem::Shrink));
3789
3790 let num_selected = shrink_slots.len();
3791 let (_, shrink_all_us) = measure_us!({
3792 self.thread_pool_background.install(|| {
3793 shrink_slots
3794 .into_par_iter()
3795 .for_each(|(slot, slot_shrink_candidate)| {
3796 if self.ancient_append_vec_offset.is_some()
3797 && slot < oldest_non_ancient_slot
3798 {
3799 self.shrink_stats
3800 .num_ancient_slots_shrunk
3801 .fetch_add(1, Ordering::Relaxed);
3802 }
3803 self.shrink_storage(slot_shrink_candidate);
3804 });
3805 })
3806 });
3807
3808 let mut pended_counts: usize = 0;
3809 if let Some(shrink_slots_next_batch) = shrink_slots_next_batch {
3810 let mut shrink_slots = self.shrink_candidate_slots.lock().unwrap();
3811 pended_counts = shrink_slots_next_batch.len();
3812 for slot in shrink_slots_next_batch {
3813 shrink_slots.insert(slot);
3814 }
3815 }
3816
3817 datapoint_info!(
3818 "shrink_candidate_slots",
3819 ("select_time_us", select_time_us, i64),
3820 ("shrink_all_us", shrink_all_us, i64),
3821 ("candidates_count", candidates_count, i64),
3822 ("selected_count", num_selected, i64),
3823 ("deferred_to_next_round_count", pended_counts, i64)
3824 );
3825
3826 num_selected
3827 }
3828
3829 pub fn shrink_all_slots(
3834 &self,
3835 is_startup: bool,
3836 epoch_schedule: &EpochSchedule,
3837 newest_slot_skip_shrink_inclusive: Option<Slot>,
3838 ) {
3839 let _guard = self.active_stats.activate(ActiveStatItem::Shrink);
3840 const DIRTY_STORES_CLEANING_THRESHOLD: usize = 10_000;
3841 const OUTER_CHUNK_SIZE: usize = 2000;
3842 let mut slots = self.all_slots_in_storage();
3843 if let Some(newest_slot_skip_shrink_inclusive) = newest_slot_skip_shrink_inclusive {
3844 slots.retain(|slot| slot < &newest_slot_skip_shrink_inclusive);
3847 }
3848
3849 let maybe_clean = || {
3858 if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
3859 let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
3860 self.clean_accounts(latest_full_snapshot_slot, is_startup, epoch_schedule);
3861 }
3862 };
3863
3864 if is_startup {
3865 let threads = num_cpus::get();
3866 let inner_chunk_size = std::cmp::max(OUTER_CHUNK_SIZE / threads, 1);
3867 slots.chunks(OUTER_CHUNK_SIZE).for_each(|chunk| {
3868 chunk.par_chunks(inner_chunk_size).for_each(|slots| {
3869 for slot in slots {
3870 self.shrink_slot_forced(*slot);
3871 }
3872 });
3873 maybe_clean();
3874 });
3875 } else {
3876 for slot in slots {
3877 self.shrink_slot_forced(slot);
3878 maybe_clean();
3879 }
3880 }
3881 }
3882
3883 pub fn scan_accounts<F>(
3884 &self,
3885 ancestors: &Ancestors,
3886 bank_id: BankId,
3887 mut scan_func: F,
3888 config: &ScanConfig,
3889 ) -> ScanResult<()>
3890 where
3891 F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
3892 {
3893 self.accounts_index.scan_accounts(
3895 ancestors,
3896 bank_id,
3897 |pubkey, (account_info, slot)| {
3898 let mut account_accessor =
3899 self.get_account_accessor(slot, pubkey, &account_info.storage_location());
3900
3901 let account_slot = match account_accessor {
3902 LoadedAccountAccessor::Cached(None) => None,
3903 _ => account_accessor.get_loaded_account(|loaded_account| {
3904 (pubkey, loaded_account.take_account(), slot)
3905 }),
3906 };
3907 scan_func(account_slot)
3908 },
3909 config,
3910 )?;
3911
3912 Ok(())
3913 }
3914
/// Runs the index's `unchecked_scan_accounts` and calls `scan_func` with the pubkey, loaded
/// account and slot of each entry for which `get_loaded_account` invokes its callback.
///
/// Only compiled with the `dev-context-only-utils` feature.
#[cfg(feature = "dev-context-only-utils")]
pub fn unchecked_scan_accounts<F>(
    &self,
    metric_name: &'static str,
    ancestors: &Ancestors,
    mut scan_func: F,
    config: &ScanConfig,
) where
    F: FnMut(&Pubkey, LoadedAccount, Slot),
{
    self.accounts_index.unchecked_scan_accounts(
        metric_name,
        ancestors,
        |pubkey, (account_info, slot)| {
            let mut accessor =
                self.get_account_accessor(slot, pubkey, &account_info.storage_location());
            accessor.get_loaded_account(|loaded_account| {
                scan_func(pubkey, loaded_account, slot);
            });
        },
        config,
    );
}
3937
3938 pub fn index_scan_accounts<F>(
3939 &self,
3940 ancestors: &Ancestors,
3941 bank_id: BankId,
3942 index_key: IndexKey,
3943 mut scan_func: F,
3944 config: &ScanConfig,
3945 ) -> ScanResult<bool>
3946 where
3947 F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
3948 {
3949 let key = match &index_key {
3950 IndexKey::ProgramId(key) => key,
3951 IndexKey::SplTokenMint(key) => key,
3952 IndexKey::SplTokenOwner(key) => key,
3953 };
3954 if !self.account_indexes.include_key(key) {
3955 let used_index = false;
3957 self.scan_accounts(ancestors, bank_id, scan_func, config)?;
3958 return Ok(used_index);
3959 }
3960
3961 self.accounts_index.index_scan_accounts(
3962 ancestors,
3963 bank_id,
3964 index_key,
3965 |pubkey, (account_info, slot)| {
3966 let account_slot = self
3967 .get_account_accessor(slot, pubkey, &account_info.storage_location())
3968 .get_loaded_account(|loaded_account| {
3969 (pubkey, loaded_account.take_account(), slot)
3970 });
3971 scan_func(account_slot)
3972 },
3973 config,
3974 )?;
3975 let used_index = true;
3976 Ok(used_index)
3977 }
3978
3979 pub(crate) fn scan_account_storage<R, B>(
3981 &self,
3982 slot: Slot,
3983 cache_map_func: impl Fn(&LoadedAccount) -> Option<R> + Sync,
3984 storage_scan_func: impl for<'a, 'b, 'storage> Fn(
3985 &'b mut B,
3986 &'a StoredAccountInfoWithoutData<'storage>,
3987 Option<&'storage [u8]>, ) + Sync,
3989 scan_account_storage_data: ScanAccountStorageData,
3990 ) -> ScanStorageResult<R, B>
3991 where
3992 R: Send,
3993 B: Send + Default + Sync,
3994 {
3995 self.scan_cache_storage_fallback(slot, cache_map_func, |retval, storage| {
3996 match scan_account_storage_data {
3997 ScanAccountStorageData::NoData => {
3998 storage.scan_accounts_without_data(|_offset, account_without_data| {
3999 storage_scan_func(retval, &account_without_data, None);
4000 })
4001 }
4002 ScanAccountStorageData::DataRefForStorage => {
4003 let mut reader = append_vec::new_scan_accounts_reader();
4004 storage.scan_accounts(&mut reader, |_offset, account| {
4005 let account_without_data = StoredAccountInfoWithoutData::new_from(&account);
4006 storage_scan_func(retval, &account_without_data, Some(account.data));
4007 })
4008 }
4009 }
4010 .expect("must scan accounts storage");
4011 })
4012 }
4013
4014 pub fn scan_cache_storage_fallback<R, B>(
4016 &self,
4017 slot: Slot,
4018 cache_map_func: impl Fn(&LoadedAccount) -> Option<R> + Sync,
4019 storage_fallback_func: impl Fn(&mut B, &AccountsFile) + Sync,
4020 ) -> ScanStorageResult<R, B>
4021 where
4022 R: Send,
4023 B: Send + Default + Sync,
4024 {
4025 if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
4026 if slot_cache.len() > SCAN_SLOT_PAR_ITER_THRESHOLD {
4029 ScanStorageResult::Cached(self.thread_pool_foreground.install(|| {
4030 slot_cache
4031 .par_iter()
4032 .filter_map(|cached_account| {
4033 cache_map_func(&LoadedAccount::Cached(Cow::Borrowed(
4034 cached_account.value(),
4035 )))
4036 })
4037 .collect()
4038 }))
4039 } else {
4040 ScanStorageResult::Cached(
4041 slot_cache
4042 .iter()
4043 .filter_map(|cached_account| {
4044 cache_map_func(&LoadedAccount::Cached(Cow::Borrowed(
4045 cached_account.value(),
4046 )))
4047 })
4048 .collect(),
4049 )
4050 }
4051 } else {
4052 let mut retval = B::default();
4053 if let Some(storage) = self
4066 .storage
4067 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
4068 {
4069 storage_fallback_func(&mut retval, &storage.accounts);
4070 }
4071
4072 ScanStorageResult::Stored(retval)
4073 }
4074 }
4075
4076 pub fn load(
4077 &self,
4078 ancestors: &Ancestors,
4079 pubkey: &Pubkey,
4080 load_hint: LoadHint,
4081 ) -> Option<(AccountSharedData, Slot)> {
4082 self.do_load(ancestors, pubkey, None, load_hint, LoadZeroLamports::None)
4083 }
4084
4085 pub fn load_account_into_read_cache(&self, ancestors: &Ancestors, pubkey: &Pubkey) {
4088 self.do_load_with_populate_read_cache(
4089 ancestors,
4090 pubkey,
4091 None,
4092 LoadHint::Unspecified,
4093 true,
4094 LoadZeroLamports::None,
4096 );
4097 }
4098
4099 pub fn load_with_fixed_root(
4101 &self,
4102 ancestors: &Ancestors,
4103 pubkey: &Pubkey,
4104 ) -> Option<(AccountSharedData, Slot)> {
4105 self.load(ancestors, pubkey, LoadHint::FixedMaxRoot)
4106 }
4107
4108 fn read_index_for_accessor_or_load_slow<'a>(
4109 &'a self,
4110 ancestors: &Ancestors,
4111 pubkey: &'a Pubkey,
4112 max_root: Option<Slot>,
4113 clone_in_lock: bool,
4114 ) -> Option<(Slot, StorageLocation, Option<LoadedAccountAccessor<'a>>)> {
4115 self.accounts_index.get_with_and_then(
4116 pubkey,
4117 Some(ancestors),
4118 max_root,
4119 true,
4120 |(slot, account_info)| {
4121 let storage_location = account_info.storage_location();
4122 let account_accessor = clone_in_lock
4123 .then(|| self.get_account_accessor(slot, pubkey, &storage_location));
4124 (slot, storage_location, account_accessor)
4125 },
4126 )
4127 }
4128
/// Repeatedly tries to build a usable account accessor for `pubkey`, starting from the
/// given `slot` / `storage_location` and re-reading the accounts index after each failure.
///
/// Returns the accessor together with the slot it belongs to, or `None` when the index
/// no longer yields an entry for `pubkey`.
///
/// # Panics
/// - If, with a fixed-max-root `load_hint`, the account is missing from the write cache
///   more than once.
/// - If re-reading the index returns the same slot and store id that just failed
///   ("Bad index entry detected").
fn retry_to_get_account_accessor<'a>(
    &'a self,
    mut slot: Slot,
    mut storage_location: StorageLocation,
    ancestors: &'a Ancestors,
    pubkey: &'a Pubkey,
    max_root: Option<Slot>,
    load_hint: LoadHint,
) -> Option<(LoadedAccountAccessor<'a>, Slot)> {
    // Test builds only: delay by `load_delay` milliseconds before the first attempt.
    #[cfg(test)]
    {
        sleep(Duration::from_millis(self.load_delay));
    }

    // Number of failed attempts that are tolerated and counted towards `load_limit`.
    let mut num_acceptable_failed_iterations = 0;
    loop {
        let account_accessor = self.get_account_accessor(slot, pubkey, &storage_location);
        match account_accessor {
            LoadedAccountAccessor::Cached(Some(_)) | LoadedAccountAccessor::Stored(Some(_)) => {
                // The accessor points at actual data: done.
                return Some((account_accessor, slot));
            }
            LoadedAccountAccessor::Cached(None) => {
                // The index said "cached", but the write cache no longer has the account.
                num_acceptable_failed_iterations += 1;
                match load_hint {
                    LoadHint::FixedMaxRootDoNotPopulateReadCache | LoadHint::FixedMaxRoot => {
                        // With a fixed max root this may happen at most once.
                        assert!(num_acceptable_failed_iterations <= 1);
                    }
                    LoadHint::Unspecified => {
                        // No bound on the number of cache misses is enforced here.
                    }
                }
            }
            LoadedAccountAccessor::Stored(None) => {
                // The index pointed at a storage entry that could not be found.
                match load_hint {
                    LoadHint::FixedMaxRootDoNotPopulateReadCache | LoadHint::FixedMaxRoot => {
                        // Not counted as an acceptable failure for fixed-max-root loads.
                    }
                    LoadHint::Unspecified => {
                        num_acceptable_failed_iterations += 1;
                    }
                }
            }
        }
        #[cfg(not(test))]
        let load_limit = ABSURD_CONSECUTIVE_FAILED_ITERATIONS;

        // Test builds read the limit from `self.load_limit` instead of the constant.
        #[cfg(test)]
        let load_limit = self.load_limit.load(Ordering::Relaxed);

        // Once the limit is reached, report it and ask the index read below to create the
        // accessor inside the index callback (`clone_in_lock = true`).
        let fallback_to_slow_path = if num_acceptable_failed_iterations >= load_limit {
            let message = format!(
                "do_load() failed to get key: {pubkey} from storage, latest attempt was for \
                 slot: {slot}, storage_location: {storage_location:?}, load_hint: \
                 {load_hint:?}",
            );
            datapoint_warn!("accounts_db-do_load_warn", ("warn", message, String));
            true
        } else {
            false
        };

        // Re-read the index; `?` returns `None` when the entry is gone.
        let (new_slot, new_storage_location, maybe_account_accessor) = self
            .read_index_for_accessor_or_load_slow(
                ancestors,
                pubkey,
                max_root,
                fallback_to_slow_path,
            )?;
        if new_slot == slot && new_storage_location.is_store_id_equal(&storage_location) {
            // The index still returns the slot and store id that just failed. Every path
            // through this branch panics; the assertions merely run first, in order.
            let message = format!(
                "Bad index entry detected ({}, {}, {:?}, {:?}, {:?}, {:?})",
                pubkey,
                slot,
                storage_location,
                load_hint,
                new_storage_location,
                self.accounts_index.get_cloned(pubkey)
            );
            assert!(
                new_storage_location.is_offset_equal(&storage_location),
                "{message}"
            );

            assert!(!new_storage_location.is_cached(), "{message}");

            assert_eq!(load_hint, LoadHint::Unspecified, "{message}");

            panic!("{message}");
        } else if fallback_to_slow_path {
            // The slow path already produced the accessor inside the index callback.
            return Some((
                maybe_account_accessor.expect("must be some if clone_in_lock=true"),
                new_slot,
            ));
        }

        // Retry with the freshly read index entry.
        slot = new_slot;
        storage_location = new_storage_location;
    }
}
4408
4409 fn do_load(
4410 &self,
4411 ancestors: &Ancestors,
4412 pubkey: &Pubkey,
4413 max_root: Option<Slot>,
4414 load_hint: LoadHint,
4415 load_zero_lamports: LoadZeroLamports,
4416 ) -> Option<(AccountSharedData, Slot)> {
4417 self.do_load_with_populate_read_cache(
4418 ancestors,
4419 pubkey,
4420 max_root,
4421 load_hint,
4422 false,
4423 load_zero_lamports,
4424 )
4425 }
4426
4427 pub fn load_account_with(
4432 &self,
4433 ancestors: &Ancestors,
4434 pubkey: &Pubkey,
4435 should_put_in_read_cache: bool,
4436 ) -> Option<(AccountSharedData, Slot)> {
4437 let (slot, storage_location, _maybe_account_accessor) =
4438 self.read_index_for_accessor_or_load_slow(ancestors, pubkey, None, false)?;
4439 let in_write_cache = storage_location.is_cached();
4442 if !in_write_cache {
4443 let result = self.read_only_accounts_cache.load(*pubkey, slot);
4444 if let Some(account) = result {
4445 if account.is_zero_lamport() {
4446 return None;
4447 }
4448 return Some((account, slot));
4449 }
4450 }
4451
4452 let (mut account_accessor, slot) = self.retry_to_get_account_accessor(
4453 slot,
4454 storage_location,
4455 ancestors,
4456 pubkey,
4457 None,
4458 LoadHint::Unspecified,
4459 )?;
4460
4461 let in_write_cache = matches!(account_accessor, LoadedAccountAccessor::Cached(_));
4464 let account = account_accessor.check_and_get_loaded_account_shared_data();
4465 if account.is_zero_lamport() {
4466 return None;
4467 }
4468
4469 if !in_write_cache && should_put_in_read_cache {
4470 self.read_only_accounts_cache
4483 .store(*pubkey, slot, account.clone());
4484 }
4485 Some((account, slot))
4486 }
4487
4488 fn do_load_with_populate_read_cache(
4491 &self,
4492 ancestors: &Ancestors,
4493 pubkey: &Pubkey,
4494 max_root: Option<Slot>,
4495 load_hint: LoadHint,
4496 load_into_read_cache_only: bool,
4497 load_zero_lamports: LoadZeroLamports,
4498 ) -> Option<(AccountSharedData, Slot)> {
4499 #[cfg(not(test))]
4500 assert!(max_root.is_none());
4501
4502 let starting_max_root = self.accounts_index.max_root_inclusive();
4503
4504 let (slot, storage_location, _maybe_account_accessor) =
4505 self.read_index_for_accessor_or_load_slow(ancestors, pubkey, max_root, false)?;
4506 let in_write_cache = storage_location.is_cached();
4509 if !load_into_read_cache_only {
4510 if !in_write_cache {
4511 let result = self.read_only_accounts_cache.load(*pubkey, slot);
4512 if let Some(account) = result {
4513 if load_zero_lamports == LoadZeroLamports::None && account.is_zero_lamport() {
4514 return None;
4515 }
4516 return Some((account, slot));
4517 }
4518 }
4519 } else {
4520 if in_write_cache {
4522 return None;
4524 }
4525 if self.read_only_accounts_cache.in_cache(pubkey, slot) {
4526 return None;
4528 }
4529 }
4530
4531 let (mut account_accessor, slot) = self.retry_to_get_account_accessor(
4532 slot,
4533 storage_location,
4534 ancestors,
4535 pubkey,
4536 max_root,
4537 load_hint,
4538 )?;
4539 let in_write_cache = matches!(account_accessor, LoadedAccountAccessor::Cached(_));
4542 let account = account_accessor.check_and_get_loaded_account_shared_data();
4543 if load_zero_lamports == LoadZeroLamports::None && account.is_zero_lamport() {
4544 return None;
4545 }
4546
4547 if !in_write_cache && load_hint != LoadHint::FixedMaxRootDoNotPopulateReadCache {
4548 self.read_only_accounts_cache
4561 .store(*pubkey, slot, account.clone());
4562 }
4563 if load_hint == LoadHint::FixedMaxRoot
4564 || load_hint == LoadHint::FixedMaxRootDoNotPopulateReadCache
4565 {
4566 let ending_max_root = self.accounts_index.max_root_inclusive();
4568 if starting_max_root != ending_max_root {
4569 warn!(
4570 "do_load_with_populate_read_cache() scanning pubkey {pubkey} called with \
4571 fixed max root, but max root changed from {starting_max_root} to \
4572 {ending_max_root} during function call"
4573 );
4574 }
4575 }
4576 Some((account, slot))
4577 }
4578
4579 fn get_account_accessor<'a>(
4580 &'a self,
4581 slot: Slot,
4582 pubkey: &'a Pubkey,
4583 storage_location: &StorageLocation,
4584 ) -> LoadedAccountAccessor<'a> {
4585 match storage_location {
4586 StorageLocation::Cached => {
4587 let maybe_cached_account = self.accounts_cache.load(slot, pubkey).map(Cow::Owned);
4588 LoadedAccountAccessor::Cached(maybe_cached_account)
4589 }
4590 StorageLocation::AppendVec(store_id, offset) => {
4591 let maybe_storage_entry = self
4592 .storage
4593 .get_account_storage_entry(slot, *store_id)
4594 .map(|account_storage_entry| (account_storage_entry, *offset));
4595 LoadedAccountAccessor::Stored(maybe_storage_entry)
4596 }
4597 }
4598 }
4599
4600 fn has_space_available(&self, slot: Slot, size: u64) -> bool {
4601 let store = self.storage.get_slot_storage_entry(slot).unwrap();
4602 if store.status() == AccountStorageStatus::Available
4603 && store.accounts.remaining_bytes() >= size
4604 {
4605 return true;
4606 }
4607 false
4608 }
4609
4610 fn create_store(
4611 &self,
4612 slot: Slot,
4613 size: u64,
4614 from: &str,
4615 paths: &[PathBuf],
4616 ) -> Arc<AccountStorageEntry> {
4617 self.stats
4618 .create_store_count
4619 .fetch_add(1, Ordering::Relaxed);
4620 let path_index = thread_rng().gen_range(0..paths.len());
4621 let store = Arc::new(self.new_storage_entry(slot, Path::new(&paths[path_index]), size));
4622
4623 debug!(
4624 "creating store: {} slot: {} len: {} size: {} from: {} path: {}",
4625 store.id(),
4626 slot,
4627 store.accounts.len(),
4628 store.accounts.capacity(),
4629 from,
4630 store.accounts.path().display(),
4631 );
4632
4633 store
4634 }
4635
4636 fn create_and_insert_store(
4637 &self,
4638 slot: Slot,
4639 size: u64,
4640 from: &str,
4641 ) -> Arc<AccountStorageEntry> {
4642 self.create_and_insert_store_with_paths(slot, size, from, &self.paths)
4643 }
4644
4645 fn create_and_insert_store_with_paths(
4646 &self,
4647 slot: Slot,
4648 size: u64,
4649 from: &str,
4650 paths: &[PathBuf],
4651 ) -> Arc<AccountStorageEntry> {
4652 let store = self.create_store(slot, size, from, paths);
4653 let store_for_index = store.clone();
4654
4655 self.insert_store(slot, store_for_index);
4656 store
4657 }
4658
4659 fn insert_store(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
4660 self.storage.insert(slot, store)
4661 }
4662
4663 pub fn enable_bank_drop_callback(&self) {
4664 self.is_bank_drop_callback_enabled
4665 .store(true, Ordering::Release);
4666 }
4667
4668 pub fn purge_slot(&self, slot: Slot, bank_id: BankId, is_serialized_with_abs: bool) {
4674 if self.is_bank_drop_callback_enabled.load(Ordering::Acquire) && !is_serialized_with_abs {
4675 panic!(
4676 "bad drop callpath detected; Bank::drop() must run serially with other logic in \
4677 ABS like clean_accounts()"
4678 )
4679 }
4680
4681 if self
4686 .accounts_index
4687 .removed_bank_ids
4688 .lock()
4689 .unwrap()
4690 .remove(&bank_id)
4691 {
4692 return;
4694 }
4695
4696 self.purge_slots(std::iter::once(&slot));
4697 }
4698
4699 pub fn purge_slots_from_cache_and_store<'a>(
4702 &self,
4703 removed_slots: impl Iterator<Item = &'a Slot> + Clone,
4704 purge_stats: &PurgeStats,
4705 ) {
4706 let mut remove_cache_elapsed_across_slots = 0;
4707 let mut num_cached_slots_removed = 0;
4708 let mut total_removed_cached_bytes = 0;
4709 for remove_slot in removed_slots {
4710 let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed");
4713 if let Some(slot_cache) = self.accounts_cache.slot_cache(*remove_slot) {
4718 num_cached_slots_removed += 1;
4721 total_removed_cached_bytes += slot_cache.total_bytes();
4722 self.purge_slot_cache(*remove_slot, &slot_cache);
4723 remove_cache_elapsed.stop();
4724 remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us();
4725 assert!(self.accounts_cache.remove_slot(*remove_slot).is_some());
4727 } else {
4728 self.purge_slot_storage(*remove_slot, purge_stats);
4729 }
4730 }
4734
4735 purge_stats
4736 .remove_cache_elapsed
4737 .fetch_add(remove_cache_elapsed_across_slots, Ordering::Relaxed);
4738 purge_stats
4739 .num_cached_slots_removed
4740 .fetch_add(num_cached_slots_removed, Ordering::Relaxed);
4741 purge_stats
4742 .total_removed_cached_bytes
4743 .fetch_add(total_removed_cached_bytes, Ordering::Relaxed);
4744 }
4745
4746 fn purge_dead_slots_from_storage<'a>(
4749 &'a self,
4750 removed_slots: impl Iterator<Item = &'a Slot> + Clone,
4751 purge_stats: &PurgeStats,
4752 ) {
4753 let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
4760 assert!(self
4761 .accounts_index
4762 .get_rooted_from_list(removed_slots.clone())
4763 .is_empty());
4764 safety_checks_elapsed.stop();
4765 purge_stats
4766 .safety_checks_elapsed
4767 .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
4768
4769 let mut total_removed_stored_bytes = 0;
4770 let mut all_removed_slot_storages = vec![];
4771
4772 let mut remove_storage_entries_elapsed = Measure::start("remove_storage_entries_elapsed");
4773 for remove_slot in removed_slots {
4774 if let Some(store) = self.storage.remove(remove_slot, false) {
4776 total_removed_stored_bytes += store.accounts.capacity();
4777 all_removed_slot_storages.push(store);
4778 }
4779 }
4780 remove_storage_entries_elapsed.stop();
4781 let num_stored_slots_removed = all_removed_slot_storages.len();
4782
4783 let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
4786 drop(all_removed_slot_storages);
4787 drop_storage_entries_elapsed.stop();
4788
4789 purge_stats
4790 .remove_storage_entries_elapsed
4791 .fetch_add(remove_storage_entries_elapsed.as_us(), Ordering::Relaxed);
4792 purge_stats
4793 .drop_storage_entries_elapsed
4794 .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
4795 purge_stats
4796 .num_stored_slots_removed
4797 .fetch_add(num_stored_slots_removed, Ordering::Relaxed);
4798 purge_stats
4799 .total_removed_storage_entries
4800 .fetch_add(num_stored_slots_removed, Ordering::Relaxed);
4801 purge_stats
4802 .total_removed_stored_bytes
4803 .fetch_add(total_removed_stored_bytes, Ordering::Relaxed);
4804 self.stats
4805 .dropped_stores
4806 .fetch_add(num_stored_slots_removed as u64, Ordering::Relaxed);
4807 }
4808
4809 fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: &SlotCache) {
4810 let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache
4811 .iter()
4812 .map(|account| (*account.key(), purged_slot))
4813 .collect();
4814 self.purge_slot_cache_pubkeys(purged_slot, pubkey_to_slot_set, true);
4815 }
4816
4817 fn purge_slot_cache_pubkeys(
4818 &self,
4819 purged_slot: Slot,
4820 pubkey_to_slot_set: Vec<(Pubkey, Slot)>,
4821 is_dead: bool,
4822 ) {
4823 assert!(self
4825 .storage
4826 .get_slot_storage_entry_shrinking_in_progress_ok(purged_slot)
4827 .is_none());
4828 let num_purged_keys = pubkey_to_slot_set.len();
4829 let (reclaims, _) = self.purge_keys_exact(pubkey_to_slot_set.iter());
4830 assert_eq!(reclaims.len(), num_purged_keys);
4831 if is_dead {
4832 self.remove_dead_slots_metadata(std::iter::once(&purged_slot));
4833 }
4834 }
4835
4836 fn purge_slot_storage(&self, remove_slot: Slot, purge_stats: &PurgeStats) {
4837 let mut scan_storages_elapsed = Measure::start("scan_storages_elapsed");
4844 let mut stored_keys = HashSet::new();
4845 if let Some(storage) = self
4846 .storage
4847 .get_slot_storage_entry_shrinking_in_progress_ok(remove_slot)
4848 {
4849 storage
4850 .accounts
4851 .scan_pubkeys(|pk| {
4852 stored_keys.insert((*pk, remove_slot));
4853 })
4854 .expect("must scan accounts storage");
4855 }
4856 scan_storages_elapsed.stop();
4857 purge_stats
4858 .scan_storages_elapsed
4859 .fetch_add(scan_storages_elapsed.as_us(), Ordering::Relaxed);
4860
4861 let mut purge_accounts_index_elapsed = Measure::start("purge_accounts_index_elapsed");
4862 let (reclaims, pubkeys_removed_from_accounts_index) =
4864 self.purge_keys_exact(stored_keys.iter());
4865 purge_accounts_index_elapsed.stop();
4866 purge_stats
4867 .purge_accounts_index_elapsed
4868 .fetch_add(purge_accounts_index_elapsed.as_us(), Ordering::Relaxed);
4869
4870 let mut handle_reclaims_elapsed = Measure::start("handle_reclaims_elapsed");
4873 let expected_dead_slot = Some(remove_slot);
4876 self.handle_reclaims(
4877 (!reclaims.is_empty()).then(|| reclaims.iter()),
4878 expected_dead_slot,
4879 &pubkeys_removed_from_accounts_index,
4880 HandleReclaims::ProcessDeadSlots(purge_stats),
4881 MarkAccountsObsolete::No,
4882 );
4883 handle_reclaims_elapsed.stop();
4884 purge_stats
4885 .handle_reclaims_elapsed
4886 .fetch_add(handle_reclaims_elapsed.as_us(), Ordering::Relaxed);
4887 assert!(
4890 self.storage.get_slot_storage_entry(remove_slot).is_none(),
4891 "slot {remove_slot} is not none"
4892 );
4893 }
4894
4895 fn purge_slots<'a>(&self, slots: impl Iterator<Item = &'a Slot> + Clone) {
4896 let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed");
4898 let non_roots = slots
4899 .filter(|slot| !self.accounts_index.is_alive_root(**slot));
4909 safety_checks_elapsed.stop();
4910 self.external_purge_slots_stats
4911 .safety_checks_elapsed
4912 .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed);
4913 self.purge_slots_from_cache_and_store(non_roots, &self.external_purge_slots_stats);
4914 self.external_purge_slots_stats
4915 .report("external_purge_slots_stats", Some(1000));
4916 }
4917
/// Removes the given unrooted `(slot, bank_id)` pairs from the write cache / storage.
///
/// The slots are first claimed in `slots_under_contention`; slots that are already claimed
/// there are waited for via `signal`. The bank ids are then recorded in `removed_bank_ids`,
/// the slots are purged, and finally the claims are released again.
///
/// # Panics
/// Panics if any of the slots is rooted, if one of the locks is poisoned, or if a slot
/// claimed by this call is no longer in `slots_under_contention` at the end.
pub fn remove_unrooted_slots(&self, remove_slots: &[(Slot, BankId)]) {
    let rooted_slots = self
        .accounts_index
        .get_rooted_from_list(remove_slots.iter().map(|(slot, _)| slot));
    assert!(
        rooted_slots.is_empty(),
        "Trying to remove accounts for rooted slots {rooted_slots:?}"
    );

    let RemoveUnrootedSlotsSynchronization {
        slots_under_contention,
        signal,
    } = &self.remove_unrooted_slots_synchronization;

    {
        // The lock guard is scoped to this block so it is released before purging.
        let mut currently_contended_slots = slots_under_contention.lock().unwrap();

        // Claim every slot. `insert` returning false means the slot was already in the
        // set; those slots are collected so they can be waited for below.
        // NOTE(review): presumably the other claimant is a concurrent flush, as the
        // variable names suggest -- confirm against the flush path.
        let mut remaining_contended_flush_slots: Vec<Slot> = remove_slots
            .iter()
            .filter_map(|(remove_slot, _)| {
                let is_being_flushed = !currently_contended_slots.insert(*remove_slot);
                is_being_flushed.then_some(remove_slot)
            })
            .cloned()
            .collect();

        loop {
            if !remaining_contended_flush_slots.is_empty() {
                // Wait to be signalled; `wait` takes the guard and hands it back.
                currently_contended_slots = signal.wait(currently_contended_slots).unwrap();
            } else {
                break;
            }

            // Retry the claims: a slot stays in the list only while `insert` still
            // reports it as already present.
            remaining_contended_flush_slots.retain(|flush_slot| {
                !currently_contended_slots.insert(*flush_slot)
            });
        }
    }

    {
        // Record the bank ids as removed; `purge_slot` checks this set and returns early
        // for bank ids found in it.
        let mut locked_removed_bank_ids = self.accounts_index.removed_bank_ids.lock().unwrap();
        for (_slot, remove_bank_id) in remove_slots.iter() {
            locked_removed_bank_ids.insert(*remove_bank_id);
        }
    }

    let remove_unrooted_purge_stats = PurgeStats::default();
    self.purge_slots_from_cache_and_store(
        remove_slots.iter().map(|(slot, _)| slot),
        &remove_unrooted_purge_stats,
    );
    remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", None);

    // Release the claims taken above; each slot must still be present in the set.
    let mut currently_contended_slots = slots_under_contention.lock().unwrap();
    for (remove_slot, _) in remove_slots {
        assert!(currently_contended_slots.remove(remove_slot));
    }
}
5003
5004 pub fn lt_hash_account(account: &impl ReadableAccount, pubkey: &Pubkey) -> AccountLtHash {
5006 if account.lamports() == 0 {
5007 return ZERO_LAMPORT_ACCOUNT_LT_HASH;
5008 }
5009
5010 let hasher = Self::hash_account_helper(account, pubkey);
5011 let lt_hash = LtHash::with(&hasher);
5012 AccountLtHash(lt_hash)
5013 }
5014
5015 fn hash_account_helper(account: &impl ReadableAccount, pubkey: &Pubkey) -> blake3::Hasher {
5017 let mut hasher = blake3::Hasher::new();
5018
5019 const META_SIZE: usize = 8 + 1 + 32 + 32 ;
5022 const DATA_SIZE: usize = 200; const BUFFER_SIZE: usize = META_SIZE + DATA_SIZE;
5024 let mut buffer = SmallVec::<[u8; BUFFER_SIZE]>::new();
5025
5026 buffer.extend_from_slice(&account.lamports().to_le_bytes());
5028
5029 let data = account.data();
5030 if data.len() > DATA_SIZE {
5031 hasher.update(&buffer);
5033 buffer.clear();
5034
5035 hasher.update(data);
5037 } else {
5038 buffer.extend_from_slice(data);
5040 }
5041
5042 buffer.push(account.executable().into());
5044 buffer.extend_from_slice(account.owner().as_ref());
5045 buffer.extend_from_slice(pubkey.as_ref());
5046 hasher.update(&buffer);
5047
5048 hasher
5049 }
5050
5051 pub fn mark_slot_frozen(&self, slot: Slot) {
5052 if let Some(slot_cache) = self.accounts_cache.slot_cache(slot) {
5053 slot_cache.mark_slot_frozen();
5054 slot_cache.report_slot_store_metrics();
5055 }
5056 self.accounts_cache.report_size();
5057 }
5058
/// Flushes the write cache of `slot` via `flush_slot_cache`, ignoring the result.
///
/// Only compiled with the `dev-context-only-utils` feature.
#[cfg(feature = "dev-context-only-utils")]
pub fn flush_accounts_cache_slot_for_tests(&self, slot: Slot) {
    let _ = self.flush_slot_cache(slot);
}
5064
5065 fn should_aggressively_flush_cache(&self) -> bool {
5067 self.write_cache_limit_bytes
5068 .unwrap_or(WRITE_CACHE_LIMIT_BYTES_DEFAULT)
5069 < self.accounts_cache.size()
5070 }
5071
/// Flushes slots from the write cache to storage and reports flush metrics.
///
/// Nothing happens unless `force_flush` is set or the cache is over its size limit
/// (`should_aggressively_flush_cache`). Otherwise the work proceeds in up to three stages:
///
/// 1. Roots selected by `requested_flush_root` are flushed with cleaning enabled.
/// 2. If the cache is still over its limit, the remaining roots are flushed without cleaning.
/// 3. If the cache is still over its limit, frozen slots above the max flushed root are
///    flushed one by one until the cache drops under the limit.
///
/// # Panics
///
/// Outside of test builds, panics if `requested_flush_root` is `None`.
pub fn flush_accounts_cache(&self, force_flush: bool, requested_flush_root: Option<Slot>) {
    #[cfg(not(test))]
    assert!(requested_flush_root.is_some());

    if !force_flush && !self.should_aggressively_flush_cache() {
        return;
    }

    let mut flush_roots_elapsed = Measure::start("flush_roots_elapsed");

    // Held until the end of the function so the flush is tracked as active throughout.
    let _guard = self.active_stats.activate(ActiveStatItem::Flush);

    // Stage 1: flush roots selected by `requested_flush_root`, with cleaning enabled.
    let (total_new_cleaned_roots, num_cleaned_roots_flushed, mut flush_stats) = self
        .flush_rooted_accounts_cache(
            requested_flush_root,
            true, // should_clean
        );
    flush_roots_elapsed.stop();

    // Stage 2: if the cache is still too big, flush every remaining root without cleaning.
    let (total_new_excess_roots, num_excess_roots_flushed, flush_stats_aggressively) =
        if self.should_aggressively_flush_cache() {
            self.flush_rooted_accounts_cache(None, false)
        } else {
            (0, 0, FlushStats::default())
        };
    flush_stats.accumulate(&flush_stats_aggressively);

    let mut excess_slot_count = 0;
    let mut unflushable_unrooted_slot_count = 0;
    let max_flushed_root = self.accounts_cache.fetch_max_flush_root();
    // Stage 3: if the cache is still too big, walk the frozen slots in ascending order.
    if self.should_aggressively_flush_cache() {
        let mut old_slots = self.accounts_cache.cached_frozen_slots();
        old_slots.sort_unstable();
        excess_slot_count = old_slots.len();
        // Shadows the outer `flush_stats`: this stage's stats are reported only in the
        // "aggressively" datapoint below and are not merged into the outer totals.
        let mut flush_stats = FlushStats::default();
        old_slots.into_iter().for_each(|old_slot| {
            if old_slot > max_flushed_root {
                // The limit is re-checked per slot so flushing stops once the cache is small
                // enough.
                if self.should_aggressively_flush_cache() {
                    if let Some(stats) = self.flush_slot_cache(old_slot) {
                        flush_stats.accumulate(&stats);
                    }
                }
            } else {
                // Slots at or below the max flushed root are not flushed here; only counted.
                unflushable_unrooted_slot_count += 1;
            }
        });
        datapoint_info!(
            "accounts_db-flush_accounts_cache_aggressively",
            (
                "num_accounts_flushed",
                flush_stats.num_accounts_flushed.0,
                i64
            ),
            ("num_accounts_saved", flush_stats.num_accounts_purged.0, i64),
            (
                "account_bytes_flushed",
                flush_stats.num_bytes_flushed.0,
                i64
            ),
            ("account_bytes_saved", flush_stats.num_bytes_purged.0, i64),
            ("total_cache_size", self.accounts_cache.size(), i64),
            ("total_frozen_slots", excess_slot_count, i64),
            ("total_slots", self.accounts_cache.num_slots(), i64),
        );
    }

    // Summary of stages 1 and 2 (plus the slot counters gathered in stage 3).
    datapoint_info!(
        "accounts_db-flush_accounts_cache",
        ("total_new_cleaned_roots", total_new_cleaned_roots, i64),
        ("num_cleaned_roots_flushed", num_cleaned_roots_flushed, i64),
        ("total_new_excess_roots", total_new_excess_roots, i64),
        ("num_excess_roots_flushed", num_excess_roots_flushed, i64),
        ("excess_slot_count", excess_slot_count, i64),
        (
            "unflushable_unrooted_slot_count",
            unflushable_unrooted_slot_count,
            i64
        ),
        ("flush_roots_elapsed", flush_roots_elapsed.as_us(), i64),
        (
            "account_bytes_flushed",
            flush_stats.num_bytes_flushed.0,
            i64
        ),
        (
            "num_accounts_flushed",
            flush_stats.num_accounts_flushed.0,
            i64
        ),
        ("account_bytes_saved", flush_stats.num_bytes_purged.0, i64),
        ("num_accounts_saved", flush_stats.num_accounts_purged.0, i64),
        (
            "store_accounts_total_us",
            flush_stats.store_accounts_total_us.0,
            i64
        ),
        (
            "update_index_us",
            flush_stats.store_accounts_timing.update_index_elapsed,
            i64
        ),
        (
            "store_accounts_elapsed_us",
            flush_stats.store_accounts_timing.store_accounts_elapsed,
            i64
        ),
        (
            "handle_reclaims_elapsed_us",
            flush_stats.store_accounts_timing.handle_reclaims_elapsed,
            i64
        ),
    );
}
5204
5205 fn flush_rooted_accounts_cache(
5206 &self,
5207 requested_flush_root: Option<Slot>,
5208 should_clean: bool,
5209 ) -> (usize, usize, FlushStats) {
5210 let max_clean_root = should_clean
5211 .then(|| {
5212 self.max_clean_root(requested_flush_root)
5215 })
5216 .flatten();
5217
5218 let mut written_accounts = HashSet::new();
5219
5220 let mut should_flush_f = should_clean
5223 .then(|| {
5224 Some(move |&pubkey: &Pubkey| {
5225 written_accounts.insert(pubkey)
5227 })
5228 })
5229 .flatten();
5230
5231 let flushed_roots: BTreeSet<Slot> = self.accounts_cache.clear_roots(requested_flush_root);
5233
5234 let mut num_roots_flushed = 0;
5237 let mut flush_stats = FlushStats::default();
5238 for &root in flushed_roots.iter().rev() {
5239 if let Some(stats) =
5240 self.flush_slot_cache_with_clean(root, should_flush_f.as_mut(), max_clean_root)
5241 {
5242 num_roots_flushed += 1;
5243 flush_stats.accumulate(&stats);
5244 }
5245 }
5246
5247 if let Some(&root) = flushed_roots.last() {
5256 self.accounts_cache.set_max_flush_root(root);
5257 }
5258 let num_new_roots = flushed_roots.len();
5259 (num_new_roots, num_roots_flushed, flush_stats)
5260 }
5261
/// Writes the cached accounts of `slot` to a newly created storage and removes the slot
/// from the write cache.
///
/// * `slot_cache` - the cache contents for `slot`.
/// * `should_flush_f` - optional per-pubkey filter. Accounts for which it returns `false`
///   are not stored; they are handed to `purge_slot_cache_pubkeys` instead. The filter is
///   dropped entirely when `slot > max_clean_root`.
/// * `max_clean_root` - optional upper bound on the slots for which the filter applies.
///
/// Returns the flushed/purged account and byte counts together with the store timings.
///
/// # Panics
///
/// Panics if no storage entry exists for `slot` after storing, or if `slot` is no longer
/// present in the accounts cache when it is removed.
fn do_flush_slot_cache(
    &self,
    slot: Slot,
    slot_cache: &SlotCache,
    mut should_flush_f: Option<&mut impl FnMut(&Pubkey) -> bool>,
    max_clean_root: Option<Slot>,
) -> FlushStats {
    let mut flush_stats = FlushStats::default();
    let iter_items: Vec<_> = slot_cache.iter().collect();
    let mut pubkey_to_slot_set: Vec<(Pubkey, Slot)> = vec![];
    // Slots above `max_clean_root` are flushed in full: disable the filter for them.
    if should_flush_f.is_some() {
        if let Some(max_clean_root) = max_clean_root {
            if slot > max_clean_root {
                should_flush_f = None;
            }
        }
    }

    // Split the cached accounts into those to store (kept in `accounts`) and those to purge
    // (recorded in `pubkey_to_slot_set`), tallying bytes and counts for each group.
    let accounts: Vec<(&Pubkey, &AccountSharedData)> = iter_items
        .iter()
        .filter_map(|iter_item| {
            let key = iter_item.key();
            let account = &iter_item.value().account;
            // With no filter, every account is flushed.
            let should_flush = should_flush_f
                .as_mut()
                .map(|should_flush_f| should_flush_f(key))
                .unwrap_or(true);
            if should_flush {
                flush_stats.num_bytes_flushed +=
                    aligned_stored_size(account.data().len()) as u64;
                flush_stats.num_accounts_flushed += 1;
                Some((key, account))
            } else {
                pubkey_to_slot_set.push((*key, slot));
                flush_stats.num_bytes_purged +=
                    aligned_stored_size(account.data().len()) as u64;
                flush_stats.num_accounts_purged += 1;
                None
            }
        })
        .collect();

    // A slot with nothing left to store is treated as dead.
    let is_dead_slot = accounts.is_empty();
    self.purge_slot_cache_pubkeys(slot, pubkey_to_slot_set, is_dead_slot);

    if !is_dead_slot {
        // The storage is sized from the bytes tallied above.
        let flushed_store = self.create_and_insert_store(
            slot,
            flush_stats.num_bytes_flushed.0,
            "flush_slot_cache",
        );

        // Old slots are reclaimed only when obsolete-account marking is enabled and the
        // filter is still active (i.e. cleaning was requested and not disabled above).
        let reclaim_method = if self.mark_obsolete_accounts && should_flush_f.is_some() {
            UpsertReclaim::ReclaimOldSlots
        } else {
            UpsertReclaim::IgnoreReclaims
        };

        let (store_accounts_timing_inner, store_accounts_total_inner_us) = measure_us!(self
            ._store_accounts_frozen(
                (slot, &accounts[..]),
                &flushed_store,
                reclaim_method,
                UpdateIndexThreadSelection::PoolWithThreshold,
            ));
        flush_stats.store_accounts_timing = store_accounts_timing_inner;
        flush_stats.store_accounts_total_us = Saturating(store_accounts_total_inner_us);

        assert!(self.storage.get_slot_storage_entry(slot).is_some());
        self.reopen_storage_as_readonly_shrinking_in_progress_ok(slot);
    }

    // The slot is removed from the cache only after its accounts have been stored.
    assert!(self.accounts_cache.remove_slot(slot).is_some());

    // Record every stored pubkey under this slot in `uncleaned_pubkeys`.
    self.uncleaned_pubkeys
        .entry(slot)
        .or_default()
        .extend(accounts.iter().map(|(pubkey, _account)| **pubkey));

    flush_stats
}
5366
5367 fn flush_slot_cache(&self, slot: Slot) -> Option<FlushStats> {
5369 self.flush_slot_cache_with_clean(slot, None::<&mut fn(&_) -> bool>, None)
5370 }
5371
5372 fn flush_slot_cache_with_clean(
5376 &self,
5377 slot: Slot,
5378 should_flush_f: Option<&mut impl FnMut(&Pubkey) -> bool>,
5379 max_clean_root: Option<Slot>,
5380 ) -> Option<FlushStats> {
5381 if self
5382 .remove_unrooted_slots_synchronization
5383 .slots_under_contention
5384 .lock()
5385 .unwrap()
5386 .insert(slot)
5387 {
5388 let flush_stats = self.accounts_cache.slot_cache(slot).map(|slot_cache| {
5390 #[cfg(test)]
5391 {
5392 sleep(Duration::from_millis(self.load_delay));
5394 }
5395 self.do_flush_slot_cache(slot, &slot_cache, should_flush_f, max_clean_root)
5400 });
5401
5402 assert!(self
5405 .remove_unrooted_slots_synchronization
5406 .slots_under_contention
5407 .lock()
5408 .unwrap()
5409 .remove(&slot));
5410
5411 self.remove_unrooted_slots_synchronization
5414 .signal
5415 .notify_all();
5416 flush_stats
5417 } else {
5418 None
5420 }
5421 }
5422
5423 fn report_store_stats(&self) {
5424 let mut total_count = 0;
5425 let mut newest_slot = 0;
5426 let mut oldest_slot = u64::MAX;
5427 let mut total_bytes = 0;
5428 let mut total_alive_bytes = 0;
5429 for (slot, store) in self.storage.iter() {
5430 total_count += 1;
5431 newest_slot = std::cmp::max(newest_slot, slot);
5432
5433 oldest_slot = std::cmp::min(oldest_slot, slot);
5434
5435 total_alive_bytes += store.alive_bytes();
5436 total_bytes += store.capacity();
5437 }
5438 info!(
5439 "total_stores: {total_count}, newest_slot: {newest_slot}, oldest_slot: {oldest_slot}"
5440 );
5441
5442 let total_alive_ratio = if total_bytes > 0 {
5443 total_alive_bytes as f64 / total_bytes as f64
5444 } else {
5445 0.
5446 };
5447
5448 datapoint_info!(
5449 "accounts_db-stores",
5450 ("total_count", total_count, i64),
5451 ("total_bytes", total_bytes, i64),
5452 ("total_alive_bytes", total_alive_bytes, i64),
5453 ("total_alive_ratio", total_alive_ratio, f64),
5454 );
5455 }
5456
5457 pub fn calculate_accounts_lt_hash_at_startup_from_index(
5462 &self,
5463 ancestors: &Ancestors,
5464 startup_slot: Slot,
5465 ) -> AccountsLtHash {
5466 let lt_hash = self
5474 .accounts_index
5475 .account_maps
5476 .par_iter()
5477 .fold(
5478 LtHash::identity,
5479 |mut accumulator_lt_hash, accounts_index_bin| {
5480 for pubkey in accounts_index_bin.keys() {
5481 let account_lt_hash = self
5482 .accounts_index
5483 .get_with_and_then(
5484 &pubkey,
5485 Some(ancestors),
5486 Some(startup_slot),
5487 false,
5488 |(slot, account_info)| {
5489 (!account_info.is_zero_lamport()).then(|| {
5490 self.get_account_accessor(
5491 slot,
5492 &pubkey,
5493 &account_info.storage_location(),
5494 )
5495 .get_loaded_account(|loaded_account| {
5496 Self::lt_hash_account(&loaded_account, &pubkey)
5497 })
5498 .unwrap()
5501 })
5502 },
5503 )
5504 .flatten();
5505 if let Some(account_lt_hash) = account_lt_hash {
5506 accumulator_lt_hash.mix_in(&account_lt_hash.0);
5507 }
5508 }
5509 accumulator_lt_hash
5510 },
5511 )
5512 .reduce(LtHash::identity, |mut accum, elem| {
5513 accum.mix_in(&elem);
5514 accum
5515 });
5516
5517 AccountsLtHash(lt_hash)
5518 }
5519
/// Calculates the accounts lattice hash by scanning every account in `storages`.
///
/// `num_threads` scoped threads pull storages from a shared consumer, each accumulating its
/// own lattice hash; the per-thread results are then mixed together. Accounts that a storage
/// reports as obsolete are skipped, and finally `duplicates_lt_hash` is mixed out.
///
/// # Panics
///
/// Panics if a thread cannot be spawned or joined, if scanning a storage fails, or if
/// `mark_obsolete_accounts` is enabled while `duplicates_lt_hash` is not the default value.
pub fn calculate_accounts_lt_hash_at_startup_from_storages(
    &self,
    storages: &[Arc<AccountStorageEntry>],
    duplicates_lt_hash: &DuplicatesLtHash,
    startup_slot: Slot,
    num_threads: NonZeroUsize,
) -> AccountsLtHash {
    // The storages are handed out in random order through a consumer shared by all threads.
    let storages =
        AccountStoragesOrderer::with_random_order(storages).into_concurrent_consumer();
    let mut lt_hash = thread::scope(|s| {
        let handles = (0..num_threads.get())
            .map(|i| {
                thread::Builder::new()
                    .name(format!("solAcctLtHash{i:02}"))
                    .spawn_scoped(s, || {
                        let mut thread_lt_hash = LtHash::identity();
                        // One reader per thread, reused for every storage it scans.
                        let mut reader = append_vec::new_scan_accounts_reader();

                        while let Some(storage) = storages.next() {
                            // NOTE(review): presumably the accounts considered obsolete as
                            // of `startup_slot`; confirm against `get_obsolete_accounts`.
                            let obsolete_accounts =
                                storage.get_obsolete_accounts(Some(startup_slot));
                            storage
                                .accounts
                                .scan_accounts(&mut reader, |offset, account| {
                                    // Obsolete entries are identified by (offset, data len).
                                    if !obsolete_accounts
                                        .contains(&(offset, account.data.len()))
                                    {
                                        let account_lt_hash =
                                            Self::lt_hash_account(&account, account.pubkey());
                                        thread_lt_hash.mix_in(&account_lt_hash.0);
                                    }
                                })
                                .expect("must scan accounts storage");
                        }
                        thread_lt_hash
                    })
            })
            // Collecting first ensures every thread is spawned before any is joined.
            .collect::<Result<Vec<_>, _>>()
            .expect("threads should spawn successfully");
        handles
            .into_iter()
            .map(|handle| handle.join().expect("thread should join successfully"))
            .fold(LtHash::identity(), |mut accum, elem| {
                accum.mix_in(&elem);
                accum
            })
    });

    // With obsolete-account marking enabled, the caller is expected to pass the default
    // (empty) duplicates hash, which makes the `mix_out` below mix out only that default.
    if self.mark_obsolete_accounts {
        assert_eq!(*duplicates_lt_hash, DuplicatesLtHash::default());
    }
    lt_hash.mix_out(&duplicates_lt_hash.0);

    AccountsLtHash(lt_hash)
}
5592
5593 pub fn calculate_capitalization_at_startup_from_index(
5602 &self,
5603 ancestors: &Ancestors,
5604 startup_slot: Slot,
5605 ) -> u64 {
5606 self.accounts_index
5607 .account_maps
5608 .par_iter()
5609 .map(|accounts_index_bin| {
5610 accounts_index_bin
5611 .keys()
5612 .into_iter()
5613 .map(|pubkey| {
5614 self.accounts_index
5615 .get_with_and_then(
5616 &pubkey,
5617 Some(ancestors),
5618 Some(startup_slot),
5619 false,
5620 |(slot, account_info)| {
5621 (!account_info.is_zero_lamport()).then(|| {
5622 self.get_account_accessor(
5623 slot,
5624 &pubkey,
5625 &account_info.storage_location(),
5626 )
5627 .get_loaded_account(|loaded_account| {
5628 loaded_account.lamports()
5629 })
5630 .unwrap()
5633 })
5634 },
5635 )
5636 .flatten()
5637 .unwrap_or(0)
5638 })
5639 .try_fold(0, u64::checked_add)
5640 })
5641 .try_reduce(|| 0, u64::checked_add)
5642 .expect("capitalization cannot overflow")
5643 }
5644
5645 fn apply_offset_to_slot(slot: Slot, offset: i64) -> Slot {
5647 if offset > 0 {
5648 slot.saturating_add(offset as u64)
5649 } else {
5650 slot.saturating_sub(offset.unsigned_abs())
5651 }
5652 }
5653
5654 pub fn get_pubkeys_for_slot(&self, slot: Slot) -> Vec<Pubkey> {
5656 let scan_result = self.scan_cache_storage_fallback(
5657 slot,
5658 |loaded_account| Some(*loaded_account.pubkey()),
5659 |accum: &mut HashSet<Pubkey>, storage| {
5660 storage
5661 .scan_pubkeys(|pubkey| {
5662 accum.insert(*pubkey);
5663 })
5664 .expect("must scan accounts storage");
5665 },
5666 );
5667 match scan_result {
5668 ScanStorageResult::Cached(cached_result) => cached_result,
5669 ScanStorageResult::Stored(stored_result) => stored_result.into_iter().collect(),
5670 }
5671 }
5672
5673 pub fn get_pubkey_account_for_slot(&self, slot: Slot) -> Vec<(Pubkey, AccountSharedData)> {
5675 let scan_result = self.scan_account_storage(
5676 slot,
5677 |loaded_account| {
5678 Some((*loaded_account.pubkey(), loaded_account.take_account()))
5680 },
5681 |accum: &mut HashMap<_, _>, stored_account, data| {
5682 let data = data.unwrap();
5685 let loaded_account =
5686 LoadedAccount::Stored(StoredAccountInfo::new_from(stored_account, data));
5687 accum.insert(*loaded_account.pubkey(), loaded_account.take_account());
5689 },
5690 ScanAccountStorageData::DataRefForStorage,
5691 );
5692
5693 match scan_result {
5694 ScanStorageResult::Cached(cached_result) => cached_result,
5695 ScanStorageResult::Stored(stored_result) => stored_result.into_iter().collect(),
5696 }
5697 }
5698
5699 fn update_index<'a>(
5700 &self,
5701 infos: Vec<AccountInfo>,
5702 accounts: &impl StorableAccounts<'a>,
5703 reclaim: UpsertReclaim,
5704 update_index_thread_selection: UpdateIndexThreadSelection,
5705 thread_pool: &ThreadPool,
5706 ) -> SlotList<AccountInfo> {
5707 let target_slot = accounts.target_slot();
5708 let len = std::cmp::min(accounts.len(), infos.len());
5709
5710 if reclaim == UpsertReclaim::ReclaimOldSlots {
5715 assert!(target_slot <= self.accounts_index.max_root_inclusive());
5716 }
5717
5718 let update = |start, end| {
5719 let mut reclaims = Vec::with_capacity((end - start) / 2);
5720
5721 (start..end).for_each(|i| {
5722 let info = infos[i];
5723 accounts.account(i, |account| {
5724 let old_slot = accounts.slot(i);
5725 self.accounts_index.upsert(
5726 target_slot,
5727 old_slot,
5728 account.pubkey(),
5729 &account,
5730 &self.account_indexes,
5731 info,
5732 &mut reclaims,
5733 reclaim,
5734 );
5735 });
5736 });
5737 reclaims
5738 };
5739
5740 let threshold = 1;
5741 if matches!(
5742 update_index_thread_selection,
5743 UpdateIndexThreadSelection::PoolWithThreshold,
5744 ) && len > threshold
5745 {
5746 let chunk_size = std::cmp::max(1, len / quarter_thread_count()); let batches = 1 + len / chunk_size;
5748 thread_pool.install(|| {
5749 (0..batches)
5750 .into_par_iter()
5751 .map(|batch| {
5752 let start = batch * chunk_size;
5753 let end = std::cmp::min(start + chunk_size, len);
5754 update(start, end)
5755 })
5756 .flatten()
5757 .collect::<Vec<_>>()
5758 })
5759 } else {
5760 update(0, len)
5761 }
5762 }
5763
/// Returns true when shrinking would reclaim nothing, i.e. the alive bytes are at least as
/// many as the total bytes.
fn should_not_shrink(alive_bytes: u64, total_bytes: u64) -> bool {
    let reclaimable_bytes = total_bytes.saturating_sub(alive_bytes);
    reclaimable_bytes == 0
}
5767
5768 fn is_shrinking_productive(store: &AccountStorageEntry) -> bool {
5769 let alive_count = store.count();
5770 let total_bytes = store.capacity();
5771 let alive_bytes = store.alive_bytes_exclude_zero_lamport_single_ref_accounts() as u64;
5772 if Self::should_not_shrink(alive_bytes, total_bytes) {
5773 trace!(
5774 "shrink_slot_forced ({}): not able to shrink at all: num alive: {}, bytes alive: \
5775 {}, bytes total: {}, bytes saved: {}",
5776 store.slot(),
5777 alive_count,
5778 alive_bytes,
5779 total_bytes,
5780 total_bytes.saturating_sub(alive_bytes),
5781 );
5782 return false;
5783 }
5784
5785 true
5786 }
5787
5788 pub(crate) fn is_candidate_for_shrink(&self, store: &AccountStorageEntry) -> bool {
5791 let total_bytes = store.capacity();
5794
5795 let alive_bytes = store.alive_bytes_exclude_zero_lamport_single_ref_accounts() as u64;
5796 match self.shrink_ratio {
5797 AccountShrinkThreshold::TotalSpace { shrink_ratio: _ } => alive_bytes < total_bytes,
5798 AccountShrinkThreshold::IndividualStore { shrink_ratio } => {
5799 (alive_bytes as f64 / total_bytes as f64) < shrink_ratio
5800 }
5801 }
5802 }
5803
/// Removes the reclaimed accounts from their storages' alive accounting.
///
/// * `reclaims` - `(slot, account_info)` pairs to remove; none may refer to cached accounts.
/// * `expected_slot` - when `Some`, all reclaims must belong to exactly this one slot.
/// * `mark_accounts_obsolete` - when `Yes(slot)`, partially-reclaimed storages also have the
///   removed accounts marked obsolete as of that slot.
///
/// Storages left with no accounts are inserted into `dirty_stores` and reported as dead;
/// storages that became worth shrinking are added to `shrink_candidate_slots`.
///
/// Returns the set of dead slots and the reclaimed offsets grouped by slot.
///
/// # Panics
///
/// Panics if a shrink is in progress, if a reclaim refers to a cached account, if
/// `expected_slot` is violated, or if a storage reports a slot different from the one it was
/// looked up by.
fn remove_dead_accounts<'a, I>(
    &'a self,
    reclaims: I,
    expected_slot: Option<Slot>,
    mark_accounts_obsolete: MarkAccountsObsolete,
) -> (IntSet<Slot>, SlotOffsets)
where
    I: Iterator<Item = &'a (Slot, AccountInfo)>,
{
    let mut reclaimed_offsets = SlotOffsets::default();

    assert!(self.storage.no_shrink_in_progress());

    let mut dead_slots = IntSet::default();
    let mut new_shrink_candidates = ShrinkCandidates::default();
    let mut measure = Measure::start("remove");
    // Group the reclaimed offsets by slot.
    for (slot, account_info) in reclaims {
        assert!(!account_info.is_cached());
        reclaimed_offsets
            .entry(*slot)
            .or_default()
            .insert(account_info.offset());
    }
    if let Some(expected_slot) = expected_slot {
        assert_eq!(reclaimed_offsets.len(), 1);
        assert!(reclaimed_offsets.contains_key(&expected_slot));
    }

    self.clean_accounts_stats
        .slots_cleaned
        .fetch_add(reclaimed_offsets.len() as u64, Ordering::Relaxed);

    reclaimed_offsets.iter().for_each(|(slot, offsets)| {
        // Slots without a storage entry are skipped.
        if let Some(store) = self.storage.get_slot_storage_entry(*slot) {
            assert_eq!(
                *slot,
                store.slot(),
                "AccountsDB::accounts_index corrupted. Storage pointed to: {}, expected: {}, \
                 should only point to one slot",
                store.slot(),
                *slot
            );

            let remaining_accounts = if offsets.len() == store.count() {
                // Every account counted by the store is being removed, so all of its alive
                // bytes go with them; no per-account size lookup is needed.
                store.remove_accounts(store.alive_bytes(), offsets.len())
            } else {
                // Only some accounts are removed: look up their data lengths to compute how
                // many bytes die.
                let (remaining_accounts, us) = measure_us!({
                    let mut offsets = offsets.iter().cloned().collect::<Vec<_>>();
                    offsets.sort_unstable();
                    let data_lens = store.accounts.get_account_data_lens(&offsets);
                    let dead_bytes = data_lens
                        .iter()
                        .map(|len| store.accounts.calculate_stored_size(*len))
                        .sum();
                    let remaining_accounts = store.remove_accounts(dead_bytes, offsets.len());

                    if let MarkAccountsObsolete::Yes(slot_marked_obsolete) =
                        mark_accounts_obsolete
                    {
                        // Obsolete entries are recorded as (offset, data length) pairs.
                        store.mark_accounts_obsolete(
                            offsets.into_iter().zip(data_lens),
                            slot_marked_obsolete,
                        );
                    }
                    remaining_accounts
                });
                self.clean_accounts_stats
                    .get_account_sizes_us
                    .fetch_add(us, Ordering::Relaxed);
                remaining_accounts
            };

            if remaining_accounts == 0 {
                self.dirty_stores.insert(*slot, store);
                dead_slots.insert(*slot);
            } else if Self::is_shrinking_productive(&store)
                && self.is_candidate_for_shrink(&store)
            {
                new_shrink_candidates.insert(*slot);
            };
        }
    });
    measure.stop();
    self.clean_accounts_stats
        .remove_dead_accounts_remove_us
        .fetch_add(measure.as_us(), Ordering::Relaxed);

    // Publish the new shrink candidates; the lock is held only while inserting.
    let mut measure = Measure::start("shrink");
    let mut shrink_candidate_slots = self.shrink_candidate_slots.lock().unwrap();
    for slot in new_shrink_candidates {
        shrink_candidate_slots.insert(slot);
    }
    drop(shrink_candidate_slots);
    measure.stop();
    self.clean_accounts_stats
        .remove_dead_accounts_shrink_us
        .fetch_add(measure.as_us(), Ordering::Relaxed);

    // Re-check each dead slot: drop it from the result if its storage now reports a
    // non-zero count. Slots whose storage entry no longer exists are kept.
    dead_slots.retain(|slot| {
        if let Some(slot_store) = self.storage.get_slot_storage_entry(*slot) {
            if slot_store.count() != 0 {
                return false;
            }
        }
        true
    });

    (dead_slots, reclaimed_offsets)
}
5925
5926 fn remove_dead_slots_metadata<'a>(&'a self, dead_slots_iter: impl Iterator<Item = &'a Slot>) {
5927 let mut measure = Measure::start("remove_dead_slots_metadata-ms");
5928 self.clean_dead_slots_from_accounts_index(dead_slots_iter);
5929 measure.stop();
5930 inc_new_counter_info!("remove_dead_slots_metadata-ms", measure.as_ms() as usize);
5931 }
5932
5933 fn unref_pubkeys<'a>(
5936 &'a self,
5937 pubkeys: impl Iterator<Item = &'a Pubkey> + Clone + Send + Sync,
5938 num_pubkeys: usize,
5939 pubkeys_removed_from_accounts_index: &'a PubkeysRemovedFromAccountsIndex,
5940 ) {
5941 let batches = 1 + (num_pubkeys / UNREF_ACCOUNTS_BATCH_SIZE);
5942 self.thread_pool_background.install(|| {
5943 (0..batches).into_par_iter().for_each(|batch| {
5944 let skip = batch * UNREF_ACCOUNTS_BATCH_SIZE;
5945 self.accounts_index.scan(
5946 pubkeys
5947 .clone()
5948 .skip(skip)
5949 .take(UNREF_ACCOUNTS_BATCH_SIZE)
5950 .filter(|pubkey| {
5951 let already_removed =
5953 pubkeys_removed_from_accounts_index.contains(pubkey);
5954 !already_removed
5955 }),
5956 |_pubkey, slots_refs, _entry| {
5957 if let Some((slot_list, ref_count)) = slots_refs {
5958 if slot_list.len() == 1 && ref_count == 2 {
5960 if let Some((slot_alive, acct_info)) = slot_list.first() {
5961 if acct_info.is_zero_lamport() && !acct_info.is_cached() {
5962 self.zero_lamport_single_ref_found(
5963 *slot_alive,
5964 acct_info.offset(),
5965 );
5966 }
5967 }
5968 }
5969 }
5970 AccountsIndexScanResult::Unref
5971 },
5972 None,
5973 false,
5974 ScanFilter::All,
5975 )
5976 });
5977 });
5978 }
5979
5980 fn unref_accounts(
5985 &self,
5986 purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
5987 purged_stored_account_slots: &mut AccountSlots,
5988 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
5989 ) {
5990 self.unref_pubkeys(
5991 purged_slot_pubkeys.iter().map(|(_slot, pubkey)| pubkey),
5992 purged_slot_pubkeys.len(),
5993 pubkeys_removed_from_accounts_index,
5994 );
5995 for (slot, pubkey) in purged_slot_pubkeys {
5996 purged_stored_account_slots
5997 .entry(pubkey)
5998 .or_default()
5999 .insert(slot);
6000 }
6001 }
6002
6003 fn clean_dead_slots_from_accounts_index<'a>(
6004 &'a self,
6005 dead_slots_iter: impl Iterator<Item = &'a Slot>,
6006 ) {
6007 let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
6008 let mut measure = Measure::start("clean_dead_slot");
6009 let mut rooted_cleaned_count = 0;
6010 let mut unrooted_cleaned_count = 0;
6011 let dead_slots: Vec<_> = dead_slots_iter
6012 .map(|slot| {
6013 if self.accounts_index.clean_dead_slot(*slot) {
6014 rooted_cleaned_count += 1;
6015 } else {
6016 unrooted_cleaned_count += 1;
6017 }
6018 *slot
6019 })
6020 .collect();
6021 measure.stop();
6022 accounts_index_root_stats.clean_dead_slot_us += measure.as_us();
6023 if self.log_dead_slots.load(Ordering::Relaxed) {
6024 info!(
6025 "remove_dead_slots_metadata: {} dead slots",
6026 dead_slots.len()
6027 );
6028 trace!("remove_dead_slots_metadata: dead_slots: {dead_slots:?}");
6029 }
6030 self.accounts_index
6031 .update_roots_stats(&mut accounts_index_root_stats);
6032 accounts_index_root_stats.rooted_cleaned_count += rooted_cleaned_count;
6033 accounts_index_root_stats.unrooted_cleaned_count += unrooted_cleaned_count;
6034
6035 self.clean_accounts_stats
6036 .latest_accounts_index_roots_stats
6037 .update(&accounts_index_root_stats);
6038 }
6039
6040 fn clean_stored_dead_slots(
6043 &self,
6044 dead_slots: &IntSet<Slot>,
6045 purged_account_slots: Option<&mut AccountSlots>,
6046 pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
6047 ) {
6048 let mut measure = Measure::start("clean_stored_dead_slots-ms");
6049 let mut stores = vec![];
6050 for slot in dead_slots.iter() {
6052 if let Some(slot_storage) = self.storage.get_slot_storage_entry(*slot) {
6053 stores.push(slot_storage);
6054 }
6055 }
6056 let purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = {
6058 self.thread_pool_background.install(|| {
6059 stores
6060 .into_par_iter()
6061 .map(|store| {
6062 let slot = store.slot();
6063 let mut pubkeys = Vec::with_capacity(store.count());
6064 let obsolete_accounts = store.get_obsolete_accounts(None);
6067 store
6068 .accounts
6069 .scan_accounts_without_data(|offset, account| {
6070 if !obsolete_accounts.contains(&(offset, account.data_len)) {
6071 pubkeys.push((slot, *account.pubkey));
6072 }
6073 })
6074 .expect("must scan accounts storage");
6075 pubkeys
6076 })
6077 .flatten()
6078 .collect::<HashSet<_>>()
6079 })
6080 };
6081
6082 let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
6084 let mut measure_unref = Measure::start("unref_from_storage");
6085
6086 if let Some(purged_account_slots) = purged_account_slots {
6087 self.unref_accounts(
6088 purged_slot_pubkeys,
6089 purged_account_slots,
6090 pubkeys_removed_from_accounts_index,
6091 );
6092 }
6093 measure_unref.stop();
6094 accounts_index_root_stats.clean_unref_from_storage_us += measure_unref.as_us();
6095
6096 self.clean_accounts_stats
6097 .latest_accounts_index_roots_stats
6098 .update(&accounts_index_root_stats);
6099
6100 measure.stop();
6101 self.clean_accounts_stats
6102 .clean_stored_dead_slots_us
6103 .fetch_add(measure.as_us(), Ordering::Relaxed);
6104 }
6105
6106 pub(crate) fn store_accounts_unfrozen<'a>(
6109 &self,
6110 accounts: impl StorableAccounts<'a>,
6111 transactions: Option<&'a [&'a SanitizedTransaction]>,
6112 update_index_thread_selection: UpdateIndexThreadSelection,
6113 ) {
6114 if accounts.is_empty() {
6117 return;
6118 }
6119
6120 let mut total_data = 0;
6121 (0..accounts.len()).for_each(|index| {
6122 total_data += accounts.data_len(index);
6123 });
6124
6125 self.stats
6126 .store_total_data
6127 .fetch_add(total_data as u64, Ordering::Relaxed);
6128
6129 let mut store_accounts_time = Measure::start("store_accounts");
6131 let infos = self.write_accounts_to_cache(accounts.target_slot(), &accounts, transactions);
6132 store_accounts_time.stop();
6133 self.stats
6134 .store_accounts
6135 .fetch_add(store_accounts_time.as_us(), Ordering::Relaxed);
6136
6137 let mut update_index_time = Measure::start("update_index");
6139
6140 self.update_index(
6141 infos,
6142 &accounts,
6143 UpsertReclaim::PreviousSlotEntryWasCached,
6144 update_index_thread_selection,
6145 &self.thread_pool_foreground,
6146 );
6147
6148 update_index_time.stop();
6149 self.stats
6150 .store_update_index
6151 .fetch_add(update_index_time.as_us(), Ordering::Relaxed);
6152 self.stats
6153 .store_num_accounts
6154 .fetch_add(accounts.len() as u64, Ordering::Relaxed);
6155 self.report_store_timings();
6156 }
6157
6158 pub fn store_accounts_frozen<'a>(
6164 &self,
6165 accounts: impl StorableAccounts<'a>,
6166 storage: &Arc<AccountStorageEntry>,
6167 update_index_thread_selection: UpdateIndexThreadSelection,
6168 ) -> StoreAccountsTiming {
6169 self._store_accounts_frozen(
6170 accounts,
6171 storage,
6172 UpsertReclaim::IgnoreReclaims,
6173 update_index_thread_selection,
6174 )
6175 }
6176
/// Stores `accounts` into `storage`, updates the accounts index, and handles any reclaims
/// the index update produced.
///
/// * `accounts` - the accounts to store; their target slot is the slot written to.
/// * `storage` - the storage the accounts are written into.
/// * `reclaim_handling` - how the index upsert treats older entries; also forwarded to
///   `mark_zero_lamport_single_ref_accounts`.
/// * `update_index_thread_selection` - forwarded to `update_index`.
///
/// Returns the elapsed times of the store, index update, and reclaim handling steps.
fn _store_accounts_frozen<'a>(
    &self,
    accounts: impl StorableAccounts<'a>,
    storage: &Arc<AccountStorageEntry>,
    reclaim_handling: UpsertReclaim,
    update_index_thread_selection: UpdateIndexThreadSelection,
) -> StoreAccountsTiming {
    let slot = accounts.target_slot();
    let mut store_accounts_time = Measure::start("store_accounts");

    // Evict these pubkeys from the read-only cache when entries for this slot may be in it.
    if self.read_only_accounts_cache.can_slot_be_in_cache(slot) {
        (0..accounts.len()).for_each(|index| {
            self.read_only_accounts_cache
                .remove_assume_not_present(accounts.pubkey(index));
        });
    }

    // Write the accounts to storage before touching the index.
    let infos = self.write_accounts_to_storage(slot, storage, &accounts);
    store_accounts_time.stop();
    self.stats
        .store_accounts
        .fetch_add(store_accounts_time.as_us(), Ordering::Relaxed);

    self.mark_zero_lamport_single_ref_accounts(&infos, storage, reclaim_handling);

    let mut update_index_time = Measure::start("update_index");

    // Point the index at the newly stored accounts; older entries may come back as reclaims.
    let reclaims = self.update_index(
        infos,
        &accounts,
        reclaim_handling,
        update_index_thread_selection,
        &self.thread_pool_background,
    );

    update_index_time.stop();
    self.stats
        .store_update_index
        .fetch_add(update_index_time.as_us(), Ordering::Relaxed);
    self.stats
        .store_num_accounts
        .fetch_add(accounts.len() as u64, Ordering::Relaxed);

    // Stays zero when there is nothing to reclaim.
    let mut handle_reclaims_elapsed = 0;
    if !reclaims.is_empty() {
        let purge_stats = PurgeStats::default();
        let mut handle_reclaims_time = Measure::start("handle_reclaims");
        self.handle_reclaims(
            // Always `Some` here, since emptiness was checked just above.
            (!reclaims.is_empty()).then(|| reclaims.iter()),
            None,
            &HashSet::default(),
            HandleReclaims::ProcessDeadSlots(&purge_stats),
            // Reclaimed accounts are marked obsolete as of the slot being written.
            MarkAccountsObsolete::Yes(slot),
        );
        handle_reclaims_time.stop();
        handle_reclaims_elapsed = handle_reclaims_time.as_us();
        // Fold the purge results of this call into the long-lived stats.
        self.stats.num_obsolete_slots_removed.fetch_add(
            purge_stats.num_stored_slots_removed.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.stats.num_obsolete_bytes_removed.fetch_add(
            purge_stats
                .total_removed_stored_bytes
                .load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.stats
            .store_handle_reclaims
            .fetch_add(handle_reclaims_elapsed, Ordering::Relaxed);
    }

    StoreAccountsTiming {
        store_accounts_elapsed: store_accounts_time.as_us(),
        update_index_elapsed: update_index_time.as_us(),
        handle_reclaims_elapsed,
    }
}
6267
6268 fn write_accounts_to_cache<'a, 'b>(
6269 &self,
6270 slot: Slot,
6271 accounts_and_meta_to_store: &impl StorableAccounts<'b>,
6272 txs: Option<&[&SanitizedTransaction]>,
6273 ) -> Vec<AccountInfo> {
6274 let mut current_write_version = if self.accounts_update_notifier.is_some() {
6275 self.write_version
6276 .fetch_add(accounts_and_meta_to_store.len() as u64, Ordering::AcqRel)
6277 } else {
6278 0
6279 };
6280
6281 (0..accounts_and_meta_to_store.len())
6282 .map(|index| {
6283 let txn = txs.map(|txs| *txs.get(index).expect("txs must be present if provided"));
6284 accounts_and_meta_to_store.account_default_if_zero_lamport(index, |account| {
6285 let account_shared_data = account.to_account_shared_data();
6286 let pubkey = account.pubkey();
6287 let account_info =
6288 AccountInfo::new(StorageLocation::Cached, account.is_zero_lamport());
6289
6290 self.notify_account_at_accounts_update(
6291 slot,
6292 &account_shared_data,
6293 &txn,
6294 pubkey,
6295 current_write_version,
6296 );
6297 current_write_version = current_write_version.saturating_add(1);
6298
6299 self.accounts_cache.store(slot, pubkey, account_shared_data);
6300 account_info
6301 })
6302 })
6303 .collect()
6304 }
6305
6306 fn write_accounts_to_storage<'a>(
6307 &self,
6308 slot: Slot,
6309 storage: &AccountStorageEntry,
6310 accounts_and_meta_to_store: &impl StorableAccounts<'a>,
6311 ) -> Vec<AccountInfo> {
6312 let mut infos: Vec<AccountInfo> = Vec::with_capacity(accounts_and_meta_to_store.len());
6313 let mut total_append_accounts_us = 0;
6314 while infos.len() < accounts_and_meta_to_store.len() {
6315 let mut append_accounts = Measure::start("append_accounts");
6316 let stored_accounts_info = storage
6317 .accounts
6318 .write_accounts(accounts_and_meta_to_store, infos.len());
6319 append_accounts.stop();
6320 total_append_accounts_us += append_accounts.as_us();
6321 let Some(stored_accounts_info) = stored_accounts_info else {
6322 storage.set_status(AccountStorageStatus::Full);
6323
6324 let data_len = accounts_and_meta_to_store.data_len(infos.len());
6326 let data_len = (data_len + STORE_META_OVERHEAD) as u64;
6327 if !self.has_space_available(slot, data_len) {
6328 info!(
6329 "write_accounts_to_storage, no space: {}, {}, {}, {}, {}",
6330 storage.accounts.capacity(),
6331 storage.accounts.remaining_bytes(),
6332 data_len,
6333 infos.len(),
6334 accounts_and_meta_to_store.len()
6335 );
6336 let special_store_size = std::cmp::max(data_len * 2, self.file_size);
6337 self.create_and_insert_store(slot, special_store_size, "large create");
6338 }
6339 continue;
6340 };
6341
6342 let store_id = storage.id();
6343 for (i, offset) in stored_accounts_info.offsets.iter().enumerate() {
6344 infos.push(AccountInfo::new(
6345 StorageLocation::AppendVec(store_id, *offset),
6346 accounts_and_meta_to_store.is_zero_lamport(i),
6347 ));
6348 }
6349 storage.add_accounts(
6350 stored_accounts_info.offsets.len(),
6351 stored_accounts_info.size,
6352 );
6353
6354 storage.set_status(AccountStorageStatus::Available);
6356 }
6357
6358 self.stats
6359 .store_append_accounts
6360 .fetch_add(total_append_accounts_us, Ordering::Relaxed);
6361
6362 infos
6363 }
6364
6365 fn mark_zero_lamport_single_ref_accounts(
6367 &self,
6368 account_infos: &[AccountInfo],
6369 storage: &AccountStorageEntry,
6370 reclaim_handling: UpsertReclaim,
6371 ) {
6372 if reclaim_handling == UpsertReclaim::ReclaimOldSlots {
6378 let mut add_zero_lamport_accounts = Measure::start("add_zero_lamport_accounts");
6379 let mut num_zero_lamport_accounts_added = 0;
6380
6381 for account_info in account_infos {
6382 if account_info.is_zero_lamport() {
6383 storage.insert_zero_lamport_single_ref_account_offset(account_info.offset());
6384 num_zero_lamport_accounts_added += 1;
6385 }
6386 }
6387
6388 if num_zero_lamport_accounts_added > 0
6390 && self.is_candidate_for_shrink(storage)
6391 && Self::is_shrinking_productive(storage)
6392 {
6393 self.shrink_candidate_slots
6394 .lock()
6395 .unwrap()
6396 .insert(storage.slot);
6397 }
6398
6399 add_zero_lamport_accounts.stop();
6400 self.stats
6401 .add_zero_lamport_accounts_us
6402 .fetch_add(add_zero_lamport_accounts.as_us(), Ordering::Relaxed);
6403 self.stats
6404 .num_zero_lamport_accounts_added
6405 .fetch_add(num_zero_lamport_accounts_added, Ordering::Relaxed);
6406 }
6407 }
6408
6409 fn report_store_timings(&self) {
6410 if self.stats.last_store_report.should_update(1000) {
6411 let read_cache_stats = self.read_only_accounts_cache.get_and_reset_stats();
6412 datapoint_info!(
6413 "accounts_db_store_timings",
6414 (
6415 "hash_accounts",
6416 self.stats.store_hash_accounts.swap(0, Ordering::Relaxed),
6417 i64
6418 ),
6419 (
6420 "store_accounts",
6421 self.stats.store_accounts.swap(0, Ordering::Relaxed),
6422 i64
6423 ),
6424 (
6425 "update_index",
6426 self.stats.store_update_index.swap(0, Ordering::Relaxed),
6427 i64
6428 ),
6429 (
6430 "handle_reclaims",
6431 self.stats.store_handle_reclaims.swap(0, Ordering::Relaxed),
6432 i64
6433 ),
6434 (
6435 "append_accounts",
6436 self.stats.store_append_accounts.swap(0, Ordering::Relaxed),
6437 i64
6438 ),
6439 (
6440 "stakes_cache_check_and_store_us",
6441 self.stats
6442 .stakes_cache_check_and_store_us
6443 .swap(0, Ordering::Relaxed),
6444 i64
6445 ),
6446 (
6447 "num_accounts",
6448 self.stats.store_num_accounts.swap(0, Ordering::Relaxed),
6449 i64
6450 ),
6451 (
6452 "total_data",
6453 self.stats.store_total_data.swap(0, Ordering::Relaxed),
6454 i64
6455 ),
6456 (
6457 "read_only_accounts_cache_entries",
6458 self.read_only_accounts_cache.cache_len(),
6459 i64
6460 ),
6461 (
6462 "read_only_accounts_cache_data_size",
6463 self.read_only_accounts_cache.data_size(),
6464 i64
6465 ),
6466 ("read_only_accounts_cache_hits", read_cache_stats.hits, i64),
6467 (
6468 "read_only_accounts_cache_misses",
6469 read_cache_stats.misses,
6470 i64
6471 ),
6472 (
6473 "read_only_accounts_cache_evicts",
6474 read_cache_stats.evicts,
6475 i64
6476 ),
6477 (
6478 "read_only_accounts_cache_load_us",
6479 read_cache_stats.load_us,
6480 i64
6481 ),
6482 (
6483 "read_only_accounts_cache_store_us",
6484 read_cache_stats.store_us,
6485 i64
6486 ),
6487 (
6488 "read_only_accounts_cache_evict_us",
6489 read_cache_stats.evict_us,
6490 i64
6491 ),
6492 (
6493 "read_only_accounts_cache_evictor_wakeup_count_all",
6494 read_cache_stats.evictor_wakeup_count_all,
6495 i64
6496 ),
6497 (
6498 "read_only_accounts_cache_evictor_wakeup_count_productive",
6499 read_cache_stats.evictor_wakeup_count_productive,
6500 i64
6501 ),
6502 (
6503 "handle_dead_keys_us",
6504 self.stats.handle_dead_keys_us.swap(0, Ordering::Relaxed),
6505 i64
6506 ),
6507 (
6508 "purge_exact_us",
6509 self.stats.purge_exact_us.swap(0, Ordering::Relaxed),
6510 i64
6511 ),
6512 (
6513 "purge_exact_count",
6514 self.stats.purge_exact_count.swap(0, Ordering::Relaxed),
6515 i64
6516 ),
6517 (
6518 "num_obsolete_slots_removed",
6519 self.stats
6520 .num_obsolete_slots_removed
6521 .swap(0, Ordering::Relaxed),
6522 i64
6523 ),
6524 (
6525 "num_obsolete_bytes_removed",
6526 self.stats
6527 .num_obsolete_bytes_removed
6528 .swap(0, Ordering::Relaxed),
6529 i64
6530 ),
6531 (
6532 "add_zero_lamport_accounts_us",
6533 self.stats
6534 .add_zero_lamport_accounts_us
6535 .swap(0, Ordering::Relaxed),
6536 i64
6537 ),
6538 (
6539 "num_zero_lamport_accounts_added",
6540 self.stats
6541 .num_zero_lamport_accounts_added
6542 .swap(0, Ordering::Relaxed),
6543 i64
6544 ),
6545 );
6546
6547 datapoint_info!(
6548 "accounts_db_store_timings2",
6549 (
6550 "create_store_count",
6551 self.stats.create_store_count.swap(0, Ordering::Relaxed),
6552 i64
6553 ),
6554 (
6555 "store_get_slot_store",
6556 self.stats.store_get_slot_store.swap(0, Ordering::Relaxed),
6557 i64
6558 ),
6559 (
6560 "store_find_existing",
6561 self.stats.store_find_existing.swap(0, Ordering::Relaxed),
6562 i64
6563 ),
6564 (
6565 "dropped_stores",
6566 self.stats.dropped_stores.swap(0, Ordering::Relaxed),
6567 i64
6568 ),
6569 );
6570 }
6571 }
6572
6573 pub fn add_root(&self, slot: Slot) -> AccountsAddRootTiming {
6574 let mut index_time = Measure::start("index_add_root");
6575 self.accounts_index.add_root(slot);
6576 index_time.stop();
6577 let mut cache_time = Measure::start("cache_add_root");
6578 self.accounts_cache.add_root(slot);
6579 cache_time.stop();
6580 let mut store_time = Measure::start("store_add_root");
6581 if let Some(store) = self
6585 .storage
6586 .get_slot_storage_entry_shrinking_in_progress_ok(slot)
6587 {
6588 self.dirty_stores.insert(slot, store);
6589 }
6590 store_time.stop();
6591
6592 AccountsAddRootTiming {
6593 index_us: index_time.as_us(),
6594 cache_us: cache_time.as_us(),
6595 store_us: store_time.as_us(),
6596 }
6597 }
6598
6599 pub fn get_storages(
6601 &self,
6602 requested_slots: impl RangeBounds<Slot> + Sync,
6603 ) -> (Vec<Arc<AccountStorageEntry>>, Vec<Slot>) {
6604 let start = Instant::now();
6605 let (slots, storages) = self
6606 .storage
6607 .get_if(|slot, storage| requested_slots.contains(slot) && storage.has_accounts())
6608 .into_vec()
6609 .into_iter()
6610 .unzip();
6611 let duration = start.elapsed();
6612 debug!("get_snapshot_storages: {duration:?}");
6613 (storages, slots)
6614 }
6615
6616 pub fn latest_full_snapshot_slot(&self) -> Option<Slot> {
6618 self.latest_full_snapshot_slot.read()
6619 }
6620
6621 pub fn set_latest_full_snapshot_slot(&self, slot: Slot) {
6623 *self.latest_full_snapshot_slot.lock_write() = Some(slot);
6624 }
6625
6626 fn generate_index_for_slot<'a>(
6627 &self,
6628 reader: &mut impl RequiredLenBufFileRead<'a>,
6629 storage: &'a AccountStorageEntry,
6630 slot: Slot,
6631 store_id: AccountsFileId,
6632 storage_info: &StorageSizeAndCountMap,
6633 ) -> SlotIndexGenerationInfo {
6634 if storage.accounts.get_account_data_lens(&[0]).is_empty() {
6635 return SlotIndexGenerationInfo::default();
6636 }
6637 let secondary = !self.account_indexes.is_empty();
6638
6639 let mut accounts_data_len = 0;
6640 let mut stored_size_alive = 0;
6641 let mut zero_lamport_pubkeys = vec![];
6642 let mut all_accounts_are_zero_lamports = true;
6643
6644 let (insert_time_us, generate_index_results) = {
6645 let mut keyed_account_infos = vec![];
6646 let mut itemizer = |info: IndexInfo| {
6648 stored_size_alive += info.stored_size_aligned;
6649 if info.index_info.lamports > 0 {
6650 accounts_data_len += info.index_info.data_len;
6651 all_accounts_are_zero_lamports = false;
6652 } else {
6653 zero_lamport_pubkeys.push(info.index_info.pubkey);
6655 }
6656 keyed_account_infos.push((
6657 info.index_info.pubkey,
6658 AccountInfo::new(
6659 StorageLocation::AppendVec(store_id, info.index_info.offset), info.index_info.is_zero_lamport(),
6661 ),
6662 ));
6663 };
6664
6665 if secondary {
6666 storage.accounts.scan_accounts(reader, |offset, account| {
6668 let data_len = account.data.len() as u64;
6669 let stored_size_aligned =
6670 storage.accounts.calculate_stored_size(data_len as usize);
6671 let info = IndexInfo {
6672 stored_size_aligned,
6673 index_info: IndexInfoInner {
6674 offset,
6675 pubkey: *account.pubkey,
6676 lamports: account.lamports,
6677 data_len,
6678 },
6679 };
6680 itemizer(info);
6681 self.accounts_index.update_secondary_indexes(
6682 account.pubkey,
6683 &account,
6684 &self.account_indexes,
6685 );
6686 })
6687 } else {
6688 storage
6690 .accounts
6691 .scan_accounts_without_data(|offset, account| {
6692 let data_len = account.data_len as u64;
6693 let stored_size_aligned =
6694 storage.accounts.calculate_stored_size(data_len as usize);
6695 let info = IndexInfo {
6696 stored_size_aligned,
6697 index_info: IndexInfoInner {
6698 offset,
6699 pubkey: *account.pubkey,
6700 lamports: account.lamports,
6701 data_len,
6702 },
6703 };
6704 itemizer(info);
6705 })
6706 }
6707 .expect("must scan accounts storage");
6708 self.accounts_index
6709 .insert_new_if_missing_into_primary_index(slot, keyed_account_infos)
6710 };
6711
6712 {
6713 let mut info = storage_info.entry(store_id).or_default();
6715 info.stored_size += stored_size_alive;
6716 info.count += generate_index_results.count;
6717
6718 assert!(
6721 info.stored_size <= u64_align!(storage.accounts.len()),
6722 "Stored size ({}) is larger than the size of the accounts file ({}) for store_id: \
6723 {}",
6724 info.stored_size,
6725 storage.accounts.len(),
6726 store_id
6727 );
6728 }
6729 if !zero_lamport_pubkeys.is_empty() {
6733 let old = self
6734 .uncleaned_pubkeys
6735 .insert(slot, zero_lamport_pubkeys.clone());
6736 assert!(old.is_none());
6737 }
6738 SlotIndexGenerationInfo {
6739 insert_time_us,
6740 num_accounts: generate_index_results.count as u64,
6741 accounts_data_len,
6742 zero_lamport_pubkeys,
6743 all_accounts_are_zero_lamports,
6744 num_did_not_exist: generate_index_results.num_did_not_exist,
6745 num_existed_in_mem: generate_index_results.num_existed_in_mem,
6746 num_existed_on_disk: generate_index_results.num_existed_on_disk,
6747 }
6748 }
6749
6750 pub fn generate_index(
6751 &self,
6752 limit_load_slot_count_from_snapshot: Option<usize>,
6753 verify: bool,
6754 ) -> IndexGenerationInfo {
6755 let mut total_time = Measure::start("generate_index");
6756
6757 let mut storages = self.storage.all_storages();
6758 storages.sort_unstable_by_key(|storage| storage.slot);
6759 if let Some(limit) = limit_load_slot_count_from_snapshot {
6760 storages.truncate(limit); }
6762 let accounts_data_len = AtomicU64::new(0);
6763
6764 let zero_lamport_pubkeys = Mutex::new(Vec::new());
6765 let mut outer_duplicates_lt_hash = None;
6766
6767 let passes = if verify { 2 } else { 1 };
6771 for pass in 0..passes {
6772 if pass == 0 {
6773 self.accounts_index
6774 .set_startup(Startup::StartupWithExtraThreads);
6775 }
6776 let storage_info = StorageSizeAndCountMap::default();
6777 let total_processed_slots_across_all_threads = AtomicU64::new(0);
6778 let outer_slots_len = storages.len();
6779 let threads = num_cpus::get();
6780 let chunk_size = (outer_slots_len / (std::cmp::max(1, threads.saturating_sub(1)))) + 1; let mut index_time = Measure::start("index");
6782 let insertion_time_us = AtomicU64::new(0);
6783 let total_including_duplicates = AtomicU64::new(0);
6784 let all_accounts_are_zero_lamports_slots = AtomicU64::new(0);
6785 let mut all_zeros_slots = Mutex::new(Vec::<(Slot, Arc<AccountStorageEntry>)>::new());
6786 let scan_time: u64 = storages
6787 .par_chunks(chunk_size)
6788 .map(|storages| {
6789 let mut reader = append_vec::new_scan_accounts_reader();
6790 let mut log_status = MultiThreadProgress::new(
6791 &total_processed_slots_across_all_threads,
6792 2,
6793 outer_slots_len as u64,
6794 );
6795 let mut scan_time_sum = 0;
6796 let mut all_accounts_are_zero_lamports_slots_inner = 0;
6797 let mut all_zeros_slots_inner = vec![];
6798 let mut local_zero_lamport_pubkeys = Vec::new();
6799 let mut insert_time_sum = 0;
6800 let mut total_including_duplicates_sum = 0;
6801 let mut accounts_data_len_sum = 0;
6802 let mut local_num_did_not_exist = 0;
6803 let mut local_num_existed_in_mem = 0;
6804 let mut local_num_existed_on_disk = 0;
6805 for (index, storage) in storages.iter().enumerate() {
6806 let mut scan_time = Measure::start("scan");
6807 log_status.report(index as u64);
6808 let store_id = storage.id();
6809 let slot = storage.slot();
6810
6811 scan_time.stop();
6812 scan_time_sum += scan_time.as_us();
6813
6814 let insert_us = if pass == 0 {
6815 self.maybe_throttle_index_generation();
6817 let SlotIndexGenerationInfo {
6818 insert_time_us: insert_us,
6819 num_accounts: total_this_slot,
6820 accounts_data_len: accounts_data_len_this_slot,
6821 zero_lamport_pubkeys: mut zero_lamport_pubkeys_this_slot,
6822 all_accounts_are_zero_lamports,
6823 num_did_not_exist,
6824 num_existed_in_mem,
6825 num_existed_on_disk,
6826 } = self.generate_index_for_slot(
6827 &mut reader,
6828 storage,
6829 slot,
6830 store_id,
6831 &storage_info,
6832 );
6833
6834 local_num_did_not_exist += num_did_not_exist;
6835 local_num_existed_in_mem += num_existed_in_mem;
6836 local_num_existed_on_disk += num_existed_on_disk;
6837 total_including_duplicates_sum += total_this_slot;
6838 accounts_data_len_sum += accounts_data_len_this_slot;
6839 if all_accounts_are_zero_lamports {
6840 all_accounts_are_zero_lamports_slots_inner += 1;
6841 all_zeros_slots_inner.push((slot, Arc::clone(storage)));
6842 }
6843 local_zero_lamport_pubkeys.append(&mut zero_lamport_pubkeys_this_slot);
6844
6845 insert_us
6846 } else {
6847 assert!(verify);
6849 let mut lookup_time = Measure::start("lookup_time");
6850 storage
6851 .accounts
6852 .scan_accounts_without_data(|offset, account| {
6853 let key = account.pubkey();
6854 let index_entry = self.accounts_index.get_cloned(key).unwrap();
6855 let slot_list = index_entry.slot_list.read().unwrap();
6856 let mut count = 0;
6857 for (slot2, account_info2) in slot_list.iter() {
6858 if *slot2 == slot {
6859 count += 1;
6860 let ai = AccountInfo::new(
6861 StorageLocation::AppendVec(store_id, offset), account.is_zero_lamport(),
6863 );
6864 assert_eq!(&ai, account_info2);
6865 }
6866 }
6867 assert_eq!(1, count);
6868 })
6869 .expect("must scan accounts storage");
6870 lookup_time.stop();
6871 lookup_time.as_us()
6872 };
6873 insert_time_sum += insert_us;
6874 }
6875
6876 if pass == 0 {
6877 let mut zero_lamport_pubkeys_lock = zero_lamport_pubkeys.lock().unwrap();
6878 zero_lamport_pubkeys_lock.reserve(local_zero_lamport_pubkeys.len());
6879 zero_lamport_pubkeys_lock.extend(local_zero_lamport_pubkeys.into_iter());
6880 drop(zero_lamport_pubkeys_lock);
6881
6882 let index_stats = self.accounts_index.bucket_map_holder_stats();
6885
6886 index_stats.inc_insert_count(local_num_did_not_exist);
6888 index_stats.add_mem_count(local_num_did_not_exist as usize);
6889
6890 index_stats
6892 .entries_from_mem
6893 .fetch_add(local_num_existed_in_mem, Ordering::Relaxed);
6894 index_stats
6895 .updates_in_mem
6896 .fetch_add(local_num_existed_in_mem, Ordering::Relaxed);
6897
6898 index_stats.add_mem_count(local_num_existed_on_disk as usize);
6900 index_stats
6901 .entries_missing
6902 .fetch_add(local_num_existed_on_disk, Ordering::Relaxed);
6903 index_stats
6904 .updates_in_mem
6905 .fetch_add(local_num_existed_on_disk, Ordering::Relaxed);
6906 }
6907
6908 all_accounts_are_zero_lamports_slots.fetch_add(
6909 all_accounts_are_zero_lamports_slots_inner,
6910 Ordering::Relaxed,
6911 );
6912 all_zeros_slots
6913 .lock()
6914 .unwrap()
6915 .append(&mut all_zeros_slots_inner);
6916 insertion_time_us.fetch_add(insert_time_sum, Ordering::Relaxed);
6917 total_including_duplicates
6918 .fetch_add(total_including_duplicates_sum, Ordering::Relaxed);
6919 accounts_data_len.fetch_add(accounts_data_len_sum, Ordering::Relaxed);
6920 scan_time_sum
6921 })
6922 .sum();
6923 index_time.stop();
6924
6925 let mut index_flush_us = 0;
6926 let total_duplicate_slot_keys = AtomicU64::default();
6927 let mut populate_duplicate_keys_us = 0;
6928 let total_num_unique_duplicate_keys = AtomicU64::default();
6929
6930 let unique_pubkeys_by_bin = Mutex::new(Vec::<Vec<Pubkey>>::default());
6933 if pass == 0 {
6934 let mut m = Measure::start("accounts_index_idle_us");
6936 self.accounts_index.set_startup(Startup::Normal);
6937 m.stop();
6938 index_flush_us = m.as_us();
6939
6940 populate_duplicate_keys_us = measure_us!({
6941 self.accounts_index
6944 .populate_and_retrieve_duplicate_keys_from_startup(|slot_keys| {
6945 total_duplicate_slot_keys
6946 .fetch_add(slot_keys.len() as u64, Ordering::Relaxed);
6947 let unique_keys =
6948 HashSet::<Pubkey>::from_iter(slot_keys.iter().map(|(_, key)| *key));
6949 for (slot, key) in slot_keys {
6950 self.uncleaned_pubkeys.entry(slot).or_default().push(key);
6951 }
6952 let unique_pubkeys_by_bin_inner =
6953 unique_keys.into_iter().collect::<Vec<_>>();
6954 total_num_unique_duplicate_keys.fetch_add(
6955 unique_pubkeys_by_bin_inner.len() as u64,
6956 Ordering::Relaxed,
6957 );
6958 unique_pubkeys_by_bin
6960 .lock()
6961 .unwrap()
6962 .push(unique_pubkeys_by_bin_inner);
6963 });
6964 })
6965 .1;
6966 }
6967 let unique_pubkeys_by_bin = unique_pubkeys_by_bin.into_inner().unwrap();
6968
6969 let mut timings = GenerateIndexTimings {
6970 index_flush_us,
6971 scan_time,
6972 index_time: index_time.as_us(),
6973 insertion_time_us: insertion_time_us.load(Ordering::Relaxed),
6974 total_duplicate_slot_keys: total_duplicate_slot_keys.load(Ordering::Relaxed),
6975 total_num_unique_duplicate_keys: total_num_unique_duplicate_keys
6976 .load(Ordering::Relaxed),
6977 populate_duplicate_keys_us,
6978 total_including_duplicates: total_including_duplicates.load(Ordering::Relaxed),
6979 total_slots: storages.len() as u64,
6980 all_accounts_are_zero_lamports_slots: all_accounts_are_zero_lamports_slots
6981 .load(Ordering::Relaxed),
6982 ..GenerateIndexTimings::default()
6983 };
6984
6985 if pass == 0 {
6986 #[derive(Debug, Default)]
6987 struct DuplicatePubkeysVisitedInfo {
6988 accounts_data_len_from_duplicates: u64,
6989 num_duplicate_accounts: u64,
6990 duplicates_lt_hash: Option<Box<DuplicatesLtHash>>,
6991 }
6992 impl DuplicatePubkeysVisitedInfo {
6993 fn reduce(mut self, other: Self) -> Self {
6994 self.accounts_data_len_from_duplicates +=
6995 other.accounts_data_len_from_duplicates;
6996 self.num_duplicate_accounts += other.num_duplicate_accounts;
6997
6998 match (
6999 self.duplicates_lt_hash.is_some(),
7000 other.duplicates_lt_hash.is_some(),
7001 ) {
7002 (true, true) => {
7003 self.duplicates_lt_hash
7005 .as_mut()
7006 .unwrap()
7007 .0
7008 .mix_in(&other.duplicates_lt_hash.as_ref().unwrap().0);
7009 }
7010 (true, false) => {
7011 }
7013 (false, true) => {
7014 self.duplicates_lt_hash = other.duplicates_lt_hash;
7016 }
7017 (false, false) => {
7018 }
7020 }
7021 self
7022 }
7023 }
7024
7025 let zero_lamport_pubkeys_to_visit =
7026 std::mem::take(&mut *zero_lamport_pubkeys.lock().unwrap());
7027 let (num_zero_lamport_single_refs, visit_zero_lamports_us) =
7028 measure_us!(self
7029 .visit_zero_lamport_pubkeys_during_startup(zero_lamport_pubkeys_to_visit));
7030 timings.visit_zero_lamports_us = visit_zero_lamports_us;
7031 timings.num_zero_lamport_single_refs = num_zero_lamport_single_refs;
7032
7033 let mut accounts_data_len_dedup_timer =
7035 Measure::start("handle accounts data len duplicates");
7036 let DuplicatePubkeysVisitedInfo {
7037 accounts_data_len_from_duplicates,
7038 num_duplicate_accounts,
7039 duplicates_lt_hash,
7040 } = unique_pubkeys_by_bin
7041 .par_iter()
7042 .fold(
7043 DuplicatePubkeysVisitedInfo::default,
7044 |accum, pubkeys_by_bin| {
7045 let intermediate = pubkeys_by_bin
7046 .par_chunks(4096)
7047 .fold(DuplicatePubkeysVisitedInfo::default, |accum, pubkeys| {
7048 let (
7049 accounts_data_len_from_duplicates,
7050 accounts_duplicates_num,
7051 duplicates_lt_hash,
7052 ) = self
7053 .visit_duplicate_pubkeys_during_startup(pubkeys, &timings);
7054 let intermediate = DuplicatePubkeysVisitedInfo {
7055 accounts_data_len_from_duplicates,
7056 num_duplicate_accounts: accounts_duplicates_num,
7057 duplicates_lt_hash,
7058 };
7059 DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
7060 })
7061 .reduce(
7062 DuplicatePubkeysVisitedInfo::default,
7063 DuplicatePubkeysVisitedInfo::reduce,
7064 );
7065 DuplicatePubkeysVisitedInfo::reduce(accum, intermediate)
7066 },
7067 )
7068 .reduce(
7069 DuplicatePubkeysVisitedInfo::default,
7070 DuplicatePubkeysVisitedInfo::reduce,
7071 );
7072 accounts_data_len_dedup_timer.stop();
7073 timings.accounts_data_len_dedup_time_us = accounts_data_len_dedup_timer.as_us();
7074 timings.num_duplicate_accounts = num_duplicate_accounts;
7075
7076 accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed);
7077 if let Some(duplicates_lt_hash) = duplicates_lt_hash {
7078 let old_val = outer_duplicates_lt_hash.replace(duplicates_lt_hash);
7079 assert!(old_val.is_none());
7080 }
7081 info!(
7082 "accounts data len: {}",
7083 accounts_data_len.load(Ordering::Relaxed)
7084 );
7085
7086 let all_zero_slots_to_clean = std::mem::take(all_zeros_slots.get_mut().unwrap());
7088 info!(
7089 "insert all zero slots to clean at startup {}",
7090 all_zero_slots_to_clean.len()
7091 );
7092 for (slot, storage) in all_zero_slots_to_clean {
7093 self.dirty_stores.insert(slot, storage);
7094 }
7095 }
7096
7097 if pass == 0 {
7098 for storage in &storages {
7100 self.accounts_index.add_root(storage.slot());
7101 }
7102
7103 self.set_storage_count_and_alive_bytes(storage_info, &mut timings);
7104
7105 if self.mark_obsolete_accounts {
7106 let mut mark_obsolete_accounts_time =
7107 Measure::start("mark_obsolete_accounts_time");
7108 let slot_marked_obsolete = storages.last().unwrap().slot();
7114 let obsolete_account_stats = self.mark_obsolete_accounts_at_startup(
7115 slot_marked_obsolete,
7116 unique_pubkeys_by_bin,
7117 );
7118
7119 mark_obsolete_accounts_time.stop();
7120 timings.mark_obsolete_accounts_us = mark_obsolete_accounts_time.as_us();
7121 timings.num_obsolete_accounts_marked =
7122 obsolete_account_stats.accounts_marked_obsolete;
7123 timings.num_slots_removed_as_obsolete = obsolete_account_stats.slots_removed;
7124 }
7125 }
7126 total_time.stop();
7127 timings.total_time_us = total_time.as_us();
7128 timings.report(self.accounts_index.get_startup_stats());
7129 }
7130
7131 self.accounts_index.log_secondary_indexes();
7132
7133 if outer_duplicates_lt_hash.is_none() {
7139 outer_duplicates_lt_hash = Some(Box::new(DuplicatesLtHash::default()));
7140 }
7141
7142 IndexGenerationInfo {
7143 accounts_data_len: accounts_data_len.load(Ordering::Relaxed),
7144 duplicates_lt_hash: outer_duplicates_lt_hash,
7145 }
7146 }
7147
7148 fn mark_obsolete_accounts_at_startup(
7151 &self,
7152 slot_marked_obsolete: Slot,
7153 pubkeys_with_duplicates_by_bin: Vec<Vec<Pubkey>>,
7154 ) -> ObsoleteAccountsStats {
7155 let stats: ObsoleteAccountsStats = pubkeys_with_duplicates_by_bin
7156 .par_iter()
7157 .map(|pubkeys_by_bin| {
7158 let reclaims = self.accounts_index.clean_and_unref_rooted_entries_by_bin(
7159 pubkeys_by_bin,
7160 |slot, account_info| {
7161 if account_info.is_zero_lamport() {
7164 self.zero_lamport_single_ref_found(slot, account_info.offset());
7165 }
7166 },
7167 );
7168 let stats = PurgeStats::default();
7169
7170 self.handle_reclaims(
7172 (!reclaims.is_empty()).then(|| reclaims.iter()),
7173 None,
7174 &HashSet::new(),
7175 HandleReclaims::ProcessDeadSlots(&stats),
7176 MarkAccountsObsolete::Yes(slot_marked_obsolete),
7177 );
7178 ObsoleteAccountsStats {
7179 accounts_marked_obsolete: reclaims.len() as u64,
7180 slots_removed: stats.total_removed_storage_entries.load(Ordering::Relaxed)
7181 as u64,
7182 }
7183 })
7184 .sum();
7185 stats
7186 }
7187
7188 fn maybe_throttle_index_generation(&self) {
7191 if !self.accounts_index.is_disk_index_enabled() {
7193 return;
7194 }
7195 const LIMIT: usize = 10_000_000;
7203 while self
7204 .accounts_index
7205 .get_startup_remaining_items_to_flush_estimate()
7206 > LIMIT
7207 {
7208 sleep(Duration::from_millis(10));
7211 }
7212 }
7213
7214 fn visit_zero_lamport_pubkeys_during_startup(&self, mut pubkeys: Vec<Pubkey>) -> u64 {
7218 let mut slot_offsets = HashMap::<_, Vec<_>>::default();
7219 let orig_len = pubkeys.len();
7223 pubkeys.sort_unstable();
7224 pubkeys.dedup();
7225 let uniq_len = pubkeys.len();
7226 info!(
7227 "visit_zero_lamport_pubkeys_during_startup: {orig_len} pubkeys, {uniq_len} after dedup",
7228 );
7229
7230 self.accounts_index.scan(
7231 pubkeys.iter(),
7232 |_pubkey, slots_refs, _entry| {
7233 let (slot_list, ref_count) = slots_refs.unwrap();
7234 if ref_count == 1 {
7235 assert_eq!(slot_list.len(), 1);
7236 let (slot_alive, account_info) = slot_list.first().unwrap();
7237 assert!(!account_info.is_cached());
7238 if account_info.is_zero_lamport() {
7239 slot_offsets
7240 .entry(*slot_alive)
7241 .or_default()
7242 .push(account_info.offset());
7243 }
7244 }
7245 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
7246 },
7247 None,
7248 false,
7249 ScanFilter::All,
7250 );
7251
7252 let mut count = 0;
7253 let mut dead_stores = 0;
7254 let mut shrink_stores = 0;
7255 let mut non_shrink_stores = 0;
7256 for (slot, offsets) in slot_offsets {
7257 if let Some(store) = self.storage.get_slot_storage_entry(slot) {
7258 count += store.batch_insert_zero_lamport_single_ref_account_offsets(&offsets);
7259 if store.num_zero_lamport_single_ref_accounts() == store.count() {
7260 self.dirty_stores.entry(slot).or_insert(store);
7262 dead_stores += 1;
7263 } else if Self::is_shrinking_productive(&store)
7264 && self.is_candidate_for_shrink(&store)
7265 {
7266 if self.shrink_candidate_slots.lock().unwrap().insert(slot) {
7268 shrink_stores += 1;
7269 }
7270 } else {
7271 non_shrink_stores += 1;
7272 }
7273 }
7274 }
7275 self.shrink_stats
7276 .num_zero_lamport_single_ref_accounts_found
7277 .fetch_add(count, Ordering::Relaxed);
7278
7279 self.shrink_stats
7280 .num_dead_slots_added_to_clean
7281 .fetch_add(dead_stores, Ordering::Relaxed);
7282
7283 self.shrink_stats
7284 .num_slots_with_zero_lamport_accounts_added_to_shrink
7285 .fetch_add(shrink_stores, Ordering::Relaxed);
7286
7287 self.shrink_stats
7288 .marking_zero_dead_accounts_in_non_shrinkable_store
7289 .fetch_add(non_shrink_stores, Ordering::Relaxed);
7290
7291 count
7292 }
7293
7294 fn visit_duplicate_pubkeys_during_startup(
7306 &self,
7307 pubkeys: &[Pubkey],
7308 timings: &GenerateIndexTimings,
7309 ) -> (u64, u64, Option<Box<DuplicatesLtHash>>) {
7310 let mut accounts_data_len_from_duplicates = 0;
7311 let mut num_duplicate_accounts = 0_u64;
7312 let mut duplicates_lt_hash =
7315 (!self.mark_obsolete_accounts).then(|| Box::new(DuplicatesLtHash::default()));
7316 let mut lt_hash_time = Duration::default();
7317 self.accounts_index.scan(
7318 pubkeys.iter(),
7319 |pubkey, slots_refs, _entry| {
7320 if let Some((slot_list, _ref_count)) = slots_refs {
7321 if slot_list.len() > 1 {
7322 let max = slot_list.iter().map(|(slot, _)| slot).max().unwrap();
7328 slot_list.iter().for_each(|(slot, account_info)| {
7329 if slot == max {
7330 return;
7332 }
7333 let maybe_storage_entry = self
7334 .storage
7335 .get_account_storage_entry(*slot, account_info.store_id());
7336 let mut accessor = LoadedAccountAccessor::Stored(
7337 maybe_storage_entry.map(|entry| (entry, account_info.offset())),
7338 );
7339 accessor.check_and_get_loaded_account(|loaded_account| {
7340 let data_len = loaded_account.data_len();
7341 if loaded_account.lamports() > 0 {
7342 accounts_data_len_from_duplicates += data_len;
7343 }
7344 num_duplicate_accounts += 1;
7345 if let Some(duplicates_lt_hash) = duplicates_lt_hash.as_mut() {
7346 let (_, duration) = meas_dur!({
7347 let account_lt_hash =
7348 Self::lt_hash_account(&loaded_account, pubkey);
7349 duplicates_lt_hash.0.mix_in(&account_lt_hash.0);
7350 });
7351 lt_hash_time += duration;
7352 }
7353 });
7354 });
7355 }
7356 }
7357 AccountsIndexScanResult::OnlyKeepInMemoryIfDirty
7358 },
7359 None,
7360 false,
7361 ScanFilter::All,
7362 );
7363 timings
7364 .par_duplicates_lt_hash_us
7365 .fetch_add(lt_hash_time.as_micros() as u64, Ordering::Relaxed);
7366 (
7367 accounts_data_len_from_duplicates as u64,
7368 num_duplicate_accounts,
7369 duplicates_lt_hash,
7370 )
7371 }
7372
7373 fn set_storage_count_and_alive_bytes(
7374 &self,
7375 stored_sizes_and_counts: StorageSizeAndCountMap,
7376 timings: &mut GenerateIndexTimings,
7377 ) {
7378 let mut storage_size_storages_time = Measure::start("storage_size_storages");
7380 for (_slot, store) in self.storage.iter() {
7381 let id = store.id();
7382 assert_eq!(store.alive_bytes(), 0);
7384 if let Some(entry) = stored_sizes_and_counts.get(&id) {
7385 trace!(
7386 "id: {} setting count: {} cur: {}",
7387 id,
7388 entry.count,
7389 store.count(),
7390 );
7391 {
7392 let mut count_and_status = store.count_and_status.lock_write();
7393 assert_eq!(count_and_status.0, 0);
7394 count_and_status.0 = entry.count;
7395 }
7396 store
7397 .alive_bytes
7398 .store(entry.stored_size, Ordering::Release);
7399 } else {
7400 trace!("id: {id} clearing count");
7401 store.count_and_status.lock_write().0 = 0;
7402 }
7403 }
7404 storage_size_storages_time.stop();
7405 timings.storage_size_storages_us = storage_size_storages_time.as_us();
7406 }
7407
7408 pub fn print_accounts_stats(&self, label: &str) {
7409 self.print_index(label);
7410 self.print_count_and_status(label);
7411 }
7412
7413 fn print_index(&self, label: &str) {
7414 let mut alive_roots: Vec<_> = self.accounts_index.all_alive_roots();
7415 #[allow(clippy::stable_sort_primitive)]
7416 alive_roots.sort();
7417 info!("{label}: accounts_index alive_roots: {alive_roots:?}");
7418 self.accounts_index.account_maps.iter().for_each(|map| {
7419 for pubkey in map.keys() {
7420 self.accounts_index.get_and_then(&pubkey, |account_entry| {
7421 if let Some(account_entry) = account_entry {
7422 let list_r = &account_entry.slot_list.read().unwrap();
7423 info!(" key: {} ref_count: {}", pubkey, account_entry.ref_count(),);
7424 info!(" slots: {list_r:?}");
7425 }
7426 let add_to_in_mem_cache = false;
7427 (add_to_in_mem_cache, ())
7428 });
7429 }
7430 });
7431 }
7432
7433 pub fn print_count_and_status(&self, label: &str) {
7434 let mut slots: Vec<_> = self.storage.all_slots();
7435 #[allow(clippy::stable_sort_primitive)]
7436 slots.sort();
7437 info!("{}: count_and status for {} slots:", label, slots.len());
7438 for slot in &slots {
7439 let entry = self.storage.get_slot_storage_entry(*slot).unwrap();
7440 info!(
7441 " slot: {} id: {} count_and_status: {:?} len: {} capacity: {}",
7442 slot,
7443 entry.id(),
7444 entry.count_and_status.read(),
7445 entry.accounts.len(),
7446 entry.accounts.capacity(),
7447 );
7448 }
7449 }
7450}
7451
/// Mode argument passed to `handle_reclaims`.
#[derive(Debug, Copy, Clone)]
enum HandleReclaims<'a> {
    /// Carries the `PurgeStats` that the reclaim handling reports into.
    // NOTE(review): the variant name suggests dead slots are processed as part of the
    // reclaim — confirm against `handle_reclaims`.
    ProcessDeadSlots(&'a PurgeStats),
}
7456
/// Argument to `handle_reclaims` selecting whether reclaimed accounts are marked obsolete.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum MarkAccountsObsolete {
    /// Mark reclaimed accounts as obsolete, using the given slot as the slot at which they
    /// were marked.
    Yes(Slot),
    /// Do not mark reclaimed accounts as obsolete.
    No,
}
7465
/// Selects the threading strategy used when updating the index.
///
/// This is a plain, field-less selector, so it derives the common value-type
/// traits: it can be logged, copied freely and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpdateIndexThreadSelection {
    /// NOTE(review): presumably performs the update on the calling thread — confirm.
    Inline,
    /// NOTE(review): presumably uses a thread pool once the amount of work
    /// exceeds a threshold — confirm.
    PoolWithThreshold,
}
7472
#[cfg(feature = "dev-context-only-utils")]
impl AccountStorageEntry {
    /// Counts every account in this storage's accounts file by scanning its pubkeys.
    ///
    /// # Panics
    ///
    /// Panics with "must scan accounts storage" if the scan fails.
    fn accounts_count(&self) -> usize {
        let mut num_accounts = 0usize;
        let scan_result = self.accounts.scan_pubkeys(|_pubkey| num_accounts += 1);
        scan_result.expect("must scan accounts storage");
        num_accounts
    }
}
7486
7487#[cfg(feature = "dev-context-only-utils")]
7489impl AccountsDb {
7490 pub fn get_len_of_slots_with_uncleaned_pubkeys(&self) -> usize {
7493 self.uncleaned_pubkeys.len()
7494 }
7495
7496 pub fn add_root_and_flush_write_cache(&self, slot: Slot) {
7499 self.add_root(slot);
7500 self.flush_root_write_cache(slot);
7501 }
7502
7503 pub fn load_without_fixed_root(
7504 &self,
7505 ancestors: &Ancestors,
7506 pubkey: &Pubkey,
7507 ) -> Option<(AccountSharedData, Slot)> {
7508 self.do_load(
7509 ancestors,
7510 pubkey,
7511 None,
7512 LoadHint::Unspecified,
7513 LoadZeroLamports::SomeWithZeroLamportAccountForTests,
7515 )
7516 }
7517
7518 pub fn assert_load_account(&self, slot: Slot, pubkey: Pubkey, expected_lamports: u64) {
7519 let ancestors = vec![(slot, 0)].into_iter().collect();
7520 let (account, slot) = self.load_without_fixed_root(&ancestors, &pubkey).unwrap();
7521 assert_eq!((account.lamports(), slot), (expected_lamports, slot));
7522 }
7523
7524 pub fn assert_not_load_account(&self, slot: Slot, pubkey: Pubkey) {
7525 let ancestors = vec![(slot, 0)].into_iter().collect();
7526 let load = self.load_without_fixed_root(&ancestors, &pubkey);
7527 assert!(load.is_none(), "{load:?}");
7528 }
7529
7530 pub fn check_accounts(&self, pubkeys: &[Pubkey], slot: Slot, num: usize, count: usize) {
7531 let ancestors = vec![(slot, 0)].into_iter().collect();
7532 for _ in 0..num {
7533 let idx = thread_rng().gen_range(0..num);
7534 let account = self.load_without_fixed_root(&ancestors, &pubkeys[idx]);
7535 let account1 = Some((
7536 AccountSharedData::new(
7537 (idx + count) as u64,
7538 0,
7539 AccountSharedData::default().owner(),
7540 ),
7541 slot,
7542 ));
7543 assert_eq!(account, account1);
7544 }
7545 }
7546
7547 pub fn scan_accounts_from_storages(
7553 storages: &[Arc<AccountStorageEntry>],
7554 mut callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>),
7555 ) {
7556 let mut reader = append_vec::new_scan_accounts_reader();
7557 for storage in storages {
7558 storage
7559 .accounts
7560 .scan_accounts(&mut reader, &mut callback)
7561 .expect("must scan accounts storage");
7562 }
7563 }
7564
7565 pub fn store_for_tests<'a>(&self, accounts: impl StorableAccounts<'a>) {
7567 self.store_accounts_unfrozen(
7568 accounts,
7569 None,
7570 UpdateIndexThreadSelection::PoolWithThreshold,
7571 );
7572 }
7573
7574 #[allow(clippy::needless_range_loop)]
7575 pub fn modify_accounts(&self, pubkeys: &[Pubkey], slot: Slot, num: usize, count: usize) {
7576 for idx in 0..num {
7577 let account = AccountSharedData::new(
7578 (idx + count) as u64,
7579 0,
7580 AccountSharedData::default().owner(),
7581 );
7582 self.store_for_tests((slot, [(&pubkeys[idx], &account)].as_slice()));
7583 }
7584 }
7585
7586 pub fn check_storage(&self, slot: Slot, alive_count: usize, total_count: usize) {
7587 let store = self.storage.get_slot_storage_entry(slot).unwrap();
7588 assert_eq!(store.status(), AccountStorageStatus::Available);
7589 assert_eq!(store.count(), alive_count);
7590 assert_eq!(store.accounts_count(), total_count);
7591 }
7592
7593 pub fn create_account(
7594 &self,
7595 pubkeys: &mut Vec<Pubkey>,
7596 slot: Slot,
7597 num: usize,
7598 space: usize,
7599 num_vote: usize,
7600 ) {
7601 let ancestors = vec![(slot, 0)].into_iter().collect();
7602 for t in 0..num {
7603 let pubkey = solana_pubkey::new_rand();
7604 let account =
7605 AccountSharedData::new((t + 1) as u64, space, AccountSharedData::default().owner());
7606 pubkeys.push(pubkey);
7607 assert!(self.load_without_fixed_root(&ancestors, &pubkey).is_none());
7608 self.store_for_tests((slot, [(&pubkey, &account)].as_slice()));
7609 }
7610 for t in 0..num_vote {
7611 let pubkey = solana_pubkey::new_rand();
7612 let account =
7613 AccountSharedData::new((num + t + 1) as u64, space, &solana_vote_program::id());
7614 pubkeys.push(pubkey);
7615 let ancestors = vec![(slot, 0)].into_iter().collect();
7616 assert!(self.load_without_fixed_root(&ancestors, &pubkey).is_none());
7617 self.store_for_tests((slot, [(&pubkey, &account)].as_slice()));
7618 }
7619 }
7620
7621 pub fn sizes_of_accounts_in_storage_for_tests(&self, slot: Slot) -> Vec<usize> {
7622 let mut sizes = Vec::default();
7623 if let Some(storage) = self.storage.get_slot_storage_entry(slot) {
7624 storage
7625 .accounts
7626 .scan_accounts_stored_meta(|account| {
7627 sizes.push(account.stored_size());
7628 })
7629 .expect("must scan accounts storage");
7630 }
7631 sizes
7632 }
7633
7634 pub fn ref_count_for_pubkey(&self, pubkey: &Pubkey) -> RefCount {
7635 self.accounts_index.ref_count_from_storage(pubkey)
7636 }
7637
7638 pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize {
7639 self.storage
7640 .get_slot_storage_entry(slot)
7641 .map(|storage| storage.count())
7642 .unwrap_or(0)
7643 .saturating_add(
7644 self.accounts_cache
7645 .slot_cache(slot)
7646 .map(|slot_cache| slot_cache.len())
7647 .unwrap_or_default(),
7648 )
7649 }
7650
7651 pub fn flush_root_write_cache(&self, root: Slot) {
7654 assert!(
7655 self.accounts_index
7656 .roots_tracker
7657 .read()
7658 .unwrap()
7659 .alive_roots
7660 .contains(&root),
7661 "slot: {root}"
7662 );
7663 self.flush_accounts_cache(true, Some(root));
7664 }
7665
7666 pub fn all_account_count_in_accounts_file(&self, slot: Slot) -> usize {
7667 let store = self.storage.get_slot_storage_entry(slot);
7668 if let Some(store) = store {
7669 store.accounts_count()
7670 } else {
7671 0
7672 }
7673 }
7674
7675 pub fn uncleaned_pubkeys(&self) -> &DashMap<Slot, Vec<Pubkey>, BuildNoHashHasher<Slot>> {
7676 &self.uncleaned_pubkeys
7677 }
7678}