solana_runtime/
status_cache.rs

1use {
2    crate::ancestors::Ancestors,
3    log::*,
4    rand::{thread_rng, Rng},
5    serde::Serialize,
6    solana_sdk::{
7        clock::{Slot, MAX_RECENT_BLOCKHASHES},
8        hash::Hash,
9    },
10    std::{
11        collections::{hash_map::Entry, HashMap, HashSet},
12        sync::{Arc, Mutex},
13    },
14};
15
/// Number of roots the cache tracks before `purge_roots` starts evicting the
/// smallest one.
pub const MAX_CACHE_ENTRIES: usize = MAX_RECENT_BLOCKHASHES;
/// Number of key bytes stored per cached entry (the length of a `KeySlice`).
const CACHED_KEY_SIZE: usize = 20;
18
// Store forks in a single chunk of memory to avoid another lookup.
// Each element pairs the slot a status was recorded in with the status itself.
pub type ForkStatus<T> = Vec<(Slot, T)>;
// Fixed-size window of a key's bytes; this is what the cache actually stores
// and looks up instead of the full key.
type KeySlice = [u8; CACHED_KEY_SIZE];
// Map of key slice to every (slot, status) recorded for that slice.
type KeyMap<T> = HashMap<KeySlice, ForkStatus<T>>;
// Map of blockhash to the key offset used for that blockhash and the list of
// (key slice, status) pairs recorded under it.
pub type Status<T> = Arc<Mutex<HashMap<Hash, (usize, Vec<(KeySlice, T)>)>>>;
// A Map of hash + the highest fork it's been observed on along with
// the key offset and a Map of the key slice + Fork status for that key
type KeyStatusMap<T> = HashMap<Hash, (Slot, usize, KeyMap<T>)>;

// A map of keys recorded in each fork; used to serialize for snapshots easily.
// Doesn't store a `SlotDelta` in it because the bool `root` is usually set much later
type SlotDeltaMap<T> = HashMap<Slot, Status<T>>;

// The statuses added during a slot, can be used to build on top of a status cache or to
// construct a new one. Usually derived from a status cache's `SlotDeltaMap`.
// The tuple is (slot, whether the slot is a root, statuses recorded in the slot).
pub type SlotDelta<T> = (Slot, bool, Status<T>);
36
/// Cache of statuses keyed by blockhash and a fixed-size slice of a key, with
/// per-slot bookkeeping so a slot's entries can be exported or removed.
#[derive(Clone, Debug, AbiExample)]
pub struct StatusCache<T: Serialize + Clone> {
    /// per blockhash: highest slot observed, key offset, and the map from key
    /// slice to the (slot, status) pairs recorded for it
    cache: KeyStatusMap<T>,
    /// slots registered as roots; lookups accept a status recorded in a root
    /// even when the slot is absent from the supplied ancestors
    roots: HashSet<Slot>,
    /// all keys seen during a fork/slot
    slot_deltas: SlotDeltaMap<T>,
}
44
45impl<T: Serialize + Clone> Default for StatusCache<T> {
46    fn default() -> Self {
47        Self {
48            cache: HashMap::default(),
49            // 0 is always a root
50            roots: HashSet::from([0]),
51            slot_deltas: HashMap::default(),
52        }
53    }
54}
55
56impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
57    fn eq(&self, other: &Self) -> bool {
58        self.roots == other.roots
59            && self
60                .cache
61                .iter()
62                .all(|(hash, (slot, key_index, hash_map))| {
63                    if let Some((other_slot, other_key_index, other_hash_map)) =
64                        other.cache.get(hash)
65                    {
66                        if slot == other_slot && key_index == other_key_index {
67                            return hash_map.iter().all(|(slice, fork_map)| {
68                                if let Some(other_fork_map) = other_hash_map.get(slice) {
69                                    // all this work just to compare the highest forks in the fork map
70                                    // per entry
71                                    return fork_map.last() == other_fork_map.last();
72                                }
73                                false
74                            });
75                        }
76                    }
77                    false
78                })
79    }
80}
81
82impl<T: Serialize + Clone> StatusCache<T> {
83    pub fn clear_slot_entries(&mut self, slot: Slot) {
84        let slot_deltas = self.slot_deltas.remove(&slot);
85        if let Some(slot_deltas) = slot_deltas {
86            let slot_deltas = slot_deltas.lock().unwrap();
87            for (blockhash, (_, key_list)) in slot_deltas.iter() {
88                // Any blockhash that exists in self.slot_deltas must also exist
89                // in self.cache, because in self.purge_roots(), when an entry
90                // (b, (max_slot, _, _)) is removed from self.cache, this implies
91                // all entries in self.slot_deltas < max_slot are also removed
92                if let Entry::Occupied(mut o_blockhash_entries) = self.cache.entry(*blockhash) {
93                    let (_, _, all_hash_maps) = o_blockhash_entries.get_mut();
94
95                    for (key_slice, _) in key_list {
96                        if let Entry::Occupied(mut o_key_list) = all_hash_maps.entry(*key_slice) {
97                            let key_list = o_key_list.get_mut();
98                            key_list.retain(|(updated_slot, _)| *updated_slot != slot);
99                            if key_list.is_empty() {
100                                o_key_list.remove_entry();
101                            }
102                        } else {
103                            panic!(
104                                "Map for key must exist if key exists in self.slot_deltas, slot: {}",
105                                slot
106                            )
107                        }
108                    }
109
110                    if all_hash_maps.is_empty() {
111                        o_blockhash_entries.remove_entry();
112                    }
113                } else {
114                    panic!(
115                        "Blockhash must exist if it exists in self.slot_deltas, slot: {}",
116                        slot
117                    )
118                }
119            }
120        }
121    }
122
123    /// Check if the key is in any of the forks in the ancestors set and
124    /// with a certain blockhash.
125    pub fn get_status<K: AsRef<[u8]>>(
126        &self,
127        key: K,
128        transaction_blockhash: &Hash,
129        ancestors: &Ancestors,
130    ) -> Option<(Slot, T)> {
131        let map = self.cache.get(transaction_blockhash)?;
132        let (_, index, keymap) = map;
133        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
134        let index = (*index).min(max_key_index);
135        let key_slice: &[u8; CACHED_KEY_SIZE] =
136            arrayref::array_ref![key.as_ref(), index, CACHED_KEY_SIZE];
137        if let Some(stored_forks) = keymap.get(key_slice) {
138            let res = stored_forks
139                .iter()
140                .find(|(f, _)| ancestors.get(f) || self.roots.get(f).is_some())
141                .cloned();
142            if res.is_some() {
143                return res;
144            }
145        }
146        None
147    }
148
149    /// Search for a key with any blockhash
150    /// Prefer get_status for performance reasons, it doesn't need
151    /// to search all blockhashes.
152    pub fn get_status_any_blockhash<K: AsRef<[u8]>>(
153        &self,
154        key: K,
155        ancestors: &Ancestors,
156    ) -> Option<(Slot, T)> {
157        let keys: Vec<_> = self.cache.keys().copied().collect();
158
159        for blockhash in keys.iter() {
160            trace!("get_status_any_blockhash: trying {}", blockhash);
161            let status = self.get_status(&key, blockhash, ancestors);
162            if status.is_some() {
163                return status;
164            }
165        }
166        None
167    }
168
    /// Add a known root fork.  Roots are always valid ancestors.
    /// After MAX_CACHE_ENTRIES, roots are removed, and any old keys are cleared.
    ///
    /// Adding a slot that is already a root leaves the root set unchanged, but
    /// the purge check still runs.
    pub fn add_root(&mut self, fork: Slot) {
        self.roots.insert(fork);
        // Keep the root set bounded; see `purge_roots`.
        self.purge_roots();
    }
175
    /// Borrow the set of slots currently tracked as roots.
    pub fn roots(&self) -> &HashSet<Slot> {
        &self.roots
    }
179
180    /// Insert a new key for a specific slot.
181    pub fn insert<K: AsRef<[u8]>>(
182        &mut self,
183        transaction_blockhash: &Hash,
184        key: K,
185        slot: Slot,
186        res: T,
187    ) {
188        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
189        let hash_map = self.cache.entry(*transaction_blockhash).or_insert_with(|| {
190            let key_index = thread_rng().gen_range(0, max_key_index + 1);
191            (slot, key_index, HashMap::new())
192        });
193
194        hash_map.0 = std::cmp::max(slot, hash_map.0);
195        let key_index = hash_map.1.min(max_key_index);
196        let mut key_slice = [0u8; CACHED_KEY_SIZE];
197        key_slice.clone_from_slice(&key.as_ref()[key_index..key_index + CACHED_KEY_SIZE]);
198        self.insert_with_slice(transaction_blockhash, slot, key_index, key_slice, res);
199    }
200
201    pub fn purge_roots(&mut self) {
202        if self.roots.len() > MAX_CACHE_ENTRIES {
203            if let Some(min) = self.roots.iter().min().cloned() {
204                self.roots.remove(&min);
205                self.cache.retain(|_, (fork, _, _)| *fork > min);
206                self.slot_deltas.retain(|slot, _| *slot > min);
207            }
208        }
209    }
210
211    /// Clear for testing
212    pub fn clear(&mut self) {
213        for v in self.cache.values_mut() {
214            v.2 = HashMap::new();
215        }
216
217        self.slot_deltas
218            .iter_mut()
219            .for_each(|(_, status)| status.lock().unwrap().clear());
220    }
221
222    // returns the statuses for each slot in the slots provided
223    pub fn slot_deltas(&self, slots: &[Slot]) -> Vec<SlotDelta<T>> {
224        let empty = Arc::new(Mutex::new(HashMap::new()));
225        slots
226            .iter()
227            .map(|slot| {
228                (
229                    *slot,
230                    self.roots.contains(slot),
231                    Arc::clone(self.slot_deltas.get(slot).unwrap_or(&empty)),
232                )
233            })
234            .collect()
235    }
236
237    /// Get the statuses for all the root slots
238    pub fn root_slot_deltas(&self) -> Vec<SlotDelta<T>> {
239        self.roots
240            .iter()
241            .map(|slot| {
242                (
243                    *slot,
244                    true,
245                    self.slot_deltas.get(slot).cloned().unwrap_or_default(),
246                )
247            })
248            .collect()
249    }
250
251    // replay deltas into a status_cache allows "appending" data
252    pub fn append(&mut self, slot_deltas: &[SlotDelta<T>]) {
253        for (slot, is_root, statuses) in slot_deltas {
254            statuses
255                .lock()
256                .unwrap()
257                .iter()
258                .for_each(|(tx_hash, (key_index, statuses))| {
259                    for (key_slice, res) in statuses.iter() {
260                        self.insert_with_slice(tx_hash, *slot, *key_index, *key_slice, res.clone())
261                    }
262                });
263            if *is_root {
264                self.add_root(*slot);
265            }
266        }
267    }
268
269    pub fn from_slot_deltas(slot_deltas: &[SlotDelta<T>]) -> Self {
270        // play all deltas back into the status cache
271        let mut me = Self::default();
272        me.append(slot_deltas);
273        me
274    }
275
276    fn insert_with_slice(
277        &mut self,
278        transaction_blockhash: &Hash,
279        slot: Slot,
280        key_index: usize,
281        key_slice: [u8; CACHED_KEY_SIZE],
282        res: T,
283    ) {
284        let hash_map =
285            self.cache
286                .entry(*transaction_blockhash)
287                .or_insert((slot, key_index, HashMap::new()));
288        hash_map.0 = std::cmp::max(slot, hash_map.0);
289
290        let forks = hash_map.2.entry(key_slice).or_insert_with(Vec::new);
291        forks.push((slot, res.clone()));
292        let slot_deltas = self.slot_deltas.entry(slot).or_default();
293        let mut fork_entry = slot_deltas.lock().unwrap();
294        let (_, hash_entry) = fork_entry
295            .entry(*transaction_blockhash)
296            .or_insert((key_index, vec![]));
297        hash_entry.push((key_slice, res))
298    }
299}
300
#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_sdk::{hash::hash, signature::Signature},
    };

    type BankStatusCache = StatusCache<()>;

    // A freshly created cache reports no status for any key.
    #[test]
    fn test_empty_has_no_sigs() {
        let sig = Signature::default();
        let blockhash = hash(Hash::default().as_ref());
        let status_cache = BankStatusCache::default();
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &Ancestors::default()),
            None
        );
        assert_eq!(
            status_cache.get_status_any_blockhash(sig, &Ancestors::default()),
            None
        );
    }

    // A status recorded in slot 0 is visible when slot 0 is among the ancestors.
    #[test]
    fn test_find_sig_with_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = vec![(0, 1)].into_iter().collect();
        status_cache.insert(&blockhash, sig, 0, ());
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &ancestors),
            Some((0, ()))
        );
        assert_eq!(
            status_cache.get_status_any_blockhash(sig, &ancestors),
            Some((0, ()))
        );
    }

    // A status recorded in slot 1 is hidden when slot 1 is neither an ancestor
    // nor a root.
    #[test]
    fn test_find_sig_without_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 1, ());
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
        assert_eq!(status_cache.get_status_any_blockhash(sig, &ancestors), None);
    }

    // A status recorded in a root slot is visible even with empty ancestors.
    #[test]
    fn test_find_sig_with_root_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.add_root(0);
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &ancestors),
            Some((0, ()))
        );
    }

    // The blockhash entry tracks the highest slot it was inserted at (1), so it
    // survives the purge of root 0.
    #[test]
    fn test_insert_picks_latest_blockhash_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = vec![(0, 0)].into_iter().collect();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        for i in 0..(MAX_CACHE_ENTRIES + 1) {
            status_cache.add_root(i as u64);
        }
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors)
            .is_some());
    }

    // Once root 0 is purged, a blockhash whose highest slot is 0 is dropped.
    #[test]
    fn test_root_expires() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 0, ());
        for i in 0..(MAX_CACHE_ENTRIES + 1) {
            status_cache.add_root(i as u64);
        }
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
    }

    // `clear` removes previously inserted statuses.
    #[test]
    fn test_clear_signatures_sigs_are_gone() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.add_root(0);
        status_cache.clear();
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
    }

    // Inserting after `clear` works and roots are kept across `clear`.
    #[test]
    fn test_clear_signatures_insert_works() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.add_root(0);
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors)
            .is_some());
    }

    // The stored key slice is the window of the key at the blockhash's offset.
    #[test]
    fn test_signatures_slice() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap();
        let sig_slice: &[u8; CACHED_KEY_SIZE] =
            arrayref::array_ref![sig.as_ref(), *index, CACHED_KEY_SIZE];
        assert!(sig_map.get(sig_slice).is_some());
    }

    // A cache rebuilt from its slot deltas compares equal to the original,
    // also after a second round trip.
    #[test]
    fn test_slot_deltas() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        let slot_deltas = status_cache.slot_deltas(&[0]);
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
        let slot_deltas = cache.slot_deltas(&[0]);
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
    }

    // After root 0 is purged only slot 1's delta remains, and the round trip
    // through slot deltas still compares equal.
    #[test]
    fn test_roots_deltas() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let blockhash2 = hash(blockhash.as_ref());
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        status_cache.insert(&blockhash2, sig, 1, ());
        for i in 0..(MAX_CACHE_ENTRIES + 1) {
            status_cache.add_root(i as u64);
        }
        let slots: Vec<_> = (0..MAX_CACHE_ENTRIES as u64 + 1).collect();
        assert_eq!(status_cache.slot_deltas.len(), 1);
        assert!(status_cache.slot_deltas.get(&1).is_some());
        let slot_deltas = status_cache.slot_deltas(&slots);
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
    }

    // The cache must not keep more roots than there are recent blockhashes.
    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn test_age_sanity() {
        assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES);
    }

    // Clearing a slot removes only that slot's entries; clearing the last slot
    // leaves the cache empty.
    #[test]
    fn test_clear_slot_signatures() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let blockhash2 = hash(blockhash.as_ref());
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        status_cache.insert(&blockhash2, sig, 1, ());

        let mut ancestors0 = Ancestors::default();
        ancestors0.insert(0, 0);
        let mut ancestors1 = Ancestors::default();
        ancestors1.insert(1, 0);

        // Clear slot 0 related data
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors0)
            .is_some());
        status_cache.clear_slot_entries(0);
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors0)
            .is_none());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors1)
            .is_some());
        assert!(status_cache
            .get_status(sig, &blockhash2, &ancestors1)
            .is_some());

        // Check that the slot delta for slot 0 is gone, but slot 1 still
        // exists
        assert!(status_cache.slot_deltas.get(&0).is_none());
        assert!(status_cache.slot_deltas.get(&1).is_some());

        // Clear slot 1 related data
        status_cache.clear_slot_entries(1);
        assert!(status_cache.slot_deltas.is_empty());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors1)
            .is_none());
        assert!(status_cache
            .get_status(sig, &blockhash2, &ancestors1)
            .is_none());
        assert!(status_cache.cache.is_empty());
    }

    // Status cache uses a random key offset for each blockhash. Ensure that shorter
    // keys can still be used if the offset is greater than the key length.
    #[test]
    fn test_different_sized_keys() {
        let mut status_cache = BankStatusCache::default();
        let ancestors = vec![(0, 0)].into_iter().collect();
        let mut blockhash = Hash::default();
        for _ in 0..100 {
            // Advance to a new blockhash every iteration so that a fresh random
            // key offset is drawn each time; re-hashing an unchanged outer value
            // would exercise a single offset 100 times.
            blockhash = hash(blockhash.as_ref());
            let sig_key = Signature::default();
            let hash_key = Hash::new_unique();
            status_cache.insert(&blockhash, sig_key, 0, ());
            status_cache.insert(&blockhash, hash_key, 0, ());
            assert!(status_cache
                .get_status(sig_key, &blockhash, &ancestors)
                .is_some());
            assert!(status_cache
                .get_status(hash_key, &blockhash, &ancestors)
                .is_some());
        }
    }
}