miraland_runtime/
status_cache.rs

1use {
2    log::*,
3    miraland_accounts_db::ancestors::Ancestors,
4    rand::{thread_rng, Rng},
5    serde::Serialize,
6    miraland_sdk::{
7        clock::{Slot, MAX_RECENT_BLOCKHASHES},
8        hash::Hash,
9    },
10    std::{
11        collections::{hash_map::Entry, HashMap, HashSet},
12        sync::{Arc, Mutex},
13    },
14};
15
// Number of roots kept before `purge_roots` starts dropping the smallest one.
pub const MAX_CACHE_ENTRIES: usize = MAX_RECENT_BLOCKHASHES;
// Number of bytes of each key that are actually stored in the cache.
const CACHED_KEY_SIZE: usize = 20;

// Store forks in a single chunk of memory to avoid another lookup.
// Each element is (slot the status was recorded at, status).
pub type ForkStatus<T> = Vec<(Slot, T)>;
// Fixed-size window of a key, taken at a per-blockhash offset.
type KeySlice = [u8; CACHED_KEY_SIZE];
// Key slice -> every (slot, status) recorded for that slice.
type KeyMap<T> = HashMap<KeySlice, ForkStatus<T>>;
// Shared, lockable map of blockhash -> (key offset, list of (key slice, status)).
pub type Status<T> = Arc<Mutex<HashMap<Hash, (usize, Vec<(KeySlice, T)>)>>>;
// A map of blockhash -> (highest slot an entry was inserted at, key offset,
// map of key slice -> fork statuses for that key).
type KeyStatusMap<T> = HashMap<Hash, (Slot, usize, KeyMap<T>)>;

// A map of keys recorded in each fork; used to serialize for snapshots easily.
// Doesn't store a `SlotDelta` in it because the bool `root` is usually set much later.
type SlotDeltaMap<T> = HashMap<Slot, Status<T>>;

// The statuses added during a slot; can be used to build on top of a status cache or to
// construct a new one. Usually derived from a status cache's `SlotDeltaMap`.
// Layout: (slot, is_root, statuses recorded in that slot).
pub type SlotDelta<T> = (Slot, bool, Status<T>);
36
/// Cache of statuses keyed by blockhash and by a fixed-size slice of the key,
/// with every status tagged by the slot it was recorded at.
#[derive(Clone, Debug, AbiExample)]
pub struct StatusCache<T: Serialize + Clone> {
    /// blockhash -> (highest slot inserted, key offset, key slice -> per-slot statuses)
    cache: KeyStatusMap<T>,
    /// slots registered as roots; a status recorded at a root is visible to
    /// every lookup regardless of the ancestors passed in
    roots: HashSet<Slot>,
    /// all keys seen during a fork/slot
    slot_deltas: SlotDeltaMap<T>,
}
44
45impl<T: Serialize + Clone> Default for StatusCache<T> {
46    fn default() -> Self {
47        Self {
48            cache: HashMap::default(),
49            // 0 is always a root
50            roots: HashSet::from([0]),
51            slot_deltas: HashMap::default(),
52        }
53    }
54}
55
56impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
57    fn eq(&self, other: &Self) -> bool {
58        self.roots == other.roots
59            && self
60                .cache
61                .iter()
62                .all(|(hash, (slot, key_index, hash_map))| {
63                    if let Some((other_slot, other_key_index, other_hash_map)) =
64                        other.cache.get(hash)
65                    {
66                        if slot == other_slot && key_index == other_key_index {
67                            return hash_map.iter().all(|(slice, fork_map)| {
68                                if let Some(other_fork_map) = other_hash_map.get(slice) {
69                                    // all this work just to compare the highest forks in the fork map
70                                    // per entry
71                                    return fork_map.last() == other_fork_map.last();
72                                }
73                                false
74                            });
75                        }
76                    }
77                    false
78                })
79    }
80}
81
82impl<T: Serialize + Clone> StatusCache<T> {
83    pub fn clear_slot_entries(&mut self, slot: Slot) {
84        let slot_deltas = self.slot_deltas.remove(&slot);
85        if let Some(slot_deltas) = slot_deltas {
86            let slot_deltas = slot_deltas.lock().unwrap();
87            for (blockhash, (_, key_list)) in slot_deltas.iter() {
88                // Any blockhash that exists in self.slot_deltas must also exist
89                // in self.cache, because in self.purge_roots(), when an entry
90                // (b, (max_slot, _, _)) is removed from self.cache, this implies
91                // all entries in self.slot_deltas < max_slot are also removed
92                if let Entry::Occupied(mut o_blockhash_entries) = self.cache.entry(*blockhash) {
93                    let (_, _, all_hash_maps) = o_blockhash_entries.get_mut();
94
95                    for (key_slice, _) in key_list {
96                        if let Entry::Occupied(mut o_key_list) = all_hash_maps.entry(*key_slice) {
97                            let key_list = o_key_list.get_mut();
98                            key_list.retain(|(updated_slot, _)| *updated_slot != slot);
99                            if key_list.is_empty() {
100                                o_key_list.remove_entry();
101                            }
102                        } else {
103                            panic!(
104                                "Map for key must exist if key exists in self.slot_deltas, slot: {slot}"
105                            )
106                        }
107                    }
108
109                    if all_hash_maps.is_empty() {
110                        o_blockhash_entries.remove_entry();
111                    }
112                } else {
113                    panic!("Blockhash must exist if it exists in self.slot_deltas, slot: {slot}")
114                }
115            }
116        }
117    }
118
119    /// Check if the key is in any of the forks in the ancestors set and
120    /// with a certain blockhash.
121    pub fn get_status<K: AsRef<[u8]>>(
122        &self,
123        key: K,
124        transaction_blockhash: &Hash,
125        ancestors: &Ancestors,
126    ) -> Option<(Slot, T)> {
127        let map = self.cache.get(transaction_blockhash)?;
128        let (_, index, keymap) = map;
129        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
130        let index = (*index).min(max_key_index);
131        let key_slice: &[u8; CACHED_KEY_SIZE] =
132            arrayref::array_ref![key.as_ref(), index, CACHED_KEY_SIZE];
133        if let Some(stored_forks) = keymap.get(key_slice) {
134            let res = stored_forks
135                .iter()
136                .find(|(f, _)| ancestors.contains_key(f) || self.roots.get(f).is_some())
137                .cloned();
138            if res.is_some() {
139                return res;
140            }
141        }
142        None
143    }
144
145    /// Search for a key with any blockhash
146    /// Prefer get_status for performance reasons, it doesn't need
147    /// to search all blockhashes.
148    pub fn get_status_any_blockhash<K: AsRef<[u8]>>(
149        &self,
150        key: K,
151        ancestors: &Ancestors,
152    ) -> Option<(Slot, T)> {
153        let keys: Vec<_> = self.cache.keys().copied().collect();
154
155        for blockhash in keys.iter() {
156            trace!("get_status_any_blockhash: trying {}", blockhash);
157            let status = self.get_status(&key, blockhash, ancestors);
158            if status.is_some() {
159                return status;
160            }
161        }
162        None
163    }
164
    /// Add a known root fork.  Roots are always valid ancestors.
    /// Once more than MAX_CACHE_ENTRIES roots are tracked, `purge_roots`
    /// drops the smallest root along with the cache entries and slot deltas
    /// that are not newer than it.
    pub fn add_root(&mut self, fork: Slot) {
        self.roots.insert(fork);
        self.purge_roots();
    }
171
    /// The set of slots currently registered as roots.
    pub fn roots(&self) -> &HashSet<Slot> {
        &self.roots
    }
175
176    /// Insert a new key for a specific slot.
177    pub fn insert<K: AsRef<[u8]>>(
178        &mut self,
179        transaction_blockhash: &Hash,
180        key: K,
181        slot: Slot,
182        res: T,
183    ) {
184        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
185        let hash_map = self.cache.entry(*transaction_blockhash).or_insert_with(|| {
186            let key_index = thread_rng().gen_range(0..max_key_index + 1);
187            (slot, key_index, HashMap::new())
188        });
189
190        hash_map.0 = std::cmp::max(slot, hash_map.0);
191        let key_index = hash_map.1.min(max_key_index);
192        let mut key_slice = [0u8; CACHED_KEY_SIZE];
193        key_slice.clone_from_slice(&key.as_ref()[key_index..key_index + CACHED_KEY_SIZE]);
194        self.insert_with_slice(transaction_blockhash, slot, key_index, key_slice, res);
195    }
196
197    pub fn purge_roots(&mut self) {
198        if self.roots.len() > MAX_CACHE_ENTRIES {
199            if let Some(min) = self.roots.iter().min().cloned() {
200                self.roots.remove(&min);
201                self.cache.retain(|_, (fork, _, _)| *fork > min);
202                self.slot_deltas.retain(|slot, _| *slot > min);
203            }
204        }
205    }
206
207    /// Clear for testing
208    pub fn clear(&mut self) {
209        for v in self.cache.values_mut() {
210            v.2 = HashMap::new();
211        }
212
213        self.slot_deltas
214            .iter_mut()
215            .for_each(|(_, status)| status.lock().unwrap().clear());
216    }
217
218    /// Get the statuses for all the root slots
219    pub fn root_slot_deltas(&self) -> Vec<SlotDelta<T>> {
220        self.roots()
221            .iter()
222            .map(|root| {
223                (
224                    *root,
225                    true, // <-- is_root
226                    self.slot_deltas.get(root).cloned().unwrap_or_default(),
227                )
228            })
229            .collect()
230    }
231
232    // replay deltas into a status_cache allows "appending" data
233    pub fn append(&mut self, slot_deltas: &[SlotDelta<T>]) {
234        for (slot, is_root, statuses) in slot_deltas {
235            statuses
236                .lock()
237                .unwrap()
238                .iter()
239                .for_each(|(tx_hash, (key_index, statuses))| {
240                    for (key_slice, res) in statuses.iter() {
241                        self.insert_with_slice(tx_hash, *slot, *key_index, *key_slice, res.clone())
242                    }
243                });
244            if *is_root {
245                self.add_root(*slot);
246            }
247        }
248    }
249
    /// Build a new cache by starting from `Self::default()` and appending
    /// `slot_deltas` to it.
    pub fn from_slot_deltas(slot_deltas: &[SlotDelta<T>]) -> Self {
        // play all deltas back into the status cache
        let mut me = Self::default();
        me.append(slot_deltas);
        me
    }
256
257    fn insert_with_slice(
258        &mut self,
259        transaction_blockhash: &Hash,
260        slot: Slot,
261        key_index: usize,
262        key_slice: [u8; CACHED_KEY_SIZE],
263        res: T,
264    ) {
265        let hash_map =
266            self.cache
267                .entry(*transaction_blockhash)
268                .or_insert((slot, key_index, HashMap::new()));
269        hash_map.0 = std::cmp::max(slot, hash_map.0);
270
271        let forks = hash_map.2.entry(key_slice).or_default();
272        forks.push((slot, res.clone()));
273        let slot_deltas = self.slot_deltas.entry(slot).or_default();
274        let mut fork_entry = slot_deltas.lock().unwrap();
275        let (_, hash_entry) = fork_entry
276            .entry(*transaction_blockhash)
277            .or_insert((key_index, vec![]));
278        hash_entry.push((key_slice, res))
279    }
280}
281
#[cfg(test)]
mod tests {
    use {
        super::*,
        miraland_sdk::{hash::hash, signature::Signature},
    };

    type BankStatusCache = StatusCache<()>;

    // An empty cache reports no status for any key.
    #[test]
    fn test_empty_has_no_sigs() {
        let sig = Signature::default();
        let blockhash = hash(Hash::default().as_ref());
        let status_cache = BankStatusCache::default();
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &Ancestors::default()),
            None
        );
        assert_eq!(
            status_cache.get_status_any_blockhash(sig, &Ancestors::default()),
            None
        );
    }

    // A status inserted at a slot listed in the ancestors is found.
    #[test]
    fn test_find_sig_with_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = vec![(0, 1)].into_iter().collect();
        status_cache.insert(&blockhash, sig, 0, ());
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &ancestors),
            Some((0, ()))
        );
        assert_eq!(
            status_cache.get_status_any_blockhash(sig, &ancestors),
            Some((0, ()))
        );
    }

    // A status inserted at a slot that is neither an ancestor nor a root is hidden.
    #[test]
    fn test_find_sig_without_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 1, ());
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
        assert_eq!(status_cache.get_status_any_blockhash(sig, &ancestors), None);
    }

    // A status inserted at a root slot is found even with empty ancestors.
    #[test]
    fn test_find_sig_with_root_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.add_root(0);
        assert_eq!(
            status_cache.get_status(sig, &blockhash, &ancestors),
            Some((0, ()))
        );
    }

    // A blockhash also seen at a newer slot survives purging of the oldest root.
    #[test]
    fn test_insert_picks_latest_blockhash_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = vec![(0, 0)].into_iter().collect();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        for i in 0..(MAX_CACHE_ENTRIES + 1) {
            status_cache.add_root(i as u64);
        }
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors)
            .is_some());
    }

    // Entries only seen at the purged root disappear with it.
    #[test]
    fn test_root_expires() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 0, ());
        for i in 0..(MAX_CACHE_ENTRIES + 1) {
            status_cache.add_root(i as u64);
        }
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
    }

    // `clear` removes previously inserted statuses.
    #[test]
    fn test_clear_signatures_sigs_are_gone() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.add_root(0);
        status_cache.clear();
        assert_eq!(status_cache.get_status(sig, &blockhash, &ancestors), None);
    }

    // Inserting after `clear` still works.
    #[test]
    fn test_clear_signatures_insert_works() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let ancestors = Ancestors::default();
        status_cache.add_root(0);
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors)
            .is_some());
    }

    // The stored key slice is the window of the key at the blockhash's offset.
    #[test]
    fn test_signatures_slice() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap();
        let sig_slice: &[u8; CACHED_KEY_SIZE] =
            arrayref::array_ref![sig.as_ref(), *index, CACHED_KEY_SIZE];
        assert!(sig_map.get(sig_slice).is_some());
    }

    // A cache rebuilt from its root slot deltas compares equal to the original.
    #[test]
    fn test_slot_deltas() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        status_cache.clear();
        status_cache.insert(&blockhash, sig, 0, ());
        assert!(status_cache.roots().contains(&0));
        let slot_deltas = status_cache.root_slot_deltas();
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
        let slot_deltas = cache.root_slot_deltas();
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
    }

    // Purging the oldest root also drops its slot delta; the rest round-trips.
    #[test]
    fn test_roots_deltas() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let blockhash2 = hash(blockhash.as_ref());
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        status_cache.insert(&blockhash2, sig, 1, ());
        for i in 0..(MAX_CACHE_ENTRIES + 1) {
            status_cache.add_root(i as u64);
        }
        assert_eq!(status_cache.slot_deltas.len(), 1);
        assert!(status_cache.slot_deltas.get(&1).is_some());
        let slot_deltas = status_cache.root_slot_deltas();
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
    }

    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn test_age_sanity() {
        assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES);
    }

    // Clearing a slot removes only that slot's statuses and delta.
    #[test]
    fn test_clear_slot_signatures() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = hash(Hash::default().as_ref());
        let blockhash2 = hash(blockhash.as_ref());
        status_cache.insert(&blockhash, sig, 0, ());
        status_cache.insert(&blockhash, sig, 1, ());
        status_cache.insert(&blockhash2, sig, 1, ());

        let mut ancestors0 = Ancestors::default();
        ancestors0.insert(0, 0);
        let mut ancestors1 = Ancestors::default();
        ancestors1.insert(1, 0);

        // Clear slot 0 related data
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors0)
            .is_some());
        status_cache.clear_slot_entries(0);
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors0)
            .is_none());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors1)
            .is_some());
        assert!(status_cache
            .get_status(sig, &blockhash2, &ancestors1)
            .is_some());

        // Check that the slot delta for slot 0 is gone, but slot 1 still
        // exists
        assert!(status_cache.slot_deltas.get(&0).is_none());
        assert!(status_cache.slot_deltas.get(&1).is_some());

        // Clear slot 1 related data
        status_cache.clear_slot_entries(1);
        assert!(status_cache.slot_deltas.is_empty());
        assert!(status_cache
            .get_status(sig, &blockhash, &ancestors1)
            .is_none());
        assert!(status_cache
            .get_status(sig, &blockhash2, &ancestors1)
            .is_none());
        assert!(status_cache.cache.is_empty());
    }

    // Status cache uses a random key offset for each blockhash. Ensure that shorter
    // keys can still be used if the offset is greater than the key length.
    #[test]
    fn test_different_sized_keys() {
        let mut status_cache = BankStatusCache::default();
        let ancestors = vec![(0, 0)].into_iter().collect();
        let mut blockhash = Hash::default();
        for _ in 0..100 {
            // Chain the hash so every iteration uses a new blockhash and
            // therefore a freshly drawn random key offset.
            blockhash = hash(blockhash.as_ref());
            let sig_key = Signature::default();
            let hash_key = Hash::new_unique();
            status_cache.insert(&blockhash, sig_key, 0, ());
            status_cache.insert(&blockhash, hash_key, 0, ());
            assert!(status_cache
                .get_status(sig_key, &blockhash, &ancestors)
                .is_some());
            assert!(status_cache
                .get_status(hash_key, &blockhash, &ancestors)
                .is_some());
        }
    }
}