gemachain_runtime/
accounts_cache.rs

1use dashmap::DashMap;
2use gemachain_sdk::{
3    account::{AccountSharedData, ReadableAccount},
4    clock::Slot,
5    hash::Hash,
6    pubkey::Pubkey,
7};
8use std::{
9    borrow::Borrow,
10    collections::BTreeSet,
11    ops::Deref,
12    sync::{
13        atomic::{AtomicBool, AtomicU64, Ordering},
14        Arc, RwLock,
15    },
16};
17
18pub type SlotCache = Arc<SlotCacheInner>;
19
20#[derive(Default, Debug)]
21pub struct SlotCacheInner {
22    cache: DashMap<Pubkey, CachedAccount>,
23    same_account_writes: AtomicU64,
24    same_account_writes_size: AtomicU64,
25    unique_account_writes_size: AtomicU64,
26    is_frozen: AtomicBool,
27}
28
29impl SlotCacheInner {
30    pub fn report_slot_store_metrics(&self) {
31        datapoint_info!(
32            "slot_repeated_writes",
33            (
34                "same_account_writes",
35                self.same_account_writes.load(Ordering::Relaxed),
36                i64
37            ),
38            (
39                "same_account_writes_size",
40                self.same_account_writes_size.load(Ordering::Relaxed),
41                i64
42            ),
43            (
44                "unique_account_writes_size",
45                self.unique_account_writes_size.load(Ordering::Relaxed),
46                i64
47            )
48        );
49    }
50
51    pub fn get_all_pubkeys(&self) -> Vec<Pubkey> {
52        self.cache.iter().map(|item| *item.key()).collect()
53    }
54
55    pub fn insert(
56        &self,
57        pubkey: &Pubkey,
58        account: AccountSharedData,
59        hash: Option<impl Borrow<Hash>>,
60        slot: Slot,
61    ) -> CachedAccount {
62        if self.cache.contains_key(pubkey) {
63            self.same_account_writes.fetch_add(1, Ordering::Relaxed);
64            self.same_account_writes_size
65                .fetch_add(account.data().len() as u64, Ordering::Relaxed);
66        } else {
67            self.unique_account_writes_size
68                .fetch_add(account.data().len() as u64, Ordering::Relaxed);
69        }
70        let item = Arc::new(CachedAccountInner {
71            account,
72            hash: RwLock::new(hash.map(|h| *h.borrow())),
73            slot,
74            pubkey: *pubkey,
75        });
76        self.cache.insert(*pubkey, item.clone());
77        item
78    }
79
80    pub fn get_cloned(&self, pubkey: &Pubkey) -> Option<CachedAccount> {
81        self.cache
82            .get(pubkey)
83            // 1) Maybe can eventually use a Cow to avoid a clone on every read
84            // 2) Popping is only safe if it's guaranteed that only
85            //    replay/banking threads are reading from the AccountsDb
86            .map(|account_ref| account_ref.value().clone())
87    }
88
89    pub fn mark_slot_frozen(&self) {
90        self.is_frozen.store(true, Ordering::SeqCst);
91    }
92
93    pub fn is_frozen(&self) -> bool {
94        self.is_frozen.load(Ordering::SeqCst)
95    }
96
97    pub fn total_bytes(&self) -> u64 {
98        self.unique_account_writes_size.load(Ordering::Relaxed)
99            + self.same_account_writes_size.load(Ordering::Relaxed)
100    }
101}
102
103impl Deref for SlotCacheInner {
104    type Target = DashMap<Pubkey, CachedAccount>;
105    fn deref(&self) -> &Self::Target {
106        &self.cache
107    }
108}
109
110pub type CachedAccount = Arc<CachedAccountInner>;
111
112#[derive(Debug)]
113pub struct CachedAccountInner {
114    pub account: AccountSharedData,
115    hash: RwLock<Option<Hash>>,
116    slot: Slot,
117    pubkey: Pubkey,
118}
119
120impl CachedAccountInner {
121    pub fn hash(&self) -> Hash {
122        let hash = self.hash.read().unwrap();
123        match *hash {
124            Some(hash) => hash,
125            None => {
126                drop(hash);
127                let hash = crate::accounts_db::AccountsDb::hash_account(
128                    self.slot,
129                    &self.account,
130                    &self.pubkey,
131                );
132                *self.hash.write().unwrap() = Some(hash);
133                hash
134            }
135        }
136    }
137    pub fn pubkey(&self) -> Pubkey {
138        self.pubkey
139    }
140}
141
142#[derive(Debug, Default)]
143pub struct AccountsCache {
144    cache: DashMap<Slot, SlotCache>,
145    // Queue of potentially unflushed roots. Random eviction + cache too large
146    // could have triggered a flush of this slot already
147    maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
148    max_flushed_root: AtomicU64,
149}
150
151impl AccountsCache {
152    pub fn report_size(&self) {
153        let total_unique_writes_size: u64 = self
154            .cache
155            .iter()
156            .map(|item| {
157                let slot_cache = item.value();
158                slot_cache
159                    .unique_account_writes_size
160                    .load(Ordering::Relaxed)
161            })
162            .sum();
163        datapoint_info!(
164            "accounts_cache_size",
165            (
166                "num_roots",
167                self.maybe_unflushed_roots.read().unwrap().len(),
168                i64
169            ),
170            ("num_slots", self.cache.len(), i64),
171            ("total_unique_writes_size", total_unique_writes_size, i64),
172        );
173    }
174
175    pub fn store(
176        &self,
177        slot: Slot,
178        pubkey: &Pubkey,
179        account: AccountSharedData,
180        hash: Option<impl Borrow<Hash>>,
181    ) -> CachedAccount {
182        let slot_cache = self.slot_cache(slot).unwrap_or_else(||
183            // DashMap entry.or_insert() returns a RefMut, essentially a write lock,
184            // which is dropped after this block ends, minimizing time held by the lock.
185            // However, we still want to persist the reference to the `SlotStores` behind
186            // the lock, hence we clone it out, (`SlotStores` is an Arc so is cheap to clone).
187            self
188                .cache
189                .entry(slot)
190                .or_insert(Arc::new(SlotCacheInner::default()))
191                .clone());
192
193        slot_cache.insert(pubkey, account, hash, slot)
194    }
195
196    pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
197        self.slot_cache(slot)
198            .and_then(|slot_cache| slot_cache.get_cloned(pubkey))
199    }
200
201    pub fn remove_slot(&self, slot: Slot) -> Option<SlotCache> {
202        self.cache.remove(&slot).map(|(_, slot_cache)| slot_cache)
203    }
204
205    pub fn slot_cache(&self, slot: Slot) -> Option<SlotCache> {
206        self.cache.get(&slot).map(|result| result.value().clone())
207    }
208
209    pub fn add_root(&self, root: Slot) {
210        let max_flushed_root = self.fetch_max_flush_root();
211        if root > max_flushed_root || (root == max_flushed_root && root == 0) {
212            self.maybe_unflushed_roots.write().unwrap().insert(root);
213        }
214    }
215
216    pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
217        let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
218        if let Some(max_root) = max_root {
219            // `greater_than_max_root` contains all slots >= `max_root + 1`, or alternatively,
220            // all slots > `max_root`. Meanwhile, `w_maybe_unflushed_roots` is left with all slots
221            // <= `max_root`.
222            let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
223            // After the replace, `w_maybe_unflushed_roots` contains slots > `max_root`, and
224            // we return all slots <= `max_root`
225            std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
226        } else {
227            std::mem::take(&mut *w_maybe_unflushed_roots)
228        }
229    }
230
231    // Removes slots less than or equal to `max_root`. Only safe to pass in a rooted slot,
232    // otherwise the slot removed could still be undergoing replay!
233    pub fn remove_slots_le(&self, max_root: Slot) -> Vec<(Slot, SlotCache)> {
234        let mut removed_slots = vec![];
235        self.cache.retain(|slot, slot_cache| {
236            let should_remove = *slot <= max_root;
237            if should_remove {
238                removed_slots.push((*slot, slot_cache.clone()))
239            }
240            !should_remove
241        });
242        removed_slots
243    }
244
245    pub fn find_older_frozen_slots(&self, num_to_retain: usize) -> Vec<Slot> {
246        if self.cache.len() > num_to_retain {
247            let mut slots: Vec<_> = self
248                .cache
249                .iter()
250                .filter_map(|item| {
251                    let (slot, slot_cache) = item.pair();
252                    if slot_cache.is_frozen() {
253                        Some(*slot)
254                    } else {
255                        None
256                    }
257                })
258                .collect();
259            slots.sort_unstable();
260            slots.truncate(slots.len().saturating_sub(num_to_retain));
261            slots
262        } else {
263            vec![]
264        }
265    }
266
267    pub fn num_slots(&self) -> usize {
268        self.cache.len()
269    }
270
271    pub fn fetch_max_flush_root(&self) -> Slot {
272        self.max_flushed_root.load(Ordering::Relaxed)
273    }
274
275    pub fn set_max_flush_root(&self, root: Slot) {
276        self.max_flushed_root.fetch_max(root, Ordering::Relaxed);
277    }
278}
279
#[cfg(test)]
pub mod tests {
    use super::*;

    #[test]
    fn test_remove_slots_le() {
        let cache = AccountsCache::default();
        // Nothing has been stored yet, so there is nothing to remove
        assert!(cache.remove_slots_le(1).is_empty());

        let inserted_slot = 0;
        let pubkey = Pubkey::new_unique();
        let account = AccountSharedData::new(1, 0, &Pubkey::default());
        cache.store(inserted_slot, &pubkey, account, Some(&Hash::default()));

        // The stored slot is <= the bound of 0, so it must be removed and returned
        let removed = cache.remove_slots_le(0);
        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0].0, inserted_slot);
    }

    #[test]
    fn test_find_older_frozen_slots() {
        let cache = AccountsCache::default();
        // Nothing has been stored yet, so there is nothing to report
        assert!(cache.find_older_frozen_slots(0).is_empty());

        let inserted_slot = 0;
        let pubkey = Pubkey::new_unique();
        let account = AccountSharedData::new(1, 0, &Pubkey::default());
        cache.store(inserted_slot, &pubkey, account, Some(&Hash::default()));

        // Retaining 1 slot returns nothing because there is only one cached slot
        assert!(cache.find_older_frozen_slots(1).is_empty());
        // Retaining 0 slots still returns nothing because the only slot is not frozen
        assert!(cache.find_older_frozen_slots(0).is_empty());

        cache.slot_cache(inserted_slot).unwrap().mark_slot_frozen();
        // Once the slot is frozen, retaining 0 slots returns it
        assert_eq!(cache.find_older_frozen_slots(0), vec![inserted_slot]);
    }
}
325}