gemachain_runtime/
status_cache.rs

1use crate::ancestors::Ancestors;
2
3use log::*;
4use rand::{thread_rng, Rng};
5use serde::Serialize;
6use gemachain_sdk::{
7    clock::{Slot, MAX_RECENT_BLOCKHASHES},
8    hash::Hash,
9};
10use std::{
11    collections::{hash_map::Entry, HashMap, HashSet},
12    sync::{Arc, Mutex},
13};
14
// Upper bound on the number of roots a `StatusCache` keeps; once the root set
// grows past this, `purge_roots` evicts the smallest root.
15pub const MAX_CACHE_ENTRIES: usize = MAX_RECENT_BLOCKHASHES;
// Number of key bytes kept per cached entry (the length of a `KeySlice`).
16const CACHED_KEY_SIZE: usize = 20;
17
18// Store forks in a single chunk of memory to avoid another lookup.
// Each element pairs a slot with the status recorded for a key slice in that slot.
19pub type ForkStatus<T> = Vec<(Slot, T)>;
// Fixed-size window into a key's bytes; used as the map key instead of the full key.
20type KeySlice = [u8; CACHED_KEY_SIZE];
// Key slice -> every (slot, status) pair recorded for that slice.
21type KeyMap<T> = HashMap<KeySlice, ForkStatus<T>>;
22// Map of Hash and status
// Concretely: blockhash -> (key offset, list of (key slice, status)), shared
// behind an `Arc<Mutex<..>>`.
23pub type Status<T> = Arc<Mutex<HashMap<Hash, (usize, Vec<(KeySlice, T)>)>>>;
24// A Map of hash + the highest fork it's been observed on along with
25// the key offset and a Map of the key slice + Fork status for that key
26type KeyStatusMap<T> = HashMap<Hash, (Slot, usize, KeyMap<T>)>;
27
28// A map of keys recorded in each fork; used to serialize for snapshots easily.
29// Doesn't store a `SlotDelta` in it because the bool `root` is usually set much later
30type SlotDeltaMap<T> = HashMap<Slot, Status<T>>;
31
32// The statuses added during a slot, can be used to build on top of a status cache or to
33// construct a new one. Usually derived from a status cache's `SlotDeltaMap`
// Tuple layout: (slot, whether the slot is a root, statuses recorded in the slot).
34pub type SlotDelta<T> = (Slot, bool, Status<T>);
35
/// Bundles a status value with a slot and a confirmation count.
///
/// NOTE(review): this type is neither constructed nor read anywhere in this
/// file; the field notes below are inferred from the names only — confirm
/// against the callers.
36#[derive(Debug, PartialEq)]
37pub struct SignatureConfirmationStatus<T> {
    // presumably the slot the status was recorded in — TODO confirm
38    pub slot: Slot,
    // presumably the number of confirmations observed — TODO confirm
39    pub confirmations: usize,
    // caller-defined status payload
40    pub status: T,
41}
42
/// Cache of per-key statuses, indexed by blockhash and by slot.
///
/// Every inserted status is recorded twice: in `cache` (for lookups by
/// blockhash and key slice) and in `slot_deltas` (grouped by the slot it was
/// inserted for).
43#[derive(Clone, Debug, AbiExample)]
44pub struct StatusCache<T: Serialize + Clone> {
    // blockhash -> (highest slot seen for it, key offset, key slice -> fork statuses)
45    cache: KeyStatusMap<T>,
    // slots treated as valid ancestors for every lookup; starts out containing slot 0
46    roots: HashSet<Slot>,
47    /// all keys seen during a fork/slot
48    slot_deltas: SlotDeltaMap<T>,
49}
50
51impl<T: Serialize + Clone> Default for StatusCache<T> {
52    fn default() -> Self {
53        Self {
54            cache: HashMap::default(),
55            // 0 is always a root
56            roots: [0].iter().cloned().collect(),
57            slot_deltas: HashMap::default(),
58        }
59    }
60}
61
62impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
63    fn eq(&self, other: &Self) -> bool {
64        self.roots == other.roots
65            && self
66                .cache
67                .iter()
68                .all(|(hash, (slot, key_index, hash_map))| {
69                    if let Some((other_slot, other_key_index, other_hash_map)) =
70                        other.cache.get(hash)
71                    {
72                        if slot == other_slot && key_index == other_key_index {
73                            return hash_map.iter().all(|(slice, fork_map)| {
74                                if let Some(other_fork_map) = other_hash_map.get(slice) {
75                                    // all this work just to compare the highest forks in the fork map
76                                    // per entry
77                                    return fork_map.last() == other_fork_map.last();
78                                }
79                                false
80                            });
81                        }
82                    }
83                    false
84                })
85    }
86}
87
88impl<T: Serialize + Clone> StatusCache<T> {
89    pub fn clear_slot_entries(&mut self, slot: Slot) {
90        let slot_deltas = self.slot_deltas.remove(&slot);
91        if let Some(slot_deltas) = slot_deltas {
92            let slot_deltas = slot_deltas.lock().unwrap();
93            for (blockhash, (_, key_list)) in slot_deltas.iter() {
94                // Any blockhash that exists in self.slot_deltas must also exist
95                // in self.cache, because in self.purge_roots(), when an entry
96                // (b, (max_slot, _, _)) is removed from self.cache, this implies
97                // all entries in self.slot_deltas < max_slot are also removed
98                if let Entry::Occupied(mut o_blockhash_entries) = self.cache.entry(*blockhash) {
99                    let (_, _, all_hash_maps) = o_blockhash_entries.get_mut();
100
101                    for (key_slice, _) in key_list {
102                        if let Entry::Occupied(mut o_key_list) = all_hash_maps.entry(*key_slice) {
103                            let key_list = o_key_list.get_mut();
104                            key_list.retain(|(updated_slot, _)| *updated_slot != slot);
105                            if key_list.is_empty() {
106                                o_key_list.remove_entry();
107                            }
108                        } else {
109                            panic!(
110                                "Map for key must exist if key exists in self.slot_deltas, slot: {}",
111                                slot
112                            )
113                        }
114                    }
115
116                    if all_hash_maps.is_empty() {
117                        o_blockhash_entries.remove_entry();
118                    }
119                } else {
120                    panic!(
121                        "Blockhash must exist if it exists in self.slot_deltas, slot: {}",
122                        slot
123                    )
124                }
125            }
126        }
127    }
128
129    /// Check if the key is in any of the forks in the ancestors set and
130    /// with a certain blockhash.
131    pub fn get_status<K: AsRef<[u8]>>(
132        &self,
133        key: K,
134        transaction_blockhash: &Hash,
135        ancestors: &Ancestors,
136    ) -> Option<(Slot, T)> {
137        let map = self.cache.get(transaction_blockhash)?;
138        let (_, index, keymap) = map;
139        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
140        let index = (*index).min(max_key_index);
141        let key_slice: &[u8; CACHED_KEY_SIZE] =
142            arrayref::array_ref![key.as_ref(), index, CACHED_KEY_SIZE];
143        if let Some(stored_forks) = keymap.get(key_slice) {
144            let res = stored_forks
145                .iter()
146                .find(|(f, _)| ancestors.get(f) || self.roots.get(f).is_some())
147                .cloned();
148            if res.is_some() {
149                return res;
150            }
151        }
152        None
153    }
154
155    /// Search for a key with any blockhash
156    /// Prefer get_status for performance reasons, it doesn't need
157    /// to search all blockhashes.
158    pub fn get_status_any_blockhash<K: AsRef<[u8]>>(
159        &self,
160        key: &K,
161        ancestors: &Ancestors,
162    ) -> Option<(Slot, T)> {
163        let mut keys = vec![];
164        let mut val: Vec<_> = self.cache.iter().map(|(k, _)| *k).collect();
165        keys.append(&mut val);
166
167        for blockhash in keys.iter() {
168            trace!("get_status_any_blockhash: trying {}", blockhash);
169            let status = self.get_status(key, blockhash, ancestors);
170            if status.is_some() {
171                return status;
172            }
173        }
174        None
175    }
176
177    /// Add a known root fork.  Roots are always valid ancestors.
178    /// After MAX_CACHE_ENTRIES, roots are removed, and any old keys are cleared.
179    pub fn add_root(&mut self, fork: Slot) {
180        self.roots.insert(fork);
181        self.purge_roots();
182    }
183
184    pub fn roots(&self) -> &HashSet<u64> {
185        &self.roots
186    }
187
188    /// Insert a new key for a specific slot.
189    pub fn insert<K: AsRef<[u8]>>(
190        &mut self,
191        transaction_blockhash: &Hash,
192        key: &K,
193        slot: Slot,
194        res: T,
195    ) {
196        let max_key_index = key.as_ref().len().saturating_sub(CACHED_KEY_SIZE + 1);
197        let hash_map = self.cache.entry(*transaction_blockhash).or_insert_with(|| {
198            let key_index = thread_rng().gen_range(0, max_key_index + 1);
199            (slot, key_index, HashMap::new())
200        });
201
202        hash_map.0 = std::cmp::max(slot, hash_map.0);
203        let key_index = hash_map.1.min(max_key_index);
204        let mut key_slice = [0u8; CACHED_KEY_SIZE];
205        key_slice.clone_from_slice(&key.as_ref()[key_index..key_index + CACHED_KEY_SIZE]);
206        self.insert_with_slice(transaction_blockhash, slot, key_index, key_slice, res);
207    }
208
209    pub fn purge_roots(&mut self) {
210        if self.roots.len() > MAX_CACHE_ENTRIES {
211            if let Some(min) = self.roots.iter().min().cloned() {
212                self.roots.remove(&min);
213                self.cache.retain(|_, (fork, _, _)| *fork > min);
214                self.slot_deltas.retain(|slot, _| *slot > min);
215            }
216        }
217    }
218
219    /// Clear for testing
220    pub fn clear(&mut self) {
221        for v in self.cache.values_mut() {
222            v.2 = HashMap::new();
223        }
224
225        self.slot_deltas
226            .iter_mut()
227            .for_each(|(_, status)| status.lock().unwrap().clear());
228    }
229
230    // returns the statuses for each slot in the slots provided
231    pub fn slot_deltas(&self, slots: &[Slot]) -> Vec<SlotDelta<T>> {
232        let empty = Arc::new(Mutex::new(HashMap::new()));
233        slots
234            .iter()
235            .map(|slot| {
236                (
237                    *slot,
238                    self.roots.contains(slot),
239                    self.slot_deltas.get(slot).unwrap_or(&empty).clone(),
240                )
241            })
242            .collect()
243    }
244
245    // replay deltas into a status_cache allows "appending" data
246    pub fn append(&mut self, slot_deltas: &[SlotDelta<T>]) {
247        for (slot, is_root, statuses) in slot_deltas {
248            statuses
249                .lock()
250                .unwrap()
251                .iter()
252                .for_each(|(tx_hash, (key_index, statuses))| {
253                    for (key_slice, res) in statuses.iter() {
254                        self.insert_with_slice(tx_hash, *slot, *key_index, *key_slice, res.clone())
255                    }
256                });
257            if *is_root {
258                self.add_root(*slot);
259            }
260        }
261    }
262
263    pub fn from_slot_deltas(slot_deltas: &[SlotDelta<T>]) -> Self {
264        // play all deltas back into the status cache
265        let mut me = Self::default();
266        me.append(slot_deltas);
267        me
268    }
269
270    fn insert_with_slice(
271        &mut self,
272        transaction_blockhash: &Hash,
273        slot: Slot,
274        key_index: usize,
275        key_slice: [u8; CACHED_KEY_SIZE],
276        res: T,
277    ) {
278        let hash_map =
279            self.cache
280                .entry(*transaction_blockhash)
281                .or_insert((slot, key_index, HashMap::new()));
282        hash_map.0 = std::cmp::max(slot, hash_map.0);
283
284        let forks = hash_map.2.entry(key_slice).or_insert_with(Vec::new);
285        forks.push((slot, res.clone()));
286        let slot_deltas = self.slot_deltas.entry(slot).or_default();
287        let mut fork_entry = slot_deltas.lock().unwrap();
288        let (_, hash_entry) = fork_entry
289            .entry(*transaction_blockhash)
290            .or_insert((key_index, vec![]));
291        hash_entry.push((key_slice, res))
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use gemachain_sdk::{hash::hash, signature::Signature};

    type BankStatusCache = StatusCache<()>;

    /// Blockhash shared by most tests: the hash of the default hash's bytes.
    fn base_blockhash() -> Hash {
        hash(Hash::default().as_ref())
    }

    /// Roots every slot in `0..=MAX_CACHE_ENTRIES`, pushing the root set one
    /// past capacity so that the lowest root (slot 0) gets purged.
    fn root_past_capacity(status_cache: &mut BankStatusCache) {
        for i in 0..(MAX_CACHE_ENTRIES + 1) {
            status_cache.add_root(i as u64);
        }
    }

    // An empty cache reports nothing for either lookup flavor.
    #[test]
    fn test_empty_has_no_sigs() {
        let sig = Signature::default();
        let blockhash = base_blockhash();
        let status_cache = BankStatusCache::default();
        assert_eq!(
            status_cache.get_status(&sig, &blockhash, &Ancestors::default()),
            None
        );
        assert_eq!(
            status_cache.get_status_any_blockhash(&sig, &Ancestors::default()),
            None
        );
    }

    // A status recorded in slot 0 is visible when slot 0 is an ancestor.
    #[test]
    fn test_find_sig_with_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let ancestors = vec![(0, 1)].into_iter().collect();
        status_cache.insert(&blockhash, &sig, 0, ());
        assert_eq!(
            status_cache.get_status(&sig, &blockhash, &ancestors),
            Some((0, ()))
        );
        assert_eq!(
            status_cache.get_status_any_blockhash(&sig, &ancestors),
            Some((0, ()))
        );
    }

    // A status recorded in slot 1 is hidden when slot 1 is neither an
    // ancestor nor a root.
    #[test]
    fn test_find_sig_without_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, &sig, 1, ());
        assert_eq!(status_cache.get_status(&sig, &blockhash, &ancestors), None);
        assert_eq!(
            status_cache.get_status_any_blockhash(&sig, &ancestors),
            None
        );
    }

    // Roots count as ancestors even with an empty ancestor set.
    #[test]
    fn test_find_sig_with_root_ancestor_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, &sig, 0, ());
        status_cache.add_root(0);
        assert_eq!(
            status_cache.get_status(&sig, &blockhash, &ancestors),
            Some((0, ()))
        );
    }

    // The blockhash entry survives purging root 0 because it was also seen in slot 1.
    #[test]
    fn test_insert_picks_latest_blockhash_fork() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let ancestors = vec![(0, 0)].into_iter().collect();
        status_cache.insert(&blockhash, &sig, 0, ());
        status_cache.insert(&blockhash, &sig, 1, ());
        root_past_capacity(&mut status_cache);
        assert!(status_cache
            .get_status(&sig, &blockhash, &ancestors)
            .is_some());
    }

    // Once root 0 is purged, statuses seen only in slot 0 disappear.
    #[test]
    fn test_root_expires() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, &sig, 0, ());
        root_past_capacity(&mut status_cache);
        assert_eq!(status_cache.get_status(&sig, &blockhash, &ancestors), None);
    }

    #[test]
    fn test_clear_signatures_sigs_are_gone() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let ancestors = Ancestors::default();
        status_cache.insert(&blockhash, &sig, 0, ());
        status_cache.add_root(0);
        status_cache.clear();
        assert_eq!(status_cache.get_status(&sig, &blockhash, &ancestors), None);
    }

    #[test]
    fn test_clear_signatures_insert_works() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let ancestors = Ancestors::default();
        status_cache.add_root(0);
        status_cache.clear();
        status_cache.insert(&blockhash, &sig, 0, ());
        assert!(status_cache
            .get_status(&sig, &blockhash, &ancestors)
            .is_some());
    }

    // The stored key slice is the window of the signature at the blockhash's offset.
    #[test]
    fn test_signatures_slice() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        status_cache.clear();
        status_cache.insert(&blockhash, &sig, 0, ());
        let (_, index, sig_map) = status_cache.cache.get(&blockhash).unwrap();
        let sig_slice: &[u8; CACHED_KEY_SIZE] =
            arrayref::array_ref![sig.as_ref(), *index, CACHED_KEY_SIZE];
        assert!(sig_map.get(sig_slice).is_some());
    }

    // Round-tripping through slot deltas reproduces an equal cache.
    #[test]
    fn test_slot_deltas() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        status_cache.clear();
        status_cache.insert(&blockhash, &sig, 0, ());
        let slot_deltas = status_cache.slot_deltas(&[0]);
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
        let slot_deltas = cache.slot_deltas(&[0]);
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
    }

    // After slot 0 is purged only slot 1's delta remains, and a cache rebuilt
    // from the deltas still compares equal.
    #[test]
    fn test_roots_deltas() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let blockhash2 = hash(blockhash.as_ref());
        status_cache.insert(&blockhash, &sig, 0, ());
        status_cache.insert(&blockhash, &sig, 1, ());
        status_cache.insert(&blockhash2, &sig, 1, ());
        root_past_capacity(&mut status_cache);
        let slots: Vec<_> = (0_u64..MAX_CACHE_ENTRIES as u64 + 1).collect();
        assert_eq!(status_cache.slot_deltas.len(), 1);
        assert!(status_cache.slot_deltas.get(&1).is_some());
        let slot_deltas = status_cache.slot_deltas(&slots);
        let cache = StatusCache::from_slot_deltas(&slot_deltas);
        assert_eq!(cache, status_cache);
    }

    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn test_age_sanity() {
        assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES);
    }

    #[test]
    fn test_clear_slot_signatures() {
        let sig = Signature::default();
        let mut status_cache = BankStatusCache::default();
        let blockhash = base_blockhash();
        let blockhash2 = hash(blockhash.as_ref());
        status_cache.insert(&blockhash, &sig, 0, ());
        status_cache.insert(&blockhash, &sig, 1, ());
        status_cache.insert(&blockhash2, &sig, 1, ());

        let mut ancestors0 = Ancestors::default();
        ancestors0.insert(0, 0);
        let mut ancestors1 = Ancestors::default();
        ancestors1.insert(1, 0);

        // Clear slot 0 related data
        assert!(status_cache
            .get_status(&sig, &blockhash, &ancestors0)
            .is_some());
        status_cache.clear_slot_entries(0);
        assert!(status_cache
            .get_status(&sig, &blockhash, &ancestors0)
            .is_none());
        assert!(status_cache
            .get_status(&sig, &blockhash, &ancestors1)
            .is_some());
        assert!(status_cache
            .get_status(&sig, &blockhash2, &ancestors1)
            .is_some());

        // Check that the slot delta for slot 0 is gone, but slot 1 still
        // exists
        assert!(status_cache.slot_deltas.get(&0).is_none());
        assert!(status_cache.slot_deltas.get(&1).is_some());

        // Clear slot 1 related data
        status_cache.clear_slot_entries(1);
        assert!(status_cache.slot_deltas.is_empty());
        assert!(status_cache
            .get_status(&sig, &blockhash, &ancestors1)
            .is_none());
        assert!(status_cache
            .get_status(&sig, &blockhash2, &ancestors1)
            .is_none());
        assert!(status_cache.cache.is_empty());
    }

    // Status cache uses a random key offset for each blockhash. Ensure that shorter
    // keys can still be used if the offset is greater than the key length.
    #[test]
    fn test_different_sized_keys() {
        let mut status_cache = BankStatusCache::default();
        let ancestors = vec![(0, 0)].into_iter().collect();
        let mut blockhash = Hash::default();
        for _ in 0..100 {
            // Chain the hash so every iteration uses a new blockhash and thus
            // draws a fresh random key offset.
            blockhash = hash(blockhash.as_ref());
            let sig_key = Signature::default();
            let hash_key = Hash::new_unique();
            status_cache.insert(&blockhash, &sig_key, 0, ());
            status_cache.insert(&blockhash, &hash_key, 0, ());
            assert!(status_cache
                .get_status(&sig_key, &blockhash, &ancestors)
                .is_some());
            assert!(status_cache
                .get_status(&hash_key, &blockhash, &ancestors)
                .is_some());
        }
    }
}