solana_accounts_db/
accounts_cache.rs

1use {
2    dashmap::DashMap,
3    solana_account::{AccountSharedData, ReadableAccount},
4    solana_clock::Slot,
5    solana_nohash_hasher::BuildNoHashHasher,
6    solana_pubkey::{Pubkey, PubkeyHasherBuilder},
7    std::{
8        collections::BTreeSet,
9        ops::Deref,
10        sync::{
11            atomic::{AtomicBool, AtomicU64, Ordering},
12            Arc, RwLock,
13        },
14    },
15};
16
17#[derive(Debug)]
18pub struct SlotCache {
19    cache: DashMap<Pubkey, Arc<CachedAccount>, PubkeyHasherBuilder>,
20    same_account_writes: AtomicU64,
21    same_account_writes_size: AtomicU64,
22    unique_account_writes_size: AtomicU64,
23    /// The size of account data stored in `cache` (just this slot), in bytes
24    size: AtomicU64,
25    /// The size of account data stored in the whole AccountsCache, in bytes
26    total_size: Arc<AtomicU64>,
27    is_frozen: AtomicBool,
28    /// The number of accounts stored in `cache` (just this slot)
29    accounts_count: AtomicU64,
30    /// The number of accounts stored in the whole AccountsCache
31    total_accounts_count: Arc<AtomicU64>,
32}
33
34impl Drop for SlotCache {
35    fn drop(&mut self) {
36        // broader cache no longer holds our size/counts in memory
37        self.total_size
38            .fetch_sub(*self.size.get_mut(), Ordering::Relaxed);
39        self.total_accounts_count
40            .fetch_sub(*self.accounts_count.get_mut(), Ordering::Relaxed);
41    }
42}
43
44impl SlotCache {
45    pub fn report_slot_store_metrics(&self) {
46        datapoint_info!(
47            "slot_repeated_writes",
48            (
49                "same_account_writes",
50                self.same_account_writes.load(Ordering::Relaxed),
51                i64
52            ),
53            (
54                "same_account_writes_size",
55                self.same_account_writes_size.load(Ordering::Relaxed),
56                i64
57            ),
58            (
59                "unique_account_writes_size",
60                self.unique_account_writes_size.load(Ordering::Relaxed),
61                i64
62            ),
63            ("size", self.size.load(Ordering::Relaxed), i64),
64            (
65                "accounts_count",
66                self.accounts_count.load(Ordering::Relaxed),
67                i64
68            )
69        );
70    }
71
72    pub fn insert(&self, pubkey: &Pubkey, account: AccountSharedData) -> Arc<CachedAccount> {
73        let data_len = account.data().len() as u64;
74        let item = Arc::new(CachedAccount {
75            account,
76            pubkey: *pubkey,
77        });
78        if let Some(old) = self.cache.insert(*pubkey, item.clone()) {
79            self.same_account_writes.fetch_add(1, Ordering::Relaxed);
80            self.same_account_writes_size
81                .fetch_add(data_len, Ordering::Relaxed);
82
83            let old_len = old.account.data().len() as u64;
84            let grow = data_len.saturating_sub(old_len);
85            if grow > 0 {
86                self.size.fetch_add(grow, Ordering::Relaxed);
87                self.total_size.fetch_add(grow, Ordering::Relaxed);
88            } else {
89                let shrink = old_len.saturating_sub(data_len);
90                if shrink > 0 {
91                    self.size.fetch_sub(shrink, Ordering::Relaxed);
92                    self.total_size.fetch_sub(shrink, Ordering::Relaxed);
93                }
94            }
95        } else {
96            self.size.fetch_add(data_len, Ordering::Relaxed);
97            self.total_size.fetch_add(data_len, Ordering::Relaxed);
98            self.unique_account_writes_size
99                .fetch_add(data_len, Ordering::Relaxed);
100            self.accounts_count.fetch_add(1, Ordering::Relaxed);
101            self.total_accounts_count.fetch_add(1, Ordering::Relaxed);
102        }
103        item
104    }
105
106    pub fn get_cloned(&self, pubkey: &Pubkey) -> Option<Arc<CachedAccount>> {
107        self.cache
108            .get(pubkey)
109            // 1) Maybe can eventually use a Cow to avoid a clone on every read
110            // 2) Popping is only safe if it's guaranteed that only
111            //    replay/banking threads are reading from the AccountsDb
112            .map(|account_ref| account_ref.value().clone())
113    }
114
115    pub fn mark_slot_frozen(&self) {
116        self.is_frozen.store(true, Ordering::Release);
117    }
118
119    pub fn is_frozen(&self) -> bool {
120        self.is_frozen.load(Ordering::Acquire)
121    }
122
123    pub fn total_bytes(&self) -> u64 {
124        self.unique_account_writes_size.load(Ordering::Relaxed)
125            + self.same_account_writes_size.load(Ordering::Relaxed)
126    }
127}
128
129impl Deref for SlotCache {
130    type Target = DashMap<Pubkey, Arc<CachedAccount>, PubkeyHasherBuilder>;
131    fn deref(&self) -> &Self::Target {
132        &self.cache
133    }
134}
135
136#[derive(Debug)]
137pub struct CachedAccount {
138    pub account: AccountSharedData,
139    pubkey: Pubkey,
140}
141
142impl CachedAccount {
143    pub fn pubkey(&self) -> &Pubkey {
144        &self.pubkey
145    }
146}
147
148#[derive(Debug, Default)]
149pub struct AccountsCache {
150    cache: DashMap<Slot, Arc<SlotCache>, BuildNoHashHasher<Slot>>,
151    // Queue of potentially unflushed roots. Random eviction + cache too large
152    // could have triggered a flush of this slot already
153    maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
154    max_flushed_root: AtomicU64,
155    /// The size of account data stored in the whole AccountsCache, in bytes
156    total_size: Arc<AtomicU64>,
157    /// The number of accounts stored in the whole AccountsCache
158    total_accounts_counts: Arc<AtomicU64>,
159}
160
161impl AccountsCache {
162    pub fn new_inner(&self) -> Arc<SlotCache> {
163        Arc::new(SlotCache {
164            cache: DashMap::default(),
165            same_account_writes: AtomicU64::default(),
166            same_account_writes_size: AtomicU64::default(),
167            unique_account_writes_size: AtomicU64::default(),
168            size: AtomicU64::default(),
169            total_size: Arc::clone(&self.total_size),
170            is_frozen: AtomicBool::default(),
171            accounts_count: AtomicU64::new(0),
172            total_accounts_count: Arc::clone(&self.total_accounts_counts),
173        })
174    }
175    pub fn size(&self) -> u64 {
176        self.total_size.load(Ordering::Relaxed)
177    }
178    pub fn report_size(&self) {
179        datapoint_info!(
180            "accounts_cache_size",
181            (
182                "num_roots",
183                self.maybe_unflushed_roots.read().unwrap().len(),
184                i64
185            ),
186            ("num_slots", self.cache.len(), i64),
187            ("total_size", self.size(), i64),
188            (
189                "total_accounts_count",
190                self.total_accounts_counts.load(Ordering::Relaxed),
191                i64
192            ),
193        );
194    }
195
196    pub fn store(
197        &self,
198        slot: Slot,
199        pubkey: &Pubkey,
200        account: AccountSharedData,
201    ) -> Arc<CachedAccount> {
202        let slot_cache = self.slot_cache(slot).unwrap_or_else(||
203            // DashMap entry.or_insert() returns a RefMut, essentially a write lock,
204            // which is dropped after this block ends, minimizing time held by the lock.
205            // However, we still want to persist the reference to the `SlotStores` behind
206            // the lock, hence we clone it out, (`SlotStores` is an Arc so is cheap to clone).
207            self
208                .cache
209                .entry(slot)
210                .or_insert_with(|| self.new_inner())
211                .clone());
212
213        slot_cache.insert(pubkey, account)
214    }
215
216    pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<Arc<CachedAccount>> {
217        self.slot_cache(slot)
218            .and_then(|slot_cache| slot_cache.get_cloned(pubkey))
219    }
220
221    pub fn remove_slot(&self, slot: Slot) -> Option<Arc<SlotCache>> {
222        self.cache.remove(&slot).map(|(_, slot_cache)| slot_cache)
223    }
224
225    pub fn slot_cache(&self, slot: Slot) -> Option<Arc<SlotCache>> {
226        self.cache.get(&slot).map(|result| result.value().clone())
227    }
228
229    pub fn add_root(&self, root: Slot) {
230        self.maybe_unflushed_roots.write().unwrap().insert(root);
231    }
232
233    pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
234        let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
235        if let Some(max_root) = max_root {
236            // `greater_than_max_root` contains all slots >= `max_root + 1`, or alternatively,
237            // all slots > `max_root`. Meanwhile, `w_maybe_unflushed_roots` is left with all slots
238            // <= `max_root`.
239            let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
240            // After the replace, `w_maybe_unflushed_roots` contains slots > `max_root`, and
241            // we return all slots <= `max_root`
242            std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
243        } else {
244            std::mem::take(&mut *w_maybe_unflushed_roots)
245        }
246    }
247
248    pub fn cached_frozen_slots(&self) -> Vec<Slot> {
249        self.cache
250            .iter()
251            .filter_map(|item| {
252                let (slot, slot_cache) = item.pair();
253                slot_cache.is_frozen().then_some(*slot)
254            })
255            .collect()
256    }
257
258    pub fn contains(&self, slot: Slot) -> bool {
259        self.cache.contains_key(&slot)
260    }
261
262    pub fn num_slots(&self) -> usize {
263        self.cache.len()
264    }
265
266    pub fn fetch_max_flush_root(&self) -> Slot {
267        self.max_flushed_root.load(Ordering::Acquire)
268    }
269
270    pub fn set_max_flush_root(&self, root: Slot) {
271        self.max_flushed_root.fetch_max(root, Ordering::Release);
272    }
273}
274
275#[cfg(test)]
276pub mod tests {
277    use super::*;
278
279    impl AccountsCache {
280        // Removes slots less than or equal to `max_root`. Only safe to pass in a rooted slot,
281        // otherwise the slot removed could still be undergoing replay!
282        pub fn remove_slots_le(&self, max_root: Slot) -> Vec<(Slot, Arc<SlotCache>)> {
283            let mut removed_slots = vec![];
284            self.cache.retain(|slot, slot_cache| {
285                let should_remove = *slot <= max_root;
286                if should_remove {
287                    removed_slots.push((*slot, slot_cache.clone()))
288                }
289                !should_remove
290            });
291            removed_slots
292        }
293    }
294
295    #[test]
296    fn test_remove_slots_le() {
297        let cache = AccountsCache::default();
298        // Cache is empty, should return nothing
299        assert!(cache.remove_slots_le(1).is_empty());
300        let inserted_slot = 0;
301        cache.store(
302            inserted_slot,
303            &Pubkey::new_unique(),
304            AccountSharedData::new(1, 0, &Pubkey::default()),
305        );
306        // If the cache is told the size limit is 0, it should return the one slot
307        let removed = cache.remove_slots_le(0);
308        assert_eq!(removed.len(), 1);
309        assert_eq!(removed[0].0, inserted_slot);
310    }
311
312    #[test]
313    fn test_cached_frozen_slots() {
314        let cache = AccountsCache::default();
315        // Cache is empty, should return nothing
316        assert!(cache.cached_frozen_slots().is_empty());
317        let inserted_slot = 0;
318        cache.store(
319            inserted_slot,
320            &Pubkey::new_unique(),
321            AccountSharedData::new(1, 0, &Pubkey::default()),
322        );
323
324        // If the cache is told the size limit is 0, it should return nothing, because there's no
325        // frozen slots
326        assert!(cache.cached_frozen_slots().is_empty());
327        cache.slot_cache(inserted_slot).unwrap().mark_slot_frozen();
328        // If the cache is told the size limit is 0, it should return the one frozen slot
329        assert_eq!(cache.cached_frozen_slots(), vec![inserted_slot]);
330    }
331}