1use dashmap::DashMap;
2use gemachain_sdk::{
3 account::{AccountSharedData, ReadableAccount},
4 clock::Slot,
5 hash::Hash,
6 pubkey::Pubkey,
7};
8use std::{
9 borrow::Borrow,
10 collections::BTreeSet,
11 ops::Deref,
12 sync::{
13 atomic::{AtomicBool, AtomicU64, Ordering},
14 Arc, RwLock,
15 },
16};
17
18pub type SlotCache = Arc<SlotCacheInner>;
19
20#[derive(Default, Debug)]
21pub struct SlotCacheInner {
22 cache: DashMap<Pubkey, CachedAccount>,
23 same_account_writes: AtomicU64,
24 same_account_writes_size: AtomicU64,
25 unique_account_writes_size: AtomicU64,
26 is_frozen: AtomicBool,
27}
28
29impl SlotCacheInner {
30 pub fn report_slot_store_metrics(&self) {
31 datapoint_info!(
32 "slot_repeated_writes",
33 (
34 "same_account_writes",
35 self.same_account_writes.load(Ordering::Relaxed),
36 i64
37 ),
38 (
39 "same_account_writes_size",
40 self.same_account_writes_size.load(Ordering::Relaxed),
41 i64
42 ),
43 (
44 "unique_account_writes_size",
45 self.unique_account_writes_size.load(Ordering::Relaxed),
46 i64
47 )
48 );
49 }
50
51 pub fn get_all_pubkeys(&self) -> Vec<Pubkey> {
52 self.cache.iter().map(|item| *item.key()).collect()
53 }
54
55 pub fn insert(
56 &self,
57 pubkey: &Pubkey,
58 account: AccountSharedData,
59 hash: Option<impl Borrow<Hash>>,
60 slot: Slot,
61 ) -> CachedAccount {
62 if self.cache.contains_key(pubkey) {
63 self.same_account_writes.fetch_add(1, Ordering::Relaxed);
64 self.same_account_writes_size
65 .fetch_add(account.data().len() as u64, Ordering::Relaxed);
66 } else {
67 self.unique_account_writes_size
68 .fetch_add(account.data().len() as u64, Ordering::Relaxed);
69 }
70 let item = Arc::new(CachedAccountInner {
71 account,
72 hash: RwLock::new(hash.map(|h| *h.borrow())),
73 slot,
74 pubkey: *pubkey,
75 });
76 self.cache.insert(*pubkey, item.clone());
77 item
78 }
79
80 pub fn get_cloned(&self, pubkey: &Pubkey) -> Option<CachedAccount> {
81 self.cache
82 .get(pubkey)
83 .map(|account_ref| account_ref.value().clone())
87 }
88
89 pub fn mark_slot_frozen(&self) {
90 self.is_frozen.store(true, Ordering::SeqCst);
91 }
92
93 pub fn is_frozen(&self) -> bool {
94 self.is_frozen.load(Ordering::SeqCst)
95 }
96
97 pub fn total_bytes(&self) -> u64 {
98 self.unique_account_writes_size.load(Ordering::Relaxed)
99 + self.same_account_writes_size.load(Ordering::Relaxed)
100 }
101}
102
103impl Deref for SlotCacheInner {
104 type Target = DashMap<Pubkey, CachedAccount>;
105 fn deref(&self) -> &Self::Target {
106 &self.cache
107 }
108}
109
110pub type CachedAccount = Arc<CachedAccountInner>;
111
112#[derive(Debug)]
113pub struct CachedAccountInner {
114 pub account: AccountSharedData,
115 hash: RwLock<Option<Hash>>,
116 slot: Slot,
117 pubkey: Pubkey,
118}
119
120impl CachedAccountInner {
121 pub fn hash(&self) -> Hash {
122 let hash = self.hash.read().unwrap();
123 match *hash {
124 Some(hash) => hash,
125 None => {
126 drop(hash);
127 let hash = crate::accounts_db::AccountsDb::hash_account(
128 self.slot,
129 &self.account,
130 &self.pubkey,
131 );
132 *self.hash.write().unwrap() = Some(hash);
133 hash
134 }
135 }
136 }
137 pub fn pubkey(&self) -> Pubkey {
138 self.pubkey
139 }
140}
141
142#[derive(Debug, Default)]
143pub struct AccountsCache {
144 cache: DashMap<Slot, SlotCache>,
145 maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
148 max_flushed_root: AtomicU64,
149}
150
151impl AccountsCache {
152 pub fn report_size(&self) {
153 let total_unique_writes_size: u64 = self
154 .cache
155 .iter()
156 .map(|item| {
157 let slot_cache = item.value();
158 slot_cache
159 .unique_account_writes_size
160 .load(Ordering::Relaxed)
161 })
162 .sum();
163 datapoint_info!(
164 "accounts_cache_size",
165 (
166 "num_roots",
167 self.maybe_unflushed_roots.read().unwrap().len(),
168 i64
169 ),
170 ("num_slots", self.cache.len(), i64),
171 ("total_unique_writes_size", total_unique_writes_size, i64),
172 );
173 }
174
175 pub fn store(
176 &self,
177 slot: Slot,
178 pubkey: &Pubkey,
179 account: AccountSharedData,
180 hash: Option<impl Borrow<Hash>>,
181 ) -> CachedAccount {
182 let slot_cache = self.slot_cache(slot).unwrap_or_else(||
183 self
188 .cache
189 .entry(slot)
190 .or_insert(Arc::new(SlotCacheInner::default()))
191 .clone());
192
193 slot_cache.insert(pubkey, account, hash, slot)
194 }
195
196 pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option<CachedAccount> {
197 self.slot_cache(slot)
198 .and_then(|slot_cache| slot_cache.get_cloned(pubkey))
199 }
200
201 pub fn remove_slot(&self, slot: Slot) -> Option<SlotCache> {
202 self.cache.remove(&slot).map(|(_, slot_cache)| slot_cache)
203 }
204
205 pub fn slot_cache(&self, slot: Slot) -> Option<SlotCache> {
206 self.cache.get(&slot).map(|result| result.value().clone())
207 }
208
209 pub fn add_root(&self, root: Slot) {
210 let max_flushed_root = self.fetch_max_flush_root();
211 if root > max_flushed_root || (root == max_flushed_root && root == 0) {
212 self.maybe_unflushed_roots.write().unwrap().insert(root);
213 }
214 }
215
216 pub fn clear_roots(&self, max_root: Option<Slot>) -> BTreeSet<Slot> {
217 let mut w_maybe_unflushed_roots = self.maybe_unflushed_roots.write().unwrap();
218 if let Some(max_root) = max_root {
219 let greater_than_max_root = w_maybe_unflushed_roots.split_off(&(max_root + 1));
223 std::mem::replace(&mut w_maybe_unflushed_roots, greater_than_max_root)
226 } else {
227 std::mem::take(&mut *w_maybe_unflushed_roots)
228 }
229 }
230
231 pub fn remove_slots_le(&self, max_root: Slot) -> Vec<(Slot, SlotCache)> {
234 let mut removed_slots = vec![];
235 self.cache.retain(|slot, slot_cache| {
236 let should_remove = *slot <= max_root;
237 if should_remove {
238 removed_slots.push((*slot, slot_cache.clone()))
239 }
240 !should_remove
241 });
242 removed_slots
243 }
244
245 pub fn find_older_frozen_slots(&self, num_to_retain: usize) -> Vec<Slot> {
246 if self.cache.len() > num_to_retain {
247 let mut slots: Vec<_> = self
248 .cache
249 .iter()
250 .filter_map(|item| {
251 let (slot, slot_cache) = item.pair();
252 if slot_cache.is_frozen() {
253 Some(*slot)
254 } else {
255 None
256 }
257 })
258 .collect();
259 slots.sort_unstable();
260 slots.truncate(slots.len().saturating_sub(num_to_retain));
261 slots
262 } else {
263 vec![]
264 }
265 }
266
267 pub fn num_slots(&self) -> usize {
268 self.cache.len()
269 }
270
271 pub fn fetch_max_flush_root(&self) -> Slot {
272 self.max_flushed_root.load(Ordering::Relaxed)
273 }
274
275 pub fn set_max_flush_root(&self, root: Slot) {
276 self.max_flushed_root.fetch_max(root, Ordering::Relaxed);
277 }
278}
279
#[cfg(test)]
pub mod tests {
    use super::*;

    /// Stores one account with a fresh unique pubkey and a default hash in `slot`.
    fn store_new_account(cache: &AccountsCache, slot: Slot) {
        cache.store(
            slot,
            &Pubkey::new_unique(),
            AccountSharedData::new(1, 0, &Pubkey::default()),
            Some(&Hash::default()),
        );
    }

    #[test]
    fn test_remove_slots_le() {
        let cache = AccountsCache::default();
        // Nothing to remove from an empty cache.
        assert!(cache.remove_slots_le(1).is_empty());

        let inserted_slot = 0;
        store_new_account(&cache, inserted_slot);

        // The bound is inclusive, so slot 0 is removed by `remove_slots_le(0)`.
        let removed = cache.remove_slots_le(0);
        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0].0, inserted_slot);
    }

    #[test]
    fn test_find_older_frozen_slots() {
        let cache = AccountsCache::default();
        // An empty cache has no frozen slots.
        assert!(cache.find_older_frozen_slots(0).is_empty());

        let inserted_slot = 0;
        store_new_account(&cache, inserted_slot);

        // The slot is not frozen yet, so it is never reported.
        assert!(cache.find_older_frozen_slots(1).is_empty());
        assert!(cache.find_older_frozen_slots(0).is_empty());

        // Once frozen, it is reported when nothing is retained.
        cache.slot_cache(inserted_slot).unwrap().mark_slot_frozen();
        assert_eq!(cache.find_older_frozen_slots(0), vec![inserted_slot]);
    }
}