solana_runtime/
status_cache.rs

1use {
2    log::*,
3    rand::{thread_rng, Rng},
4    serde::Serialize,
5    solana_accounts_db::ancestors::Ancestors,
6    solana_clock::{Slot, MAX_RECENT_BLOCKHASHES},
7    solana_hash::Hash,
8    std::{
9        collections::{hash_map::Entry, HashMap, HashSet},
10        sync::{Arc, Mutex},
11    },
12};
13
14pub const MAX_CACHE_ENTRIES: usize = MAX_RECENT_BLOCKHASHES;
15const CACHED_KEY_SIZE: usize = 20;
16
17// Store forks in a single chunk of memory to avoid another lookup.
18pub type ForkStatus<T> = Vec<(Slot, T)>;
19type KeySlice = [u8; CACHED_KEY_SIZE];
20type KeyMap<T> = HashMap<KeySlice, ForkStatus<T>>;
21// Map of Hash and status
22pub type Status<T> = Arc<Mutex<HashMap<Hash, (usize, Vec<(KeySlice, T)>)>>>;
23// A Map of hash + the highest fork it's been observed on along with
24// the key offset and a Map of the key slice + Fork status for that key
25type KeyStatusMap<T> = HashMap<Hash, (Slot, usize, KeyMap<T>)>;
26
27// A map of keys recorded in each fork; used to serialize for snapshots easily.
28// Doesn't store a `SlotDelta` in it because the bool `root` is usually set much later
29type SlotDeltaMap<T> = HashMap<Slot, Status<T>>;
30
31// The statuses added during a slot, can be used to build on top of a status cache or to
32// construct a new one. Usually derived from a status cache's `SlotDeltaMap`
33pub type SlotDelta<T> = (Slot, bool, Status<T>);
34
35#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
36#[derive(Clone, Debug)]
37pub struct StatusCache<T: Serialize + Clone> {
38    cache: KeyStatusMap<T>,
39    roots: HashSet<Slot>,
40    /// all keys seen during a fork/slot
41    slot_deltas: SlotDeltaMap<T>,
42}
43
44impl<T: Serialize + Clone> Default for StatusCache<T> {
45    fn default() -> Self {
46        Self {
47            cache: HashMap::default(),
48            // 0 is always a root
49            roots: HashSet::from([0]),
50            slot_deltas: HashMap::default(),
51        }
52    }
53}
54
55impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
56    fn eq(&self, other: &Self) -> bool {
57        self.roots == other.roots
58            && self
59                .cache
60                .iter()
61                .all(|(hash, (slot, key_index, hash_map))| {
62                    if let Some((other_slot, other_key_index, other_hash_map)) =
63                        other.cache.get(hash)
64                    {
65                        if slot == other_slot && key_index == other_key_index {
66                            return hash_map.iter().all(|(slice, fork_map)| {
67                                if let Some(other_fork_map) = other_hash_map.get(slice) {
68                                    // all this work just to compare the highest forks in the fork map
69                                    // per entry
70                                    return fork_map.last() == other_fork_map.last();
71                                }
72                                false
73                            });
74                        }
75                    }
76                    false
77                })
78    }
79}
80
81impl<T: Serialize + Clone> StatusCache<T> {
82    pub fn clear_slot_entries(&mut self, slot: Slot) {
83        let slot_deltas = self.slot_deltas.remove(&slot);
84        if let Some(slot_deltas) = slot_deltas {
85            let slot_deltas = slot_deltas.lock().unwrap();
86            for (blockhash, (_, key_list)) in slot_deltas.iter() {
87                // Any blockhash that exists in self.slot_deltas must also exist
88                // in self.cache, because in self.purge_roots(), when an entry
89                // (b, (max_slot, _, _)) is removed from self.cache, this implies
90                // all entries in self.slot_deltas < max_slot are also removed
91                if let Entry::Occupied(mut o_blockhash_entries) = self.cache.entry(*blockhash) {
92                    let (_, _, all_hash_maps) = o_blockhash_entries.get_mut();
93
94                    for (key_slice, _) in key_list {
95                        if let Entry::Occupied(mut o_key_list) = all_hash_maps.entry(*key_slice) {
96                            let key_list = o_key_list.get_mut();
97                            key_list.retain(|(updated_slot, _)| *updated_slot != slot);
98                            if key_list.is_empty() {
99                                o_key_list.remove_entry();
100                            }
101                        } else {
102                            panic!(
103                                "Map for key must exist if key exists in self.slot_deltas, slot: {slot}"
104                            )
105                        }
106                    }
107
108                    if all_hash_maps.is_empty() {
109                        o_blockhash_entries.remove_entry();
110                    }
111                } else {
112                    panic!("Blockhash must exist if it exists in self.slot_deltas, slot: {slot}")
113                }
114            }
115        }
116    }
117
118    /// Check if the key is in any of the forks in the ancestors set and
119    /// with a certain blockhash.
120    pub fn get_status<K: AsRef<[u8]>>(
121        &self,
122        key: K,
123        transaction_blockhash: &Hash,
124        ancestors: &Ancestors,
125    ) -> Option<(Slot, T)> {
126        let map = self.cache.get(transaction_blockhash)?;
127        let (_, index, keymap) = map;
128        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
129        let index = (*index).min(max_key_index);
130        let key_slice: &[u8; CACHED_KEY_SIZE] =
131            arrayref::array_ref![key.as_ref(), index, CACHED_KEY_SIZE];
132        if let Some(stored_forks) = keymap.get(key_slice) {
133            let res = stored_forks
134                .iter()
135                .find(|(f, _)| ancestors.contains_key(f) || self.roots.contains(f))
136                .cloned();
137            if res.is_some() {
138                return res;
139            }
140        }
141        None
142    }
143
144    /// Search for a key with any blockhash
145    /// Prefer get_status for performance reasons, it doesn't need
146    /// to search all blockhashes.
147    pub fn get_status_any_blockhash<K: AsRef<[u8]>>(
148        &self,
149        key: K,
150        ancestors: &Ancestors,
151    ) -> Option<(Slot, T)> {
152        let keys: Vec<_> = self.cache.keys().copied().collect();
153
154        for blockhash in keys.iter() {
155            trace!("get_status_any_blockhash: trying {}", blockhash);
156            let status = self.get_status(&key, blockhash, ancestors);
157            if status.is_some() {
158                return status;
159            }
160        }
161        None
162    }
163
164    /// Add a known root fork.  Roots are always valid ancestors.
165    /// After MAX_CACHE_ENTRIES, roots are removed, and any old keys are cleared.
166    pub fn add_root(&mut self, fork: Slot) {
167        self.roots.insert(fork);
168        self.purge_roots();
169    }
170
171    pub fn roots(&self) -> &HashSet<Slot> {
172        &self.roots
173    }
174
175    /// Insert a new key for a specific slot.
176    pub fn insert<K: AsRef<[u8]>>(
177        &mut self,
178        transaction_blockhash: &Hash,
179        key: K,
180        slot: Slot,
181        res: T,
182    ) {
183        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
184
185        // Get the cache entry for this blockhash.
186        let (max_slot, key_index, hash_map) =
187            self.cache.entry(*transaction_blockhash).or_insert_with(|| {
188                let key_index = thread_rng().gen_range(0..max_key_index + 1);
189                (slot, key_index, HashMap::new())
190            });
191
192        // Update the max slot observed to contain txs using this blockhash.
193        *max_slot = std::cmp::max(slot, *max_slot);
194
195        // Grab the key slice.
196        let key_index = (*key_index).min(max_key_index);
197        let mut key_slice = [0u8; CACHED_KEY_SIZE];
198        key_slice.clone_from_slice(&key.as_ref()[key_index..key_index + CACHED_KEY_SIZE]);
199
200        // Insert the slot and tx result into the cache entry associated with
201        // this blockhash and keyslice.
202        let forks = hash_map.entry(key_slice).or_default();
203        forks.push((slot, res.clone()));
204
205        self.add_to_slot_delta(transaction_blockhash, slot, key_index, key_slice, res);
206    }
207
208    pub fn purge_roots(&mut self) {
209        if self.roots.len() > MAX_CACHE_ENTRIES {
210            if let Some(min) = self.roots.iter().min().cloned() {
211                self.roots.remove(&min);
212                self.cache.retain(|_, (fork, _, _)| *fork > min);
213                self.slot_deltas.retain(|slot, _| *slot > min);
214            }
215        }
216    }
217
218    /// Clear for testing
219    pub fn clear(&mut self) {
220        for v in self.cache.values_mut() {
221            v.2 = HashMap::new();
222        }
223
224        self.slot_deltas
225            .iter_mut()
226            .for_each(|(_, status)| status.lock().unwrap().clear());
227    }
228
229    /// Get the statuses for all the root slots
230    pub fn root_slot_deltas(&self) -> Vec<SlotDelta<T>> {
231        self.roots()
232            .iter()
233            .map(|root| {
234                (
235                    *root,
236                    true, // <-- is_root
237                    self.slot_deltas.get(root).cloned().unwrap_or_default(),
238                )
239            })
240            .collect()
241    }
242
243    // replay deltas into a status_cache allows "appending" data
244    pub fn append(&mut self, slot_deltas: &[SlotDelta<T>]) {
245        for (slot, is_root, statuses) in slot_deltas {
246            statuses
247                .lock()
248                .unwrap()
249                .iter()
250                .for_each(|(tx_hash, (key_index, statuses))| {
251                    for (key_slice, res) in statuses.iter() {
252                        self.insert_with_slice(tx_hash, *slot, *key_index, *key_slice, res.clone())
253                    }
254                });
255            if *is_root {
256                self.add_root(*slot);
257            }
258        }
259    }
260
261    pub fn from_slot_deltas(slot_deltas: &[SlotDelta<T>]) -> Self {
262        // play all deltas back into the status cache
263        let mut me = Self::default();
264        me.append(slot_deltas);
265        me
266    }
267
268    fn insert_with_slice(
269        &mut self,
270        transaction_blockhash: &Hash,
271        slot: Slot,
272        key_index: usize,
273        key_slice: [u8; CACHED_KEY_SIZE],
274        res: T,
275    ) {
276        let hash_map =
277            self.cache
278                .entry(*transaction_blockhash)
279                .or_insert((slot, key_index, HashMap::new()));
280        hash_map.0 = std::cmp::max(slot, hash_map.0);
281
282        let forks = hash_map.2.entry(key_slice).or_default();
283        forks.push((slot, res.clone()));
284
285        self.add_to_slot_delta(transaction_blockhash, slot, key_index, key_slice, res);
286    }
287
288    // Add this key slice to the list of key slices for this slot and blockhash
289    // combo.
290    fn add_to_slot_delta(
291        &mut self,
292        transaction_blockhash: &Hash,
293        slot: Slot,
294        key_index: usize,
295        key_slice: [u8; CACHED_KEY_SIZE],
296        res: T,
297    ) {
298        let mut fork_entry = self.slot_deltas.entry(slot).or_default().lock().unwrap();
299        let (_key_index, hash_entry) = fork_entry
300            .entry(*transaction_blockhash)
301            .or_insert((key_index, vec![]));
302        hash_entry.push((key_slice, res))
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use {super::*, solana_sha256_hasher::hash, solana_signature::Signature};
309
310    type BankStatusCache = StatusCache<()>;
311
312    #[test]
313    fn test_empty_has_no_sigs() {
314        let sig = Signature::default();
315        let blockhash = hash(Hash::default().as_ref());
316        let status_cache = BankStatusCache::default();
317        assert_eq!(
318            status_cache.get_status(sig, &blockhash, &Ancestors::default()),
319            None
320        );
321        assert_eq!(
322            status_cache.get_status_any_blockhash(sig, &Ancestors::default()),
323            None
324        );
325    }
326
327    #[test]
328    fn test_find_sig_with_ancestor_fork() {
329        let sig = Signature::default();
330        let mut status_cache = BankStatusCache::default();
331        let blockhash = hash(Hash::default().as_ref());
332        let ancestors = vec![(0, 1)].into_iter().collect();
333        status_cache.insert(&blockhash, sig, 0, ());
334        assert_eq!(
335            status_cache.get_status(sig, &blockhash, &ancestors),
336            Some((0, ()))
337        );
338        assert_eq!(
339            status_cache.get_status_any_blockhash(sig, &ancestors),
340            Some((0, ()))
341        );
342    }
343
344    #[test]
345    fn test_find_sig_without_ancestor_fork() {
346        let sig = Signature::default();
347        let mut status_cache = BankStatusCache::default();
348        let blockhash = hash(Hash::default().as_ref());
349        let ancestors = Ancestors::default();
350        status_cache.insert(&blockhash, sig, 1, ());
351        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
352        assert_eq!(status_cache.get_status_any_blockhash(sig, &ancestors), None);
353    }
354
355    #[test]
356    fn test_find_sig_with_root_ancestor_fork() {
357        let sig = Signature::default();
358        let mut status_cache = BankStatusCache::default();
359        let blockhash = hash(Hash::default().as_ref());
360        let ancestors = Ancestors::default();
361        status_cache.insert(&blockhash, sig, 0, ());
362        status_cache.add_root(0);
363        assert_eq!(
364            status_cache.get_status(sig, &blockhash, &ancestors),
365            Some((0, ()))
366        );
367    }
368
369    #[test]
370    fn test_insert_picks_latest_blockhash_fork() {
371        let sig = Signature::default();
372        let mut status_cache = BankStatusCache::default();
373        let blockhash = hash(Hash::default().as_ref());
374        let ancestors = vec![(0, 0)].into_iter().collect();
375        status_cache.insert(&blockhash, sig, 0, ());
376        status_cache.insert(&blockhash, sig, 1, ());
377        for i in 0..(MAX_CACHE_ENTRIES + 1) {
378            status_cache.add_root(i as u64);
379        }
380        assert!(status_cache
381            .get_status(sig, &blockhash, &ancestors)
382            .is_some());
383    }
384
385    #[test]
386    fn test_root_expires() {
387        let sig = Signature::default();
388        let mut status_cache = BankStatusCache::default();
389        let blockhash = hash(Hash::default().as_ref());
390        let ancestors = Ancestors::default();
391        status_cache.insert(&blockhash, sig, 0, ());
392        for i in 0..(MAX_CACHE_ENTRIES + 1) {
393            status_cache.add_root(i as u64);
394        }
395        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
396    }
397
398    #[test]
399    fn test_clear_signatures_sigs_are_gone() {
400        let sig = Signature::default();
401        let mut status_cache = BankStatusCache::default();
402        let blockhash = hash(Hash::default().as_ref());
403        let ancestors = Ancestors::default();
404        status_cache.insert(&blockhash, sig, 0, ());
405        status_cache.add_root(0);
406        status_cache.clear();
407        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
408    }
409
410    #[test]
411    fn test_clear_signatures_insert_works() {
412        let sig = Signature::default();
413        let mut status_cache = BankStatusCache::default();
414        let blockhash = hash(Hash::default().as_ref());
415        let ancestors = Ancestors::default();
416        status_cache.add_root(0);
417        status_cache.clear();
418        status_cache.insert(&blockhash, sig, 0, ());
419        assert!(status_cache
420            .get_status(sig, &blockhash, &ancestors)
421            .is_some());
422    }
423
424    #[test]
425    fn test_signatures_slice() {
426        let sig = Signature::default();
427        let mut status_cache = BankStatusCache::default();
428        let blockhash = hash(Hash::default().as_ref());
429        status_cache.clear();
430        status_cache.insert(&blockhash, sig, 0, ());
431        let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap();
432        let sig_slice: &[u8; CACHED_KEY_SIZE] =
433            arrayref::array_ref![sig.as_ref(), *index, CACHED_KEY_SIZE];
434        assert!(sig_map.get(sig_slice).is_some());
435    }
436
437    #[test]
438    fn test_slot_deltas() {
439        let sig = Signature::default();
440        let mut status_cache = BankStatusCache::default();
441        let blockhash = hash(Hash::default().as_ref());
442        status_cache.clear();
443        status_cache.insert(&blockhash, sig, 0, ());
444        assert!(status_cache.roots().contains(&0));
445        let slot_deltas = status_cache.root_slot_deltas();
446        let cache = StatusCache::from_slot_deltas(&slot_deltas);
447        assert_eq!(cache, status_cache);
448        let slot_deltas = cache.root_slot_deltas();
449        let cache = StatusCache::from_slot_deltas(&slot_deltas);
450        assert_eq!(cache, status_cache);
451    }
452
453    #[test]
454    fn test_roots_deltas() {
455        let sig = Signature::default();
456        let mut status_cache = BankStatusCache::default();
457        let blockhash = hash(Hash::default().as_ref());
458        let blockhash2 = hash(blockhash.as_ref());
459        status_cache.insert(&blockhash, sig, 0, ());
460        status_cache.insert(&blockhash, sig, 1, ());
461        status_cache.insert(&blockhash2, sig, 1, ());
462        for i in 0..(MAX_CACHE_ENTRIES + 1) {
463            status_cache.add_root(i as u64);
464        }
465        assert_eq!(status_cache.slot_deltas.len(), 1);
466        assert!(status_cache.slot_deltas.contains_key(&1));
467        let slot_deltas = status_cache.root_slot_deltas();
468        let cache = StatusCache::from_slot_deltas(&slot_deltas);
469        assert_eq!(cache, status_cache);
470    }
471
472    #[test]
473    #[allow(clippy::assertions_on_constants)]
474    fn test_age_sanity() {
475        assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES);
476    }
477
478    #[test]
479    fn test_clear_slot_signatures() {
480        let sig = Signature::default();
481        let mut status_cache = BankStatusCache::default();
482        let blockhash = hash(Hash::default().as_ref());
483        let blockhash2 = hash(blockhash.as_ref());
484        status_cache.insert(&blockhash, sig, 0, ());
485        status_cache.insert(&blockhash, sig, 1, ());
486        status_cache.insert(&blockhash2, sig, 1, ());
487
488        let mut ancestors0 = Ancestors::default();
489        ancestors0.insert(0, 0);
490        let mut ancestors1 = Ancestors::default();
491        ancestors1.insert(1, 0);
492
493        // Clear slot 0 related data
494        assert!(status_cache
495            .get_status(sig, &blockhash, &ancestors0)
496            .is_some());
497        status_cache.clear_slot_entries(0);
498        assert!(status_cache
499            .get_status(sig, &blockhash, &ancestors0)
500            .is_none());
501        assert!(status_cache
502            .get_status(sig, &blockhash, &ancestors1)
503            .is_some());
504        assert!(status_cache
505            .get_status(sig, &blockhash2, &ancestors1)
506            .is_some());
507
508        // Check that the slot delta for slot 0 is gone, but slot 1 still
509        // exists
510        assert!(!status_cache.slot_deltas.contains_key(&0));
511        assert!(status_cache.slot_deltas.contains_key(&1));
512
513        // Clear slot 1 related data
514        status_cache.clear_slot_entries(1);
515        assert!(status_cache.slot_deltas.is_empty());
516        assert!(status_cache
517            .get_status(sig, &blockhash, &ancestors1)
518            .is_none());
519        assert!(status_cache
520            .get_status(sig, &blockhash2, &ancestors1)
521            .is_none());
522        assert!(status_cache.cache.is_empty());
523    }
524
525    // Status cache uses a random key offset for each blockhash. Ensure that shorter
526    // keys can still be used if the offset if greater than the key length.
527    #[test]
528    fn test_different_sized_keys() {
529        let mut status_cache = BankStatusCache::default();
530        let ancestors = vec![(0, 0)].into_iter().collect();
531        let blockhash = Hash::default();
532        for _ in 0..100 {
533            let blockhash = hash(blockhash.as_ref());
534            let sig_key = Signature::default();
535            let hash_key = Hash::new_unique();
536            status_cache.insert(&blockhash, sig_key, 0, ());
537            status_cache.insert(&blockhash, hash_key, 0, ());
538            assert!(status_cache
539                .get_status(sig_key, &blockhash, &ancestors)
540                .is_some());
541            assert!(status_cache
542                .get_status(hash_key, &blockhash, &ancestors)
543                .is_some());
544        }
545    }
546}