atlas_runtime/
status_cache.rs

1use {
2    log::*,
3    rand::{thread_rng, Rng},
4    serde::Serialize,
5    solana_accounts_db::ancestors::Ancestors,
6    solana_clock::{Slot, MAX_RECENT_BLOCKHASHES},
7    solana_hash::Hash,
8    std::{
9        collections::{hash_map::Entry, HashMap, HashSet},
10        sync::{Arc, Mutex},
11    },
12};
13
14pub const MAX_CACHE_ENTRIES: usize = MAX_RECENT_BLOCKHASHES;
15const CACHED_KEY_SIZE: usize = 20;
16
17// Store forks in a single chunk of memory to avoid another lookup.
18pub type ForkStatus<T> = Vec<(Slot, T)>;
19pub(crate) type KeySlice = [u8; CACHED_KEY_SIZE];
20type KeyMap<T> = HashMap<KeySlice, ForkStatus<T>>;
21// Map of Hash and status
22pub type Status<T> = Arc<Mutex<HashMap<Hash, (usize, Vec<(KeySlice, T)>)>>>;
23// A Map of hash + the highest fork it's been observed on along with
24// the key offset and a Map of the key slice + Fork status for that key
25type KeyStatusMap<T> = HashMap<Hash, (Slot, usize, KeyMap<T>)>;
26
27// A map of keys recorded in each fork; used to serialize for snapshots easily.
28// Doesn't store a `SlotDelta` in it because the bool `root` is usually set much later
29type SlotDeltaMap<T> = HashMap<Slot, Status<T>>;
30
31// The statuses added during a slot, can be used to build on top of a status cache or to
32// construct a new one. Usually derived from a status cache's `SlotDeltaMap`
33pub type SlotDelta<T> = (Slot, bool, Status<T>);
34
35#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
36#[derive(Clone, Debug)]
37pub struct StatusCache<T: Serialize + Clone> {
38    cache: KeyStatusMap<T>,
39    roots: HashSet<Slot>,
40    /// all keys seen during a fork/slot
41    slot_deltas: SlotDeltaMap<T>,
42}
43
44impl<T: Serialize + Clone> Default for StatusCache<T> {
45    fn default() -> Self {
46        Self {
47            cache: HashMap::default(),
48            // 0 is always a root
49            roots: HashSet::from([0]),
50            slot_deltas: HashMap::default(),
51        }
52    }
53}
54
55impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
56    fn eq(&self, other: &Self) -> bool {
57        self.roots == other.roots
58            && self
59                .cache
60                .iter()
61                .all(|(hash, (slot, key_index, hash_map))| {
62                    if let Some((other_slot, other_key_index, other_hash_map)) =
63                        other.cache.get(hash)
64                    {
65                        if slot == other_slot && key_index == other_key_index {
66                            return hash_map.iter().all(|(slice, fork_map)| {
67                                if let Some(other_fork_map) = other_hash_map.get(slice) {
68                                    // all this work just to compare the highest forks in the fork map
69                                    // per entry
70                                    return fork_map.last() == other_fork_map.last();
71                                }
72                                false
73                            });
74                        }
75                    }
76                    false
77                })
78    }
79}
80
81impl<T: Serialize + Clone> StatusCache<T> {
82    pub fn clear_slot_entries(&mut self, slot: Slot) {
83        let slot_deltas = self.slot_deltas.remove(&slot);
84        if let Some(slot_deltas) = slot_deltas {
85            let slot_deltas = slot_deltas.lock().unwrap();
86            for (blockhash, (_, key_list)) in slot_deltas.iter() {
87                // Any blockhash that exists in self.slot_deltas must also exist
88                // in self.cache, because in self.purge_roots(), when an entry
89                // (b, (max_slot, _, _)) is removed from self.cache, this implies
90                // all entries in self.slot_deltas < max_slot are also removed
91                if let Entry::Occupied(mut o_blockhash_entries) = self.cache.entry(*blockhash) {
92                    let (_, _, all_hash_maps) = o_blockhash_entries.get_mut();
93
94                    for (key_slice, _) in key_list {
95                        if let Entry::Occupied(mut o_key_list) = all_hash_maps.entry(*key_slice) {
96                            let key_list = o_key_list.get_mut();
97                            key_list.retain(|(updated_slot, _)| *updated_slot != slot);
98                            if key_list.is_empty() {
99                                o_key_list.remove_entry();
100                            }
101                        } else {
102                            panic!(
103                                "Map for key must exist if key exists in self.slot_deltas, slot: \
104                                 {slot}"
105                            )
106                        }
107                    }
108
109                    if all_hash_maps.is_empty() {
110                        o_blockhash_entries.remove_entry();
111                    }
112                } else {
113                    panic!("Blockhash must exist if it exists in self.slot_deltas, slot: {slot}")
114                }
115            }
116        }
117    }
118
119    /// Check if the key is in any of the forks in the ancestors set and
120    /// with a certain blockhash.
121    pub fn get_status<K: AsRef<[u8]>>(
122        &self,
123        key: K,
124        transaction_blockhash: &Hash,
125        ancestors: &Ancestors,
126    ) -> Option<(Slot, T)> {
127        let map = self.cache.get(transaction_blockhash)?;
128        let (_, index, keymap) = map;
129        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
130        let index = (*index).min(max_key_index);
131        let key_slice: &[u8; CACHED_KEY_SIZE] =
132            arrayref::array_ref![key.as_ref(), index, CACHED_KEY_SIZE];
133        if let Some(stored_forks) = keymap.get(key_slice) {
134            let res = stored_forks
135                .iter()
136                .find(|(f, _)| ancestors.contains_key(f) || self.roots.contains(f))
137                .cloned();
138            if res.is_some() {
139                return res;
140            }
141        }
142        None
143    }
144
145    /// Search for a key with any blockhash
146    /// Prefer get_status for performance reasons, it doesn't need
147    /// to search all blockhashes.
148    pub fn get_status_any_blockhash<K: AsRef<[u8]>>(
149        &self,
150        key: K,
151        ancestors: &Ancestors,
152    ) -> Option<(Slot, T)> {
153        let keys: Vec<_> = self.cache.keys().copied().collect();
154
155        for blockhash in keys.iter() {
156            trace!("get_status_any_blockhash: trying {blockhash}");
157            let status = self.get_status(&key, blockhash, ancestors);
158            if status.is_some() {
159                return status;
160            }
161        }
162        None
163    }
164
165    /// Add a known root fork.  Roots are always valid ancestors.
166    /// After MAX_CACHE_ENTRIES, roots are removed, and any old keys are cleared.
167    pub fn add_root(&mut self, fork: Slot) {
168        self.roots.insert(fork);
169        self.purge_roots();
170    }
171
172    pub fn roots(&self) -> &HashSet<Slot> {
173        &self.roots
174    }
175
176    /// Insert a new key for a specific slot.
177    pub fn insert<K: AsRef<[u8]>>(
178        &mut self,
179        transaction_blockhash: &Hash,
180        key: K,
181        slot: Slot,
182        res: T,
183    ) {
184        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
185
186        // Get the cache entry for this blockhash.
187        let (max_slot, key_index, hash_map) =
188            self.cache.entry(*transaction_blockhash).or_insert_with(|| {
189                let key_index = thread_rng().gen_range(0..max_key_index + 1);
190                (slot, key_index, HashMap::new())
191            });
192
193        // Update the max slot observed to contain txs using this blockhash.
194        *max_slot = std::cmp::max(slot, *max_slot);
195
196        // Grab the key slice.
197        let key_index = (*key_index).min(max_key_index);
198        let mut key_slice = [0u8; CACHED_KEY_SIZE];
199        key_slice.clone_from_slice(&key.as_ref()[key_index..key_index + CACHED_KEY_SIZE]);
200
201        // Insert the slot and tx result into the cache entry associated with
202        // this blockhash and keyslice.
203        let forks = hash_map.entry(key_slice).or_default();
204        forks.push((slot, res.clone()));
205
206        self.add_to_slot_delta(transaction_blockhash, slot, key_index, key_slice, res);
207    }
208
209    pub fn purge_roots(&mut self) {
210        if self.roots.len() > MAX_CACHE_ENTRIES {
211            if let Some(min) = self.roots.iter().min().cloned() {
212                self.roots.remove(&min);
213                self.cache.retain(|_, (fork, _, _)| *fork > min);
214                self.slot_deltas.retain(|slot, _| *slot > min);
215            }
216        }
217    }
218
219    /// Clear for testing
220    pub fn clear(&mut self) {
221        for v in self.cache.values_mut() {
222            v.2 = HashMap::new();
223        }
224
225        self.slot_deltas
226            .iter_mut()
227            .for_each(|(_, status)| status.lock().unwrap().clear());
228    }
229
230    /// Get the statuses for all the root slots
231    pub fn root_slot_deltas(&self) -> Vec<SlotDelta<T>> {
232        self.roots()
233            .iter()
234            .map(|root| {
235                (
236                    *root,
237                    true, // <-- is_root
238                    self.slot_deltas.get(root).cloned().unwrap_or_default(),
239                )
240            })
241            .collect()
242    }
243
244    // replay deltas into a status_cache allows "appending" data
245    pub fn append(&mut self, slot_deltas: &[SlotDelta<T>]) {
246        for (slot, is_root, statuses) in slot_deltas {
247            statuses
248                .lock()
249                .unwrap()
250                .iter()
251                .for_each(|(tx_hash, (key_index, statuses))| {
252                    for (key_slice, res) in statuses.iter() {
253                        self.insert_with_slice(tx_hash, *slot, *key_index, *key_slice, res.clone())
254                    }
255                });
256            if *is_root {
257                self.add_root(*slot);
258            }
259        }
260    }
261
262    pub fn from_slot_deltas(slot_deltas: &[SlotDelta<T>]) -> Self {
263        // play all deltas back into the status cache
264        let mut me = Self::default();
265        me.append(slot_deltas);
266        me
267    }
268
269    fn insert_with_slice(
270        &mut self,
271        transaction_blockhash: &Hash,
272        slot: Slot,
273        key_index: usize,
274        key_slice: [u8; CACHED_KEY_SIZE],
275        res: T,
276    ) {
277        let hash_map =
278            self.cache
279                .entry(*transaction_blockhash)
280                .or_insert((slot, key_index, HashMap::new()));
281        hash_map.0 = std::cmp::max(slot, hash_map.0);
282
283        let forks = hash_map.2.entry(key_slice).or_default();
284        forks.push((slot, res.clone()));
285
286        self.add_to_slot_delta(transaction_blockhash, slot, key_index, key_slice, res);
287    }
288
289    // Add this key slice to the list of key slices for this slot and blockhash
290    // combo.
291    fn add_to_slot_delta(
292        &mut self,
293        transaction_blockhash: &Hash,
294        slot: Slot,
295        key_index: usize,
296        key_slice: [u8; CACHED_KEY_SIZE],
297        res: T,
298    ) {
299        let mut fork_entry = self.slot_deltas.entry(slot).or_default().lock().unwrap();
300        let (_key_index, hash_entry) = fork_entry
301            .entry(*transaction_blockhash)
302            .or_insert((key_index, vec![]));
303        hash_entry.push((key_slice, res))
304    }
305}
306
#[cfg(test)]
mod tests {
    use {super::*, solana_sha256_hasher::hash, solana_signature::Signature};

    type BankStatusCache = StatusCache<()>;

    /// Blockhash used by most tests: the hash of the default hash.
    fn test_blockhash() -> Hash {
        hash(Hash::default().as_ref())
    }

    /// Registers slots `0..=MAX_CACHE_ENTRIES` as roots, which is one more
    /// than the cache retains.
    fn add_roots_past_capacity(status_cache: &mut BankStatusCache) {
        for root in 0..=(MAX_CACHE_ENTRIES as Slot) {
            status_cache.add_root(root);
        }
    }

    #[test]
    fn test_empty_has_no_sigs() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let status_cache = BankStatusCache::default();
        let by_blockhash = status_cache.get_status(sig, &blockhash, &Ancestors::default());
        assert_eq!(by_blockhash, None);
        let by_any = status_cache.get_status_any_blockhash(sig, &Ancestors::default());
        assert_eq!(by_any, None);
    }

    #[test]
    fn test_find_sig_with_ancestor_fork() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let ancestors: Ancestors = vec![(0, 1)].into_iter().collect();
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 0, ());
        let expected = Some((0, ()));
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &ancestors),
            expected
        );
        assert_eq!(
            status_cache.get_status_any_blockhash(sig, &ancestors),
            expected
        );
    }

    #[test]
    fn test_find_sig_without_ancestor_fork() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let ancestors = Ancestors::default();
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 1, ());
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
        assert_eq!(status_cache.get_status_any_blockhash(sig, &ancestors), None);
    }

    #[test]
    fn test_find_sig_with_root_ancestor_fork() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let ancestors = Ancestors::default();
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.add_root(0);
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &ancestors),
            Some((0, ()))
        );
    }

    #[test]
    fn test_insert_picks_latest_blockhash_fork() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let ancestors: Ancestors = vec![(0, 0)].into_iter().collect();
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        add_roots_past_capacity(&mut status_cache);
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors)
            .is_some());
    }

    #[test]
    fn test_root_expires() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let ancestors = Ancestors::default();
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 0, ());
        add_roots_past_capacity(&mut status_cache);
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
    }

    #[test]
    fn test_clear_signatures_sigs_are_gone() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let ancestors = Ancestors::default();
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.add_root(0);
        status_cache.clear();
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
    }

    #[test]
    fn test_clear_signatures_insert_works() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let ancestors = Ancestors::default();
        let mut status_cache = BankStatusCache::default();
        status_cache.add_root(0);
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors)
            .is_some());
    }

    #[test]
    fn test_signatures_slice() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let mut status_cache = BankStatusCache::default();
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap();
        let sig_slice: &[u8; CACHED_KEY_SIZE] =
            arrayref::array_ref![sig.as_ref(), *index, CACHED_KEY_SIZE];
        assert!(sig_map.get(sig_slice).is_some());
    }

    #[test]
    fn test_slot_deltas() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let mut status_cache = BankStatusCache::default();
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        assert!(status_cache.roots().contains(&0));

        // A cache rebuilt from the root deltas must match the source, and so
        // must a cache rebuilt from the rebuilt one.
        let rebuilt = StatusCache::from_slot_deltas(&status_cache.root_slot_deltas());
        assert_eq!(rebuilt, status_cache);
        let rebuilt_again = StatusCache::from_slot_deltas(&rebuilt.root_slot_deltas());
        assert_eq!(rebuilt_again, status_cache);
    }

    #[test]
    fn test_roots_deltas() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let blockhash2 = hash(blockhash.as_ref());
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        status_cache.insert(&blockhash2, sig, 1, ());
        add_roots_past_capacity(&mut status_cache);
        assert_eq!(status_cache.slot_deltas.len(), 1);
        assert!(status_cache.slot_deltas.contains_key(&1));
        let rebuilt = StatusCache::from_slot_deltas(&status_cache.root_slot_deltas());
        assert_eq!(rebuilt, status_cache);
    }

    #[test]
    // The comparison is between two constants on purpose: it guards the
    // relationship between them at test time.
    #[allow(clippy::assertions_on_constants)]
    fn test_age_sanity() {
        assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES);
    }

    #[test]
    fn test_clear_slot_signatures() {
        let sig = Signature::default();
        let blockhash = test_blockhash();
        let blockhash2 = hash(blockhash.as_ref());
        let mut status_cache = BankStatusCache::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        status_cache.insert(&blockhash2, sig, 1, ());

        let mut ancestors0 = Ancestors::default();
        ancestors0.insert(0, 0);
        let mut ancestors1 = Ancestors::default();
        ancestors1.insert(1, 0);

        // Clear slot 0 related data
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors0)
            .is_some());
        status_cache.clear_slot_entries(0);
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors0)
            .is_none());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors1)
            .is_some());
        assert!(status_cache
            .get_status(sig, &blockhash2, &ancestors1)
            .is_some());

        // Check that the slot delta for slot 0 is gone, but slot 1 still
        // exists
        assert!(!status_cache.slot_deltas.contains_key(&0));
        assert!(status_cache.slot_deltas.contains_key(&1));

        // Clear slot 1 related data
        status_cache.clear_slot_entries(1);
        assert!(status_cache.slot_deltas.is_empty());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors1)
            .is_none());
        assert!(status_cache
            .get_status(sig, &blockhash2, &ancestors1)
            .is_none());
        assert!(status_cache.cache.is_empty());
    }

    // Status cache uses a random key offset for each blockhash. Ensure that shorter
    // keys can still be used if the offset is greater than the key length.
    #[test]
    fn test_different_sized_keys() {
        let mut status_cache = BankStatusCache::default();
        let ancestors: Ancestors = vec![(0, 0)].into_iter().collect();
        let mut blockhash = Hash::default();
        for _ in 0..100 {
            // Chain the hash so that every iteration uses a new blockhash and
            // therefore draws a new random key offset; re-hashing the same
            // seed each time would only ever exercise a single offset.
            blockhash = hash(blockhash.as_ref());
            let sig_key = Signature::default();
            let hash_key = Hash::new_unique();
            status_cache.insert(&blockhash, sig_key, 0, ());
            status_cache.insert(&blockhash, hash_key, 0, ());
            assert!(status_cache
                .get_status(sig_key, &blockhash, &ancestors)
                .is_some());
            assert!(status_cache
                .get_status(hash_key, &blockhash, &ancestors)
                .is_some());
        }
    }
}