1use {
4 crate::accounts_db::{AccountStorageEntry, AccountsFileId},
5 dashmap::DashMap,
6 rand::seq::SliceRandom,
7 rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator},
8 solana_clock::Slot,
9 solana_nohash_hasher::{BuildNoHashHasher, IntMap},
10 std::{
11 ops::{Index, Range},
12 sync::{
13 atomic::{AtomicUsize, Ordering},
14 Arc, RwLock,
15 },
16 },
17};
18
19pub mod stored_account_info;
20
/// Concurrent map from a slot to the storage entry tracked for that slot.
///
/// Backed by a [`DashMap`] whose `Slot` keys are hashed with [`BuildNoHashHasher`].
pub type AccountStorageMap = DashMap<Slot, Arc<AccountStorageEntry>, BuildNoHashHasher<Slot>>;
22
/// Registry of account storages, keyed by slot.
///
/// `map` holds the storage currently published for each slot, while
/// `shrink_in_progress_map` holds the replacement storage of a slot for as long as a
/// [`ShrinkInProgress`] guard for that slot is alive.
#[derive(Default, Debug)]
pub struct AccountStorage {
    /// The storage currently published for each slot.
    map: AccountStorageMap,
    /// Replacement storages of slots that are being shrunk, keyed by slot.
    /// An entry is inserted by `shrinking_in_progress` and removed again when the
    /// `ShrinkInProgress` it returned is dropped.
    shrink_in_progress_map: RwLock<IntMap<Slot, Arc<AccountStorageEntry>>>,
}
32
33impl AccountStorage {
34 pub(crate) fn get_account_storage_entry(
53 &self,
54 slot: Slot,
55 store_id: AccountsFileId,
56 ) -> Option<Arc<AccountStorageEntry>> {
57 let lookup_in_map = || {
58 self.map.get(&slot).and_then(|entry| {
59 (entry.value().id() == store_id).then_some(Arc::clone(entry.value()))
60 })
61 };
62
63 lookup_in_map()
64 .or_else(|| {
65 self.shrink_in_progress_map
66 .read()
67 .unwrap()
68 .get(&slot)
69 .and_then(|entry| (entry.id() == store_id).then(|| Arc::clone(entry)))
70 })
71 .or_else(lookup_in_map)
72 }
73
74 pub(crate) fn no_shrink_in_progress(&self) -> bool {
76 self.shrink_in_progress_map.read().unwrap().is_empty()
77 }
78
79 pub fn get_slot_storage_entry(&self, slot: Slot) -> Option<Arc<AccountStorageEntry>> {
82 assert!(
83 self.no_shrink_in_progress(),
84 "self.no_shrink_in_progress(): {slot}"
85 );
86 self.get_slot_storage_entry_shrinking_in_progress_ok(slot)
87 }
88
89 pub(super) fn all_storages(&self) -> Vec<Arc<AccountStorageEntry>> {
90 assert!(self.no_shrink_in_progress());
91 self.map
92 .iter()
93 .map(|item| Arc::clone(item.value()))
94 .collect()
95 }
96
97 pub(crate) fn replace_storage_with_equivalent(
98 &self,
99 slot: Slot,
100 storage: Arc<AccountStorageEntry>,
101 ) {
102 assert_eq!(storage.slot(), slot);
103 if let Some(mut existing_storage) = self.map.get_mut(&slot) {
104 assert_eq!(slot, existing_storage.value().slot());
105 *existing_storage.value_mut() = storage;
106 }
107 }
108
109 pub(crate) fn get_slot_storage_entry_shrinking_in_progress_ok(
111 &self,
112 slot: Slot,
113 ) -> Option<Arc<AccountStorageEntry>> {
114 self.map.get(&slot).map(|entry| Arc::clone(entry.value()))
115 }
116
117 pub(crate) fn all_slots(&self) -> Vec<Slot> {
118 assert!(self.no_shrink_in_progress());
119 self.map.iter().map(|iter_item| *iter_item.key()).collect()
120 }
121
122 #[cfg(test)]
124 pub(crate) fn is_empty_entry(&self, slot: Slot) -> bool {
125 assert!(
126 self.no_shrink_in_progress(),
127 "self.no_shrink_in_progress(): {slot}"
128 );
129 self.map.get(&slot).is_none()
130 }
131
132 pub fn initialize(&mut self, all_storages: AccountStorageMap) {
134 assert!(self.map.is_empty());
135 assert!(self.no_shrink_in_progress());
136 self.map.extend(all_storages)
137 }
138
139 pub(crate) fn remove(
142 &self,
143 slot: &Slot,
144 shrink_can_be_active: bool,
145 ) -> Option<Arc<AccountStorageEntry>> {
146 assert!(shrink_can_be_active || self.shrink_in_progress_map.read().unwrap().is_empty());
147 self.map.remove(slot).map(|(_, storage)| storage)
148 }
149
150 pub(crate) fn iter(&self) -> AccountStorageIter<'_> {
152 assert!(self.no_shrink_in_progress());
153 AccountStorageIter::new(self)
154 }
155
156 pub(crate) fn insert(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
157 assert!(
158 self.no_shrink_in_progress(),
159 "self.no_shrink_in_progress(): {slot}"
160 );
161 assert!(self.map.insert(slot, store).is_none());
162 }
163
164 pub(crate) fn shrinking_in_progress(
171 &self,
172 slot: Slot,
173 new_store: Arc<AccountStorageEntry>,
174 ) -> ShrinkInProgress<'_> {
175 let shrinking_store = Arc::clone(
176 self.map
177 .get(&slot)
178 .expect("no pre-existing storage for shrinking slot")
179 .value(),
180 );
181
182 assert!(
184 self.shrink_in_progress_map
185 .write()
186 .unwrap()
187 .insert(slot, Arc::clone(&new_store))
188 .is_none(),
189 "duplicate call"
190 );
191
192 ShrinkInProgress {
193 storage: self,
194 slot,
195 new_store,
196 old_store: shrinking_store,
197 }
198 }
199
200 #[cfg(test)]
201 pub(crate) fn len(&self) -> usize {
202 self.map.len()
203 }
204
205 pub fn get_if(
214 &self,
215 predicate: impl Fn(&Slot, &AccountStorageEntry) -> bool,
216 ) -> Box<[(Slot, Arc<AccountStorageEntry>)]> {
217 assert!(self.no_shrink_in_progress());
218 self.map
219 .iter()
220 .filter_map(|entry| {
221 let slot = entry.key();
222 let storage = entry.value();
223 predicate(slot, storage).then(|| (*slot, Arc::clone(storage)))
224 })
225 .collect()
226 }
227}
228
/// Iterator over the `(slot, storage)` pairs of an [`AccountStorage`]'s map.
pub struct AccountStorageIter<'a> {
    /// Underlying iterator over the storage's `DashMap`.
    iter: dashmap::iter::Iter<'a, Slot, Arc<AccountStorageEntry>, BuildNoHashHasher<Slot>>,
}
233
234impl<'a> AccountStorageIter<'a> {
235 pub fn new(storage: &'a AccountStorage) -> Self {
236 Self {
237 iter: storage.map.iter(),
238 }
239 }
240}
241
242impl Iterator for AccountStorageIter<'_> {
243 type Item = (Slot, Arc<AccountStorageEntry>);
244
245 fn next(&mut self) -> Option<Self::Item> {
246 if let Some(entry) = self.iter.next() {
247 let slot = entry.key();
248 let store = entry.value();
249 return Some((*slot, Arc::clone(store)));
250 }
251 None
252 }
253}
254
/// Guard representing a shrink of one slot's storage.
///
/// Created by `AccountStorage::shrinking_in_progress`; its `Drop` implementation swaps the
/// new store into the storage's map and clears the shrink-in-progress entry.
#[derive(Debug)]
pub struct ShrinkInProgress<'a> {
    /// The `AccountStorage` this shrink operates on.
    storage: &'a AccountStorage,
    /// The store that was in the map for `slot` when the shrink started.
    old_store: Arc<AccountStorageEntry>,
    /// The store that replaces `old_store` when this guard is dropped.
    new_store: Arc<AccountStorageEntry>,
    /// The slot being shrunk.
    slot: Slot,
}
266
impl Drop for ShrinkInProgress<'_> {
    /// Finishes the shrink by publishing the new store and clearing the in-progress entry.
    ///
    /// # Panics
    ///
    /// Panics if the map entry being replaced does not have the old store's id, or if no
    /// shrink-in-progress entry exists for the slot.
    fn drop(&mut self) {
        // Publish the new store in the main map first; the entry it displaces must be the
        // store that was there when the shrink started.
        assert_eq!(
            self.storage
                .map
                .insert(self.slot, Arc::clone(&self.new_store))
                .map(|store| store.id()),
            Some(self.old_store.id())
        );

        // Only after the main map holds the new store is the in-progress entry removed, so
        // the new store is always present in at least one of the two maps.
        assert!(self
            .storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .remove(&self.slot)
            .is_some());
    }
}
288
impl ShrinkInProgress<'_> {
    /// Returns the store that will replace the old one when this guard is dropped.
    pub fn new_storage(&self) -> &Arc<AccountStorageEntry> {
        &self.new_store
    }
    /// Returns the store that was in the map for the slot when the shrink started.
    pub(crate) fn old_storage(&self) -> &Arc<AccountStorageEntry> {
        &self.old_store
    }
}
297
298#[cfg_attr(feature = "frozen-abi", derive(AbiExample, AbiEnumVisitor))]
299#[derive(Debug, Eq, PartialEq, Copy, Clone, Deserialize, Serialize)]
300pub enum AccountStorageStatus {
301 Available = 0,
302 Full = 1,
303 Candidate = 2,
304}
305
306impl Default for AccountStorageStatus {
307 fn default() -> Self {
308 Self::Available
309 }
310}
311
/// A view over a slice of storages that visits them in a chosen order.
///
/// The slice itself is never reordered; the order is expressed by `indices`.
pub struct AccountStoragesOrderer<'a> {
    /// The borrowed storages, in their original order.
    storages: &'a [Arc<AccountStorageEntry>],
    /// Positions into `storages`, listed in the order the storages are to be visited.
    indices: Box<[usize]>,
}
321
322impl<'a> AccountStoragesOrderer<'a> {
323 pub fn with_small_to_large_ratio(
328 storages: &'a [Arc<AccountStorageEntry>],
329 small_to_large_ratio: (usize, usize),
330 ) -> Self {
331 let len_range = 0..storages.len();
332 let mut indices: Vec<_> = len_range.clone().collect();
333 indices.sort_unstable_by_key(|i| storages[*i].capacity());
334 indices.iter_mut().for_each(|i| {
335 *i = select_from_range_with_start_end_rates(len_range.clone(), *i, small_to_large_ratio)
336 });
337 Self {
338 storages,
339 indices: indices.into_boxed_slice(),
340 }
341 }
342
343 pub fn with_random_order(storages: &'a [Arc<AccountStorageEntry>]) -> Self {
345 let mut indices: Vec<usize> = (0..storages.len()).collect();
346 indices.shuffle(&mut rand::thread_rng());
347 Self {
348 storages,
349 indices: indices.into_boxed_slice(),
350 }
351 }
352
353 pub fn entries_len(&self) -> usize {
354 self.indices.len()
355 }
356
357 pub fn iter(&'a self) -> impl ExactSizeIterator<Item = &'a AccountStorageEntry> + 'a {
358 self.indices.iter().map(|i| self.storages[*i].as_ref())
359 }
360
361 pub fn par_iter(&'a self) -> impl IndexedParallelIterator<Item = &'a AccountStorageEntry> + 'a {
362 self.indices.par_iter().map(|i| self.storages[*i].as_ref())
363 }
364
365 pub fn into_concurrent_consumer(self) -> AccountStoragesConcurrentConsumer<'a> {
366 AccountStoragesConcurrentConsumer::new(self)
367 }
368}
369
370impl Index<usize> for AccountStoragesOrderer<'_> {
371 type Output = AccountStorageEntry;
372
373 fn index(&self, index: usize) -> &Self::Output {
374 self.storages[self.indices[index]].as_ref()
375 }
376}
377
/// Hands out the storages of an [`AccountStoragesOrderer`] one at a time through a shared
/// reference, using an atomic counter to track the next position.
pub struct AccountStoragesConcurrentConsumer<'a> {
    /// The ordering being consumed.
    orderer: AccountStoragesOrderer<'a>,
    /// Position in `orderer` that the next call to `next` will claim.
    current_index: AtomicUsize,
}
389
390impl<'a> AccountStoragesConcurrentConsumer<'a> {
391 pub fn new(orderer: AccountStoragesOrderer<'a>) -> Self {
392 Self {
393 orderer,
394 current_index: AtomicUsize::new(0),
395 }
396 }
397
398 pub fn next(&'a self) -> Option<&'a AccountStorageEntry> {
401 let index = self.current_index.fetch_add(1, Ordering::Relaxed);
402 if index < self.orderer.entries_len() {
403 Some(&self.orderer[index])
404 } else {
405 None
406 }
407 }
408}
409
/// Selects the `nth` element of `range` when elements are taken alternately from its two ends.
///
/// The selection proceeds in cycles of `start_rate + end_rate` picks: the first `start_rate`
/// picks of a cycle advance from the start of `range`, the remaining `end_rate` picks advance
/// backwards from its end. For example, with `range = 1..11` and rates `(2, 1)` the picks for
/// `nth = 0, 1, 2, 3, ...` are `1, 2, 10, 3, ...`.
///
/// Callers are expected to pass `nth < range.len()`; larger values are not validated here
/// and may underflow the subtraction from the range length.
///
/// # Panics
///
/// Panics with "rates sum must be positive" if `start_rate + end_rate` is zero.
fn select_from_range_with_start_end_rates(
    range: Range<usize>,
    nth: usize,
    (start_rate, end_rate): (usize, usize),
) -> usize {
    let range_len = range.len();
    let cycle = start_rate + end_rate;
    // Validate the divisor before taking the remainder, so a zero cycle reports the
    // descriptive message rather than the generic remainder-by-zero panic.
    let cycle_num = nth.checked_div(cycle).expect("rates sum must be positive");
    let cycle_index = nth % cycle;

    let index = if cycle_index < start_rate {
        // Pick from the start: `start_rate` elements were consumed there per finished cycle.
        cycle_num * start_rate + cycle_index
    } else {
        // Pick from the end: count how many end elements precede this one, then mirror.
        let end_index = cycle_num * end_rate + cycle_index - start_rate;
        range_len - end_index - 1
    };
    range.start + index
}
434
#[cfg(test)]
pub(crate) mod tests {
    use {
        super::*,
        crate::accounts_file::AccountsFileProvider,
        std::{iter, path::Path},
    };

    /// Checks which store `get_account_storage_entry` returns as an entry with the same id
    /// exists in `map`, in both maps, and only in `shrink_in_progress_map`.
    #[test]
    fn test_shrink_in_progress() {
        // Nothing is stored yet, so the lookup finds nothing.
        let storage = AccountStorage::default();
        let slot = 0;
        let id = 0;
        assert!(storage.get_account_storage_entry(slot, id).is_none());

        // Two entries with the same slot and id, distinguishable only by file size.
        let common_store_path = Path::new("");
        let store_file_size = 4000;
        let store_file_size2 = store_file_size * 2;
        let entry = Arc::new(AccountStorageEntry::new(
            common_store_path,
            slot,
            id,
            store_file_size,
            AccountsFileProvider::AppendVec,
        ));
        let entry2 = Arc::new(AccountStorageEntry::new(
            common_store_path,
            slot,
            id,
            store_file_size2,
            AccountsFileProvider::AppendVec,
        ));
        storage.map.insert(slot, entry);

        // Only `map` has an entry: it is the one returned.
        assert_eq!(
            store_file_size,
            storage
                .get_account_storage_entry(slot, id)
                .map(|entry| entry.accounts.capacity())
                .unwrap_or_default()
        );

        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(slot, entry2);

        // Both maps have an entry: the one in `map` takes precedence.
        assert_eq!(
            store_file_size,
            storage
                .get_account_storage_entry(slot, id)
                .map(|entry| entry.accounts.capacity())
                .unwrap_or_default()
        );

        storage.map.remove(&slot).unwrap();

        // Only `shrink_in_progress_map` has an entry: it is the one returned.
        assert_eq!(
            store_file_size2,
            storage
                .get_account_storage_entry(slot, id)
                .map(|entry| entry.accounts.capacity())
                .unwrap_or_default()
        );
    }

    impl AccountStorage {
        /// Builds a standalone test storage entry for slot 0 with the given `id`.
        /// The entry is not inserted into `self`.
        fn get_test_storage_with_id(&self, id: AccountsFileId) -> Arc<AccountStorageEntry> {
            let slot = 0;
            let common_store_path = Path::new("");
            let store_file_size = 4000;
            Arc::new(AccountStorageEntry::new(
                common_store_path,
                slot,
                id,
                store_file_size,
                AccountsFileProvider::AppendVec,
            ))
        }
        /// Builds a standalone test storage entry for slot 0 with id 0.
        fn get_test_storage(&self) -> Arc<AccountStorageEntry> {
            self.get_test_storage_with_id(0)
        }
    }

    /// `get_slot_storage_entry` must panic while a shrink is in progress.
    #[test]
    #[should_panic(expected = "self.no_shrink_in_progress()")]
    fn test_get_slot_storage_entry_fail() {
        let storage = AccountStorage::default();
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, storage.get_test_storage());
        storage.get_slot_storage_entry(0);
    }

    /// `all_slots` must panic while a shrink is in progress.
    #[test]
    #[should_panic(expected = "self.no_shrink_in_progress()")]
    fn test_all_slots_fail() {
        let storage = AccountStorage::default();
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, storage.get_test_storage());
        storage.all_slots();
    }

    /// `initialize` must panic while a shrink is in progress.
    #[test]
    #[should_panic(expected = "self.no_shrink_in_progress()")]
    fn test_initialize_fail() {
        let mut storage = AccountStorage::default();
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, storage.get_test_storage());
        storage.initialize(AccountStorageMap::default());
    }

    /// `remove` with `shrink_can_be_active == false` must panic while a shrink is in progress.
    #[test]
    #[should_panic(
        expected = "shrink_can_be_active || self.shrink_in_progress_map.read().unwrap().is_empty()"
    )]
    fn test_remove_fail() {
        let storage = AccountStorage::default();
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, storage.get_test_storage());
        storage.remove(&0, false);
    }

    /// `iter` must panic while a shrink is in progress.
    #[test]
    #[should_panic(expected = "self.no_shrink_in_progress()")]
    fn test_iter_fail() {
        let storage = AccountStorage::default();
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, storage.get_test_storage());
        storage.iter();
    }

    /// `insert` must panic while a shrink is in progress.
    #[test]
    #[should_panic(expected = "self.no_shrink_in_progress()")]
    fn test_insert_fail() {
        let storage = AccountStorage::default();
        let sample = storage.get_test_storage();
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, sample.clone());
        storage.insert(0, sample);
    }

    /// `shrinking_in_progress` must panic when the slot already has an entry in
    /// `shrink_in_progress_map`.
    #[test]
    #[should_panic(expected = "duplicate call")]
    fn test_shrinking_in_progress_fail3() {
        let storage = AccountStorage::default();
        let sample = storage.get_test_storage();
        storage.map.insert(0, sample.clone());
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, sample.clone());
        storage.shrinking_in_progress(0, sample);
    }

    /// `shrinking_in_progress` must panic when called a second time for a slot whose first
    /// guard is still alive.
    #[test]
    #[should_panic(expected = "duplicate call")]
    fn test_shrinking_in_progress_fail4() {
        let storage = AccountStorage::default();
        let sample_to_shrink = storage.get_test_storage();
        let sample = storage.get_test_storage();
        storage.map.insert(0, sample_to_shrink);
        let _shrinking_in_progress = storage.shrinking_in_progress(0, sample.clone());
        storage.shrinking_in_progress(0, sample);
    }

    /// Walks through a full shrink: state while the guard is alive, state after it is
    /// dropped, and a second shrink of the same slot afterwards.
    #[test]
    fn test_shrinking_in_progress_second_call() {
        let storage = AccountStorage::default();
        let slot = 0;
        let id_to_shrink = 1;
        let id_shrunk = 0;
        let sample_to_shrink = storage.get_test_storage_with_id(id_to_shrink);
        let sample = storage.get_test_storage();
        storage.map.insert(slot, sample_to_shrink);
        let shrinking_in_progress = storage.shrinking_in_progress(slot, sample.clone());
        // While the guard is alive, `map` still holds the old store and
        // `shrink_in_progress_map` holds the new one.
        assert!(storage.map.contains_key(&slot));
        assert_eq!(id_to_shrink, storage.map.get(&slot).unwrap().id());
        assert_eq!(
            (slot, id_shrunk),
            storage
                .shrink_in_progress_map
                .read()
                .unwrap()
                .iter()
                .next()
                .map(|r| (*r.0, r.1.id()))
                .unwrap()
        );
        drop(shrinking_in_progress);
        // After the guard is dropped, `map` holds the new store and nothing is in progress.
        assert!(storage.map.contains_key(&slot));
        assert_eq!(id_shrunk, storage.map.get(&slot).unwrap().id());
        assert!(storage.shrink_in_progress_map.read().unwrap().is_empty());
        // A new shrink of the same slot is accepted now that the first one has finished.
        storage.shrinking_in_progress(slot, sample);
    }

    /// `shrinking_in_progress` must panic when `map` has no storage for the slot.
    #[test]
    #[should_panic(expected = "no pre-existing storage for shrinking slot")]
    fn test_shrinking_in_progress_fail1() {
        let storage = AccountStorage::default();
        let sample = storage.get_test_storage();
        storage.shrinking_in_progress(0, sample);
    }

    /// `shrinking_in_progress` must panic when `map` has no storage for the slot.
    // NOTE(review): this body is identical to `test_shrinking_in_progress_fail1`.
    #[test]
    #[should_panic(expected = "no pre-existing storage for shrinking slot")]
    fn test_shrinking_in_progress_fail2() {
        let storage = AccountStorage::default();
        let sample = storage.get_test_storage();
        storage.shrinking_in_progress(0, sample);
    }

    /// Lookups by a matching id and by a missing id, as the same store is moved between
    /// `map` and `shrink_in_progress_map`.
    #[test]
    fn test_missing() {
        let storage = AccountStorage::default();
        let sample = storage.get_test_storage();
        let id = sample.id();
        let missing_id = 9999;
        let slot = sample.slot();
        // Both maps are empty: neither id is found.
        assert!(storage.get_account_storage_entry(slot, id).is_none());
        assert!(storage
            .get_account_storage_entry(slot, missing_id)
            .is_none());
        // The store is in `map` only.
        storage.map.insert(slot, sample.clone());
        assert!(storage.get_account_storage_entry(slot, id).is_some());
        assert!(storage
            .get_account_storage_entry(slot, missing_id)
            .is_none());
        // The store is in both maps.
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(slot, Arc::clone(&sample));
        assert!(storage
            .get_account_storage_entry(slot, missing_id)
            .is_none());
        assert!(storage.get_account_storage_entry(slot, id).is_some());
        // The store is in `shrink_in_progress_map` only.
        storage.map.remove(&slot);
        assert!(storage
            .get_account_storage_entry(slot, missing_id)
            .is_none());
        assert!(storage.get_account_storage_entry(slot, id).is_some());
    }

    /// `get_if` returns exactly the entries accepted by the predicate.
    #[test]
    fn test_get_if() {
        let storage = AccountStorage::default();
        assert!(storage.get_if(|_, _| true).is_empty());

        // One entry per id, stored under the slot with the same numeric value.
        let ids = [123, 456, 789];
        for id in ids {
            let slot = id as Slot;
            let entry = AccountStorageEntry::new(
                Path::new(""),
                slot,
                id,
                5000,
                AccountsFileProvider::AppendVec,
            );
            storage.map.insert(slot, entry.into());
        }

        // Selecting a single slot returns just that slot.
        for id in ids {
            let found = storage.get_if(|slot, _| *slot == id as Slot);
            assert!(found
                .iter()
                .map(|(slot, _)| *slot)
                .eq(iter::once(id as Slot)));
        }

        assert!(storage.get_if(|_, _| false).is_empty());
        assert_eq!(storage.get_if(|_, _| true).len(), ids.len());
    }

    /// `get_if` must panic while a shrink is in progress.
    #[test]
    #[should_panic(expected = "self.no_shrink_in_progress()")]
    fn test_get_if_fail() {
        let storage = AccountStorage::default();
        storage
            .shrink_in_progress_map
            .write()
            .unwrap()
            .insert(0, storage.get_test_storage());
        storage.get_if(|_, _| true);
    }

    /// Pins the exact pick sequences of `select_from_range_with_start_end_rates` for
    /// several range lengths and rate pairs.
    #[test]
    fn test_select_range_with_start_end_rates() {
        let interleaved: Vec<_> = (0..10)
            .map(|i| select_from_range_with_start_end_rates(1..11, i, (2, 1)))
            .collect();
        assert_eq!(interleaved, vec![1, 2, 10, 3, 4, 9, 5, 6, 8, 7]);

        let interleaved: Vec<_> = (0..10)
            .map(|i| select_from_range_with_start_end_rates(1..11, i, (1, 1)))
            .collect();
        assert_eq!(interleaved, vec![1, 10, 2, 9, 3, 8, 4, 7, 5, 6]);

        let interleaved: Vec<_> = (0..9)
            .map(|i| select_from_range_with_start_end_rates(1..10, i, (2, 1)))
            .collect();
        assert_eq!(interleaved, vec![1, 2, 9, 3, 4, 8, 5, 6, 7]);

        let interleaved: Vec<_> = (0..9)
            .map(|i| select_from_range_with_start_end_rates(1..10, i, (1, 2)))
            .collect();
        assert_eq!(interleaved, vec![1, 9, 8, 2, 7, 6, 3, 5, 4]);

        let interleaved: Vec<_> = (0..13)
            .map(|i| select_from_range_with_start_end_rates(1..14, i, (2, 3)))
            .collect();
        assert_eq!(interleaved, vec![1, 2, 13, 12, 11, 3, 4, 10, 9, 8, 5, 6, 7]);
    }
}