solana_accounts_db/accounts_index/
secondary.rs

1use {
2    dashmap::{mapref::entry::Entry::Occupied, DashMap},
3    log::*,
4    solana_pubkey::Pubkey,
5    solana_time_utils::AtomicInterval,
6    std::{
7        collections::HashSet,
8        fmt::Debug,
9        sync::{
10            atomic::{AtomicU64, Ordering},
11            RwLock,
12        },
13    },
14};
15
16#[derive(Debug, Default, Clone, PartialEq)]
17pub struct AccountSecondaryIndexes {
18    pub keys: Option<AccountSecondaryIndexesIncludeExclude>,
19    pub indexes: HashSet<AccountIndex>,
20}
21
22impl AccountSecondaryIndexes {
23    pub fn is_empty(&self) -> bool {
24        self.indexes.is_empty()
25    }
26    pub fn contains(&self, index: &AccountIndex) -> bool {
27        self.indexes.contains(index)
28    }
29    pub fn include_key(&self, key: &Pubkey) -> bool {
30        match &self.keys {
31            Some(options) => options.exclude ^ options.keys.contains(key),
32            None => true, // include all keys
33        }
34    }
35}
36
37#[derive(Debug, PartialEq, Eq, Clone)]
38pub struct AccountSecondaryIndexesIncludeExclude {
39    pub exclude: bool,
40    pub keys: HashSet<Pubkey>,
41}
42
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub enum AccountIndex {
45    ProgramId,
46    SplTokenMint,
47    SplTokenOwner,
48}
49
50#[derive(Debug, Clone, Copy)]
51pub enum IndexKey {
52    ProgramId(Pubkey),
53    SplTokenMint(Pubkey),
54    SplTokenOwner(Pubkey),
55}
56
57// The only cases where an inner key should map to a different outer key is
58// if the key had different account data for the indexed key across different
59// slots. As this is rare, it should be ok to use a Vec here over a HashSet, even
60// though we are running some key existence checks.
61type SecondaryReverseIndexEntry = RwLock<Vec<Pubkey>>;
62
63pub trait SecondaryIndexEntry: Debug {
64    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64);
65    // Removes a value from the set. Returns whether the value was present in the set.
66    fn remove_inner_key(&self, key: &Pubkey) -> bool;
67    fn is_empty(&self) -> bool;
68    fn keys(&self) -> Vec<Pubkey>;
69    fn len(&self) -> usize;
70}
71
72#[derive(Debug, Default)]
73struct SecondaryIndexStats {
74    last_report: AtomicInterval,
75    num_inner_keys: AtomicU64,
76}
77
78#[derive(Debug, Default)]
79pub struct DashMapSecondaryIndexEntry {
80    account_keys: DashMap<Pubkey, ()>,
81}
82
83impl SecondaryIndexEntry for DashMapSecondaryIndexEntry {
84    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
85        if self.account_keys.get(key).is_none() {
86            self.account_keys.entry(*key).or_insert_with(|| {
87                inner_keys_count.fetch_add(1, Ordering::Relaxed);
88            });
89        }
90    }
91
92    fn remove_inner_key(&self, key: &Pubkey) -> bool {
93        self.account_keys.remove(key).is_some()
94    }
95
96    fn is_empty(&self) -> bool {
97        self.account_keys.is_empty()
98    }
99
100    fn keys(&self) -> Vec<Pubkey> {
101        self.account_keys
102            .iter()
103            .map(|entry_ref| *entry_ref.key())
104            .collect()
105    }
106
107    fn len(&self) -> usize {
108        self.account_keys.len()
109    }
110}
111
112#[derive(Debug, Default)]
113pub struct RwLockSecondaryIndexEntry {
114    account_keys: RwLock<HashSet<Pubkey>>,
115}
116
117impl SecondaryIndexEntry for RwLockSecondaryIndexEntry {
118    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
119        if self.account_keys.read().unwrap().contains(key) {
120            // the key already exists, so nothing to do here
121            return;
122        }
123
124        let was_newly_inserted = self.account_keys.write().unwrap().insert(*key);
125        if was_newly_inserted {
126            inner_keys_count.fetch_add(1, Ordering::Relaxed);
127        }
128    }
129
130    fn remove_inner_key(&self, key: &Pubkey) -> bool {
131        self.account_keys.write().unwrap().remove(key)
132    }
133
134    fn is_empty(&self) -> bool {
135        self.account_keys.read().unwrap().is_empty()
136    }
137
138    fn keys(&self) -> Vec<Pubkey> {
139        self.account_keys.read().unwrap().iter().cloned().collect()
140    }
141
142    fn len(&self) -> usize {
143        self.account_keys.read().unwrap().len()
144    }
145}
146
147#[derive(Debug, Default)]
148pub struct SecondaryIndex<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send> {
149    metrics_name: &'static str,
150    // Map from index keys to index values
151    pub index: DashMap<Pubkey, SecondaryIndexEntryType>,
152    pub reverse_index: DashMap<Pubkey, SecondaryReverseIndexEntry>,
153    stats: SecondaryIndexStats,
154}
155
156impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
157    SecondaryIndex<SecondaryIndexEntryType>
158{
159    pub fn new(metrics_name: &'static str) -> Self {
160        Self {
161            metrics_name,
162            ..Self::default()
163        }
164    }
165
166    pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey) {
167        {
168            let pubkeys_map = self
169                .index
170                .get(key)
171                .unwrap_or_else(|| self.index.entry(*key).or_default().downgrade());
172
173            pubkeys_map.insert_if_not_exists(inner_key, &self.stats.num_inner_keys);
174        }
175
176        {
177            let outer_keys = self.reverse_index.get(inner_key).unwrap_or_else(|| {
178                self.reverse_index
179                    .entry(*inner_key)
180                    .or_insert(RwLock::new(Vec::with_capacity(1)))
181                    .downgrade()
182            });
183
184            let should_insert = !outer_keys.read().unwrap().contains(key);
185            if should_insert {
186                let mut w_outer_keys = outer_keys.write().unwrap();
187                if !w_outer_keys.contains(key) {
188                    w_outer_keys.push(*key);
189                }
190            }
191        }
192
193        if self.stats.last_report.should_update(1000) {
194            datapoint_info!(
195                self.metrics_name,
196                ("num_secondary_keys", self.index.len() as i64, i64),
197                (
198                    "num_inner_keys",
199                    self.stats.num_inner_keys.load(Ordering::Relaxed) as i64,
200                    i64
201                ),
202                (
203                    "num_reverse_index_keys",
204                    self.reverse_index.len() as i64,
205                    i64
206                ),
207            );
208        }
209    }
210
211    // Only safe to call from `remove_by_inner_key()` due to asserts
212    fn remove_index_entries(&self, outer_key: &Pubkey, removed_inner_key: &Pubkey) {
213        let is_outer_key_empty = {
214            let inner_key_map = self
215                .index
216                .get_mut(outer_key)
217                .expect("If we're removing a key, then it must have an entry in the map");
218            // If we deleted a pubkey from the reverse_index, then the corresponding entry
219            // better exist in this index as well or the two indexes are out of sync!
220            assert!(inner_key_map.value().remove_inner_key(removed_inner_key));
221            inner_key_map.is_empty()
222        };
223
224        // Delete the `key` if the set of inner keys is empty
225        if is_outer_key_empty {
226            // Other threads may have interleaved writes to this `key`,
227            // so double-check again for its emptiness
228            if let Occupied(key_entry) = self.index.entry(*outer_key) {
229                if key_entry.get().is_empty() {
230                    key_entry.remove();
231                }
232            }
233        }
234    }
235
236    pub fn remove_by_inner_key(&self, inner_key: &Pubkey) {
237        // Save off which keys in `self.index` had slots removed so we can remove them
238        // after we purge the reverse index
239        let mut removed_outer_keys: HashSet<Pubkey> = HashSet::new();
240
241        // Check if the entry for `inner_key` in the reverse index is empty
242        // and can be removed
243        if let Some((_, outer_keys_set)) = self.reverse_index.remove(inner_key) {
244            for removed_outer_key in outer_keys_set.into_inner().unwrap().into_iter() {
245                removed_outer_keys.insert(removed_outer_key);
246            }
247        }
248
249        // Remove this value from those keys
250        for outer_key in &removed_outer_keys {
251            self.remove_index_entries(outer_key, inner_key);
252        }
253
254        // Safe to `fetch_sub()` here because a dead key cannot be removed more than once,
255        // and the `num_inner_keys` must have been incremented by exactly removed_outer_keys.len()
256        // in previous unique insertions of `inner_key` into `self.index` for each key
257        // in `removed_outer_keys`
258        self.stats
259            .num_inner_keys
260            .fetch_sub(removed_outer_keys.len() as u64, Ordering::Relaxed);
261    }
262
263    pub fn get(&self, key: &Pubkey) -> Vec<Pubkey> {
264        if let Some(inner_keys_map) = self.index.get(key) {
265            inner_keys_map.keys()
266        } else {
267            vec![]
268        }
269    }
270
271    /// log top 20 (owner, # accounts) in descending order of # accounts
272    pub fn log_contents(&self) {
273        let mut entries = self
274            .index
275            .iter()
276            .map(|entry| (entry.value().len(), *entry.key()))
277            .collect::<Vec<_>>();
278        entries.sort_unstable();
279        entries
280            .iter()
281            .rev()
282            .take(20)
283            .for_each(|(v, k)| info!("owner: {k}, accounts: {v}"));
284    }
285}