1use {
4 crate::accounts_db::{AccountStorageEntry, AccountsFileId},
5 dashmap::DashMap,
6 rand::seq::SliceRandom,
7 rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator},
8 solana_clock::Slot,
9 solana_nohash_hasher::IntMap,
10 std::{
11 ops::{Index, Range},
12 sync::{
13 atomic::{AtomicUsize, Ordering},
14 Arc, RwLock,
15 },
16 },
17};
18
19pub mod stored_account_info;
20
21pub type AccountStorageMap = DashMap<Slot, Arc<AccountStorageEntry>>;
22
23#[derive(Default, Debug)]
24pub struct AccountStorage {
25 map: AccountStorageMap,
27 shrink_in_progress_map: RwLock<IntMap<Slot, Arc<AccountStorageEntry>>>,
31}
32
33impl AccountStorage {
34 pub(crate) fn get_account_storage_entry(
53 &self,
54 slot: Slot,
55 store_id: AccountsFileId,
56 ) -> Option<Arc<AccountStorageEntry>> {
57 let lookup_in_map = || {
58 self.map.get(&slot).and_then(|entry| {
59 (entry.value().id() == store_id).then_some(Arc::clone(entry.value()))
60 })
61 };
62
63 lookup_in_map()
64 .or_else(|| {
65 self.shrink_in_progress_map
66 .read()
67 .unwrap()
68 .get(&slot)
69 .and_then(|entry| (entry.id() == store_id).then(|| Arc::clone(entry)))
70 })
71 .or_else(lookup_in_map)
72 }
73
74 pub(crate) fn no_shrink_in_progress(&self) -> bool {
76 self.shrink_in_progress_map.read().unwrap().is_empty()
77 }
78
79 pub fn get_slot_storage_entry(&self, slot: Slot) -> Option<Arc<AccountStorageEntry>> {
82 assert!(
83 self.no_shrink_in_progress(),
84 "self.no_shrink_in_progress(): {slot}"
85 );
86 self.get_slot_storage_entry_shrinking_in_progress_ok(slot)
87 }
88
89 pub(super) fn all_storages(&self) -> Vec<Arc<AccountStorageEntry>> {
90 assert!(self.no_shrink_in_progress());
91 self.map
92 .iter()
93 .map(|item| Arc::clone(item.value()))
94 .collect()
95 }
96
97 pub(crate) fn replace_storage_with_equivalent(
98 &self,
99 slot: Slot,
100 storage: Arc<AccountStorageEntry>,
101 ) {
102 assert_eq!(storage.slot(), slot);
103 if let Some(mut existing_storage) = self.map.get_mut(&slot) {
104 assert_eq!(slot, existing_storage.value().slot());
105 *existing_storage.value_mut() = storage;
106 }
107 }
108
109 pub(crate) fn get_slot_storage_entry_shrinking_in_progress_ok(
111 &self,
112 slot: Slot,
113 ) -> Option<Arc<AccountStorageEntry>> {
114 self.map.get(&slot).map(|entry| Arc::clone(entry.value()))
115 }
116
117 pub(crate) fn all_slots(&self) -> Vec<Slot> {
118 assert!(self.no_shrink_in_progress());
119 self.map.iter().map(|iter_item| *iter_item.key()).collect()
120 }
121
122 #[cfg(test)]
124 pub(crate) fn is_empty_entry(&self, slot: Slot) -> bool {
125 assert!(
126 self.no_shrink_in_progress(),
127 "self.no_shrink_in_progress(): {slot}"
128 );
129 self.map.get(&slot).is_none()
130 }
131
132 pub fn initialize(&mut self, all_storages: AccountStorageMap) {
134 assert!(self.map.is_empty());
135 assert!(self.no_shrink_in_progress());
136 self.map = all_storages;
137 }
138
139 pub(crate) fn remove(
142 &self,
143 slot: &Slot,
144 shrink_can_be_active: bool,
145 ) -> Option<Arc<AccountStorageEntry>> {
146 assert!(shrink_can_be_active || self.shrink_in_progress_map.read().unwrap().is_empty());
147 self.map.remove(slot).map(|(_, storage)| storage)
148 }
149
150 pub(crate) fn iter(&self) -> AccountStorageIter<'_> {
152 assert!(self.no_shrink_in_progress());
153 AccountStorageIter::new(self)
154 }
155
156 pub(crate) fn insert(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
157 assert!(
158 self.no_shrink_in_progress(),
159 "self.no_shrink_in_progress(): {slot}"
160 );
161 assert!(self.map.insert(slot, store).is_none());
162 }
163
164 pub(crate) fn shrinking_in_progress(
171 &self,
172 slot: Slot,
173 new_store: Arc<AccountStorageEntry>,
174 ) -> ShrinkInProgress<'_> {
175 let shrinking_store = Arc::clone(
176 self.map
177 .get(&slot)
178 .expect("no pre-existing storage for shrinking slot")
179 .value(),
180 );
181
182 assert!(
184 self.shrink_in_progress_map
185 .write()
186 .unwrap()
187 .insert(slot, Arc::clone(&new_store))
188 .is_none(),
189 "duplicate call"
190 );
191
192 ShrinkInProgress {
193 storage: self,
194 slot,
195 new_store,
196 old_store: shrinking_store,
197 }
198 }
199
200 #[cfg(test)]
201 pub(crate) fn len(&self) -> usize {
202 self.map.len()
203 }
204
205 pub fn get_if(
214 &self,
215 predicate: impl Fn(&Slot, &AccountStorageEntry) -> bool,
216 ) -> Box<[(Slot, Arc<AccountStorageEntry>)]> {
217 assert!(self.no_shrink_in_progress());
218 self.map
219 .iter()
220 .filter_map(|entry| {
221 let slot = entry.key();
222 let storage = entry.value();
223 predicate(slot, storage).then(|| (*slot, Arc::clone(storage)))
224 })
225 .collect()
226 }
227}
228
229pub struct AccountStorageIter<'a> {
231 iter: dashmap::iter::Iter<'a, Slot, Arc<AccountStorageEntry>>,
232}
233
234impl<'a> AccountStorageIter<'a> {
235 pub fn new(storage: &'a AccountStorage) -> Self {
236 Self {
237 iter: storage.map.iter(),
238 }
239 }
240}
241
242impl Iterator for AccountStorageIter<'_> {
243 type Item = (Slot, Arc<AccountStorageEntry>);
244
245 fn next(&mut self) -> Option<Self::Item> {
246 if let Some(entry) = self.iter.next() {
247 let slot = entry.key();
248 let store = entry.value();
249 return Some((*slot, Arc::clone(store)));
250 }
251 None
252 }
253}
254
255#[derive(Debug)]
258pub struct ShrinkInProgress<'a> {
259 storage: &'a AccountStorage,
260 old_store: Arc<AccountStorageEntry>,
262 new_store: Arc<AccountStorageEntry>,
264 slot: Slot,
265}
266
267impl Drop for ShrinkInProgress<'_> {
269 fn drop(&mut self) {
270 assert_eq!(
271 self.storage
272 .map
273 .insert(self.slot, Arc::clone(&self.new_store))
274 .map(|store| store.id()),
275 Some(self.old_store.id())
276 );
277
278 assert!(self
280 .storage
281 .shrink_in_progress_map
282 .write()
283 .unwrap()
284 .remove(&self.slot)
285 .is_some());
286 }
287}
288
289impl ShrinkInProgress<'_> {
290 pub fn new_storage(&self) -> &Arc<AccountStorageEntry> {
291 &self.new_store
292 }
293 pub(crate) fn old_storage(&self) -> &Arc<AccountStorageEntry> {
294 &self.old_store
295 }
296}
297
298pub struct AccountStoragesOrderer<'a> {
304 storages: &'a [Arc<AccountStorageEntry>],
305 indices: Box<[usize]>,
306}
307
308impl<'a> AccountStoragesOrderer<'a> {
309 pub fn with_small_to_large_ratio(
314 storages: &'a [Arc<AccountStorageEntry>],
315 small_to_large_ratio: (usize, usize),
316 ) -> Self {
317 let len_range = 0..storages.len();
318 let mut indices: Vec<_> = len_range.clone().collect();
319 indices.sort_unstable_by_key(|i| storages[*i].capacity());
320 indices.iter_mut().for_each(|i| {
321 *i = select_from_range_with_start_end_rates(len_range.clone(), *i, small_to_large_ratio)
322 });
323 Self {
324 storages,
325 indices: indices.into_boxed_slice(),
326 }
327 }
328
329 pub fn with_random_order(storages: &'a [Arc<AccountStorageEntry>]) -> Self {
331 let mut indices: Vec<usize> = (0..storages.len()).collect();
332 indices.shuffle(&mut rand::thread_rng());
333 Self {
334 storages,
335 indices: indices.into_boxed_slice(),
336 }
337 }
338
339 pub fn entries_len(&self) -> usize {
340 self.indices.len()
341 }
342
343 pub fn original_index(&'a self, position: usize) -> usize {
349 self.indices[position]
350 }
351
352 pub fn iter(&'a self) -> impl ExactSizeIterator<Item = &'a AccountStorageEntry> + 'a {
353 self.indices.iter().map(|i| self.storages[*i].as_ref())
354 }
355
356 pub fn par_iter(&'a self) -> impl IndexedParallelIterator<Item = &'a AccountStorageEntry> + 'a {
357 self.indices.par_iter().map(|i| self.storages[*i].as_ref())
358 }
359
360 pub fn into_concurrent_consumer(self) -> AccountStoragesConcurrentConsumer<'a> {
361 AccountStoragesConcurrentConsumer::new(self)
362 }
363}
364
365impl Index<usize> for AccountStoragesOrderer<'_> {
366 type Output = AccountStorageEntry;
367
368 fn index(&self, position: usize) -> &Self::Output {
369 let original_index = self.original_index(position);
371 self.storages[original_index].as_ref()
373 }
374}
375
376pub struct AccountStoragesConcurrentConsumer<'a> {
384 orderer: AccountStoragesOrderer<'a>,
385 current_position: AtomicUsize,
386}
387
388impl<'a> AccountStoragesConcurrentConsumer<'a> {
389 pub fn new(orderer: AccountStoragesOrderer<'a>) -> Self {
390 Self {
391 orderer,
392 current_position: AtomicUsize::new(0),
393 }
394 }
395
396 pub fn next(&'a self) -> Option<NextItem<'a>> {
399 let position = self.current_position.fetch_add(1, Ordering::Relaxed);
400 if position < self.orderer.entries_len() {
401 let original_index = self.orderer.original_index(position);
403 let storage = &self.orderer[position];
404 Some(NextItem {
405 position,
406 original_index,
407 storage,
408 })
409 } else {
410 None
411 }
412 }
413}
414
415#[derive(Debug)]
417pub struct NextItem<'a> {
418 pub position: usize,
420 pub original_index: usize,
422 pub storage: &'a AccountStorageEntry,
424}
425
426fn select_from_range_with_start_end_rates(
433 range: Range<usize>,
434 nth: usize,
435 (start_rate, end_rate): (usize, usize),
436) -> usize {
437 let range_len = range.len();
438 let cycle = start_rate + end_rate;
439 let cycle_index = nth % cycle;
440 let cycle_num = nth.checked_div(cycle).expect("rates sum must be positive");
441
442 let index = if cycle_index < start_rate {
443 cycle_num * start_rate + cycle_index
444 } else {
445 let end_index = cycle_num * end_rate + cycle_index - start_rate;
446 range_len - end_index - 1
447 };
448 range.start + index
449}
450
451#[cfg(test)]
452pub(crate) mod tests {
453 use {
454 super::*,
455 crate::accounts_file::{AccountsFileProvider, StorageAccess},
456 std::{iter, path::Path},
457 test_case::test_case,
458 };
459
460 #[test_case(StorageAccess::Mmap)]
461 #[test_case(StorageAccess::File)]
462 fn test_shrink_in_progress(storage_access: StorageAccess) {
463 let storage = AccountStorage::default();
465 let slot = 0;
466 let id = 0;
467 assert!(storage.get_account_storage_entry(slot, id).is_none());
469
470 let common_store_path = Path::new("");
472 let store_file_size = 4000;
473 let store_file_size2 = store_file_size * 2;
474 let entry = Arc::new(AccountStorageEntry::new(
476 common_store_path,
477 slot,
478 id,
479 store_file_size,
480 AccountsFileProvider::AppendVec,
481 storage_access,
482 ));
483 let entry2 = Arc::new(AccountStorageEntry::new(
484 common_store_path,
485 slot,
486 id,
487 store_file_size2,
488 AccountsFileProvider::AppendVec,
489 storage_access,
490 ));
491 storage.map.insert(slot, entry);
492
493 assert_eq!(
495 store_file_size,
496 storage
497 .get_account_storage_entry(slot, id)
498 .map(|entry| entry.accounts.capacity())
499 .unwrap_or_default()
500 );
501
502 storage
504 .shrink_in_progress_map
505 .write()
506 .unwrap()
507 .insert(slot, entry2);
508
509 assert_eq!(
511 store_file_size,
512 storage
513 .get_account_storage_entry(slot, id)
514 .map(|entry| entry.accounts.capacity())
515 .unwrap_or_default()
516 );
517
518 storage.map.remove(&slot).unwrap();
520
521 assert_eq!(
523 store_file_size2,
524 storage
525 .get_account_storage_entry(slot, id)
526 .map(|entry| entry.accounts.capacity())
527 .unwrap_or_default()
528 );
529 }
530
531 impl AccountStorage {
532 fn get_test_storage_with_id(
533 &self,
534 id: AccountsFileId,
535 storage_access: StorageAccess,
536 ) -> Arc<AccountStorageEntry> {
537 let slot = 0;
538 let common_store_path = Path::new("");
540 let store_file_size = 4000;
541 Arc::new(AccountStorageEntry::new(
542 common_store_path,
543 slot,
544 id,
545 store_file_size,
546 AccountsFileProvider::AppendVec,
547 storage_access,
548 ))
549 }
550 fn get_test_storage(&self, storage_access: StorageAccess) -> Arc<AccountStorageEntry> {
551 self.get_test_storage_with_id(0, storage_access)
552 }
553 }
554
555 #[test_case(StorageAccess::Mmap)]
556 #[test_case(StorageAccess::File)]
557 #[should_panic(expected = "self.no_shrink_in_progress()")]
558 fn test_get_slot_storage_entry_fail(storage_access: StorageAccess) {
559 let storage = AccountStorage::default();
560 storage
561 .shrink_in_progress_map
562 .write()
563 .unwrap()
564 .insert(0, storage.get_test_storage(storage_access));
565 storage.get_slot_storage_entry(0);
566 }
567
568 #[test_case(StorageAccess::Mmap)]
569 #[test_case(StorageAccess::File)]
570 #[should_panic(expected = "self.no_shrink_in_progress()")]
571 fn test_all_slots_fail(storage_access: StorageAccess) {
572 let storage = AccountStorage::default();
573 storage
574 .shrink_in_progress_map
575 .write()
576 .unwrap()
577 .insert(0, storage.get_test_storage(storage_access));
578 storage.all_slots();
579 }
580
581 #[test_case(StorageAccess::Mmap)]
582 #[test_case(StorageAccess::File)]
583 #[should_panic(expected = "self.no_shrink_in_progress()")]
584 fn test_initialize_fail(storage_access: StorageAccess) {
585 let mut storage = AccountStorage::default();
586 storage
587 .shrink_in_progress_map
588 .write()
589 .unwrap()
590 .insert(0, storage.get_test_storage(storage_access));
591 storage.initialize(AccountStorageMap::default());
592 }
593
594 #[test_case(StorageAccess::Mmap)]
595 #[test_case(StorageAccess::File)]
596 #[should_panic(
597 expected = "shrink_can_be_active || self.shrink_in_progress_map.read().unwrap().is_empty()"
598 )]
599 fn test_remove_fail(storage_access: StorageAccess) {
600 let storage = AccountStorage::default();
601 storage
602 .shrink_in_progress_map
603 .write()
604 .unwrap()
605 .insert(0, storage.get_test_storage(storage_access));
606 storage.remove(&0, false);
607 }
608
609 #[test_case(StorageAccess::Mmap)]
610 #[test_case(StorageAccess::File)]
611 #[should_panic(expected = "self.no_shrink_in_progress()")]
612 fn test_iter_fail(storage_access: StorageAccess) {
613 let storage = AccountStorage::default();
614 storage
615 .shrink_in_progress_map
616 .write()
617 .unwrap()
618 .insert(0, storage.get_test_storage(storage_access));
619 storage.iter();
620 }
621
622 #[test_case(StorageAccess::Mmap)]
623 #[test_case(StorageAccess::File)]
624 #[should_panic(expected = "self.no_shrink_in_progress()")]
625 fn test_insert_fail(storage_access: StorageAccess) {
626 let storage = AccountStorage::default();
627 let sample = storage.get_test_storage(storage_access);
628 storage
629 .shrink_in_progress_map
630 .write()
631 .unwrap()
632 .insert(0, sample.clone());
633 storage.insert(0, sample);
634 }
635
636 #[test_case(StorageAccess::Mmap)]
637 #[test_case(StorageAccess::File)]
638 #[should_panic(expected = "duplicate call")]
639 fn test_shrinking_in_progress_fail3(storage_access: StorageAccess) {
640 let storage = AccountStorage::default();
642 let sample = storage.get_test_storage(storage_access);
643 storage.map.insert(0, sample.clone());
644 storage
645 .shrink_in_progress_map
646 .write()
647 .unwrap()
648 .insert(0, sample.clone());
649 storage.shrinking_in_progress(0, sample);
650 }
651
652 #[test_case(StorageAccess::Mmap)]
653 #[test_case(StorageAccess::File)]
654 #[should_panic(expected = "duplicate call")]
655 fn test_shrinking_in_progress_fail4(storage_access: StorageAccess) {
656 let storage = AccountStorage::default();
658 let sample_to_shrink = storage.get_test_storage(storage_access);
659 let sample = storage.get_test_storage(storage_access);
660 storage.map.insert(0, sample_to_shrink);
661 let _shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone());
662 storage.shrinking_in_progress(0, sample);
663 }
664
665 #[test_case(StorageAccess::Mmap)]
666 #[test_case(StorageAccess::File)]
667 fn test_shrinking_in_progress_second_call(storage_access: StorageAccess) {
668 let storage = AccountStorage::default();
671 let slot = 0;
672 let id_to_shrink = 1;
673 let id_shrunk = 0;
674 let sample_to_shrink = storage.get_test_storage_with_id(id_to_shrink, storage_access);
675 let sample = storage.get_test_storage(storage_access);
676 storage.map.insert(slot, sample_to_shrink);
677 let shrinking_in_progress = storage.shrinking_in_progress(slot, sample.clone());
678 assert!(storage.map.contains_key(&slot));
679 assert_eq!(id_to_shrink, storage.map.get(&slot).unwrap().id());
680 assert_eq!(
681 (slot, id_shrunk),
682 storage
683 .shrink_in_progress_map
684 .read()
685 .unwrap()
686 .iter()
687 .next()
688 .map(|r| (*r.0, r.1.id()))
689 .unwrap()
690 );
691 drop(shrinking_in_progress);
692 assert!(storage.map.contains_key(&slot));
693 assert_eq!(id_shrunk, storage.map.get(&slot).unwrap().id());
694 assert!(storage.shrink_in_progress_map.read().unwrap().is_empty());
695 storage.shrinking_in_progress(slot, sample);
696 }
697
698 #[test_case(StorageAccess::Mmap)]
699 #[test_case(StorageAccess::File)]
700 #[should_panic(expected = "no pre-existing storage for shrinking slot")]
701 fn test_shrinking_in_progress_fail1(storage_access: StorageAccess) {
702 let storage = AccountStorage::default();
704 let sample = storage.get_test_storage(storage_access);
705 storage.shrinking_in_progress(0, sample);
706 }
707
708 #[test_case(StorageAccess::Mmap)]
709 #[test_case(StorageAccess::File)]
710 #[should_panic(expected = "no pre-existing storage for shrinking slot")]
711 fn test_shrinking_in_progress_fail2(storage_access: StorageAccess) {
712 let storage = AccountStorage::default();
714 let sample = storage.get_test_storage(storage_access);
715 storage.shrinking_in_progress(0, sample);
716 }
717
718 #[test_case(StorageAccess::Mmap)]
719 #[test_case(StorageAccess::File)]
720 fn test_missing(storage_access: StorageAccess) {
721 let storage = AccountStorage::default();
724 let sample = storage.get_test_storage(storage_access);
725 let id = sample.id();
726 let missing_id = 9999;
727 let slot = sample.slot();
728 assert!(storage.get_account_storage_entry(slot, id).is_none());
730 assert!(storage
732 .get_account_storage_entry(slot, missing_id)
733 .is_none());
734 storage.map.insert(slot, sample.clone());
735 assert!(storage.get_account_storage_entry(slot, id).is_some());
737 assert!(storage
738 .get_account_storage_entry(slot, missing_id)
739 .is_none());
740 storage
741 .shrink_in_progress_map
742 .write()
743 .unwrap()
744 .insert(slot, Arc::clone(&sample));
745 assert!(storage
747 .get_account_storage_entry(slot, missing_id)
748 .is_none());
749 assert!(storage.get_account_storage_entry(slot, id).is_some());
750 storage.map.remove(&slot);
751 assert!(storage
753 .get_account_storage_entry(slot, missing_id)
754 .is_none());
755 assert!(storage.get_account_storage_entry(slot, id).is_some());
756 }
757
758 #[test_case(StorageAccess::Mmap)]
759 #[test_case(StorageAccess::File)]
760 fn test_get_if(storage_access: StorageAccess) {
761 let storage = AccountStorage::default();
762 assert!(storage.get_if(|_, _| true).is_empty());
763
764 let ids = [123, 456, 789];
766 for id in ids {
767 let slot = id as Slot;
768 let entry = AccountStorageEntry::new(
769 Path::new(""),
770 slot,
771 id,
772 5000,
773 AccountsFileProvider::AppendVec,
774 storage_access,
775 );
776 storage.map.insert(slot, entry.into());
777 }
778
779 for id in ids {
781 let found = storage.get_if(|slot, _| *slot == id as Slot);
782 assert!(found
783 .iter()
784 .map(|(slot, _)| *slot)
785 .eq(iter::once(id as Slot)));
786 }
787
788 assert!(storage.get_if(|_, _| false).is_empty());
789 assert_eq!(storage.get_if(|_, _| true).len(), ids.len());
790 }
791
792 #[test_case(StorageAccess::Mmap)]
793 #[test_case(StorageAccess::File)]
794 #[should_panic(expected = "self.no_shrink_in_progress()")]
795 fn test_get_if_fail(storage_access: StorageAccess) {
796 let storage = AccountStorage::default();
797 storage
798 .shrink_in_progress_map
799 .write()
800 .unwrap()
801 .insert(0, storage.get_test_storage(storage_access));
802 storage.get_if(|_, _| true);
803 }
804
805 #[test]
806 fn test_select_range_with_start_end_rates() {
807 let interleaved: Vec<_> = (0..10)
808 .map(|i| select_from_range_with_start_end_rates(1..11, i, (2, 1)))
809 .collect();
810 assert_eq!(interleaved, vec![1, 2, 10, 3, 4, 9, 5, 6, 8, 7]);
811
812 let interleaved: Vec<_> = (0..10)
813 .map(|i| select_from_range_with_start_end_rates(1..11, i, (1, 1)))
814 .collect();
815 assert_eq!(interleaved, vec![1, 10, 2, 9, 3, 8, 4, 7, 5, 6]);
816
817 let interleaved: Vec<_> = (0..9)
818 .map(|i| select_from_range_with_start_end_rates(1..10, i, (2, 1)))
819 .collect();
820 assert_eq!(interleaved, vec![1, 2, 9, 3, 4, 8, 5, 6, 7]);
821
822 let interleaved: Vec<_> = (0..9)
823 .map(|i| select_from_range_with_start_end_rates(1..10, i, (1, 2)))
824 .collect();
825 assert_eq!(interleaved, vec![1, 9, 8, 2, 7, 6, 3, 5, 4]);
826
827 let interleaved: Vec<_> = (0..13)
828 .map(|i| select_from_range_with_start_end_rates(1..14, i, (2, 3)))
829 .collect();
830 assert_eq!(interleaved, vec![1, 2, 13, 12, 11, 3, 4, 10, 9, 8, 5, 6, 7]);
831 }
832}