1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use solana_sdk::pubkey::Pubkey;
use std::{
    collections::HashSet,
    fmt::Debug,
    sync::{
        atomic::{AtomicU64, Ordering},
        RwLock,
    },
};

// The only case where an inner key should map to more than one outer key is
// when the key had different account data for the indexed key across different
// slots. As this is rare, it should be ok to use a Vec here over a HashSet, even
// though we are running some key existence checks.
pub type SecondaryReverseIndexEntry = RwLock<Vec<Pubkey>>;

/// A set of inner keys stored under a single key of a secondary index.
///
/// Every method takes `&self`, so implementations mutate their contents through
/// interior mutability.
pub trait SecondaryIndexEntry: Debug {
    /// Inserts `key` into the set if it is not already present.
    ///
    /// `inner_keys_count` is a shared counter; the implementations in this file
    /// increment it when they add a key.
    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64);
    // Removes a value from the set. Returns whether the value was present in the set.
    fn remove_inner_key(&self, key: &Pubkey) -> bool;
    /// Returns `true` if the set holds no keys.
    fn is_empty(&self) -> bool;
    /// Returns a copy of every key currently in the set.
    fn keys(&self) -> Vec<Pubkey>;
    /// Returns the number of keys currently in the set.
    fn len(&self) -> usize;
}

/// Counters used by `SecondaryIndex` for its periodic metrics report.
#[derive(Debug, Default)]
pub struct SecondaryIndexStats {
    // Timestamp (as returned by `solana_sdk::timing::timestamp()`) of the most
    // recent metrics report; used to rate-limit reporting in `insert()`.
    last_report: AtomicU64,
    // Running count of inner keys: incremented by the entry types on insertion
    // and decremented in `remove_by_inner_key()`.
    num_inner_keys: AtomicU64,
}

/// `SecondaryIndexEntry` implementation that stores its keys in a `DashMap`.
#[derive(Debug, Default)]
pub struct DashMapSecondaryIndexEntry {
    // Used as a set: only the keys matter, the values are `()`.
    account_keys: DashMap<Pubkey, ()>,
}

impl SecondaryIndexEntry for DashMapSecondaryIndexEntry {
    /// Adds `key` to the set unless it is already there, bumping
    /// `inner_keys_count` by one only when a new slot is actually created.
    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
        // Look the key up through `get()` first and bail out early if it is
        // already present, so `entry()` is only reached for keys that looked absent.
        if self.account_keys.get(key).is_some() {
            return;
        }

        // `or_insert_with` runs the closure only for a vacant slot, so the
        // counter is not touched if the key appeared in the meantime.
        self.account_keys.entry(*key).or_insert_with(|| {
            inner_keys_count.fetch_add(1, Ordering::Relaxed);
        });
    }

    /// Removes `key` from the set and reports whether it had been present.
    fn remove_inner_key(&self, key: &Pubkey) -> bool {
        let removed = self.account_keys.remove(key);
        removed.is_some()
    }

    /// Returns `true` when no keys are stored.
    fn is_empty(&self) -> bool {
        self.account_keys.is_empty()
    }

    /// Copies out every key currently stored.
    fn keys(&self) -> Vec<Pubkey> {
        let mut collected = Vec::new();
        for entry_ref in self.account_keys.iter() {
            collected.push(*entry_ref.key());
        }
        collected
    }

    /// Returns how many keys are stored.
    fn len(&self) -> usize {
        self.account_keys.len()
    }
}

/// `SecondaryIndexEntry` implementation that stores its keys in a `HashSet`
/// guarded by an `RwLock`.
#[derive(Debug, Default)]
pub struct RwLockSecondaryIndexEntry {
    // The set of inner keys; read-locked for lookups, write-locked for mutation.
    account_keys: RwLock<HashSet<Pubkey>>,
}

impl SecondaryIndexEntry for RwLockSecondaryIndexEntry {
    /// Adds `key` to the set unless it is already there, incrementing
    /// `inner_keys_count` by one only if this call actually inserted the key.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
        // Fast path: a read lock is enough to see that there is nothing to do.
        let already_present = self.account_keys.read().unwrap().contains(key);
        if already_present {
            return;
        }

        // The read lock was released above, so another thread may have inserted
        // the same key in between. Only count the key if *this* call added it;
        // otherwise the counter would drift above the real number of entries.
        let newly_inserted = self.account_keys.write().unwrap().insert(*key);
        if newly_inserted {
            inner_keys_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Removes `key` from the set and reports whether it had been present.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn remove_inner_key(&self, key: &Pubkey) -> bool {
        self.account_keys.write().unwrap().remove(key)
    }

    /// Returns `true` when no keys are stored.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn is_empty(&self) -> bool {
        self.account_keys.read().unwrap().is_empty()
    }

    /// Copies out every key currently stored.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn keys(&self) -> Vec<Pubkey> {
        self.account_keys.read().unwrap().iter().copied().collect()
    }

    /// Returns how many keys are stored.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn len(&self) -> usize {
        self.account_keys.read().unwrap().len()
    }
}

/// Two-way concurrent index between keys and the inner keys stored under them.
///
/// `index` answers "which inner keys are stored under this key?", while
/// `reverse_index` answers "under which keys was this inner key inserted?".
#[derive(Debug, Default)]
pub struct SecondaryIndex<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send> {
    // Name passed to `datapoint_info!` when reporting metrics.
    metrics_name: &'static str,
    // Map from index keys to index values
    pub index: DashMap<Pubkey, SecondaryIndexEntryType>,
    // Map from each inner key to the list of index keys it was inserted under.
    pub reverse_index: DashMap<Pubkey, SecondaryReverseIndexEntry>,
    // Counters backing the periodic metrics report.
    stats: SecondaryIndexStats,
}

impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
    SecondaryIndex<SecondaryIndexEntryType>
{
    /// Creates an empty index whose metrics are reported under `metrics_name`.
    pub fn new(metrics_name: &'static str) -> Self {
        Self {
            metrics_name,
            ..Self::default()
        }
    }

    /// Records that `inner_key` is stored under `key`.
    ///
    /// Updates both directions: `inner_key` is added to the entry for `key` in
    /// `index`, and `key` is added to the list for `inner_key` in
    /// `reverse_index`. Inserting the same pair again is a no-op.
    ///
    /// Also emits a metrics datapoint, rate-limited through `stats.last_report`.
    ///
    /// # Panics
    ///
    /// Panics if a reverse-index lock is poisoned.
    pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey) {
        {
            // Try `get()` first; only reach for `entry()` when the key is missing.
            // The default entry is built lazily, only if the slot is still vacant.
            let pubkeys_map = self.index.get(key).unwrap_or_else(|| {
                self.index
                    .entry(*key)
                    .or_insert_with(SecondaryIndexEntryType::default)
                    .downgrade()
            });

            pubkeys_map.insert_if_not_exists(inner_key, &self.stats.num_inner_keys);
        }

        {
            // Scoped so the guard into `reverse_index` is dropped before the
            // metrics code below calls `self.reverse_index.len()`; this avoids
            // re-entering the map while still holding a reference into it.
            let outer_keys = self.reverse_index.get(inner_key).unwrap_or_else(|| {
                self.reverse_index
                    .entry(*inner_key)
                    .or_insert_with(|| RwLock::new(Vec::with_capacity(1)))
                    .downgrade()
            });

            let should_insert = !outer_keys.read().unwrap().contains(key);
            if should_insert {
                let mut w_outer_keys = outer_keys.write().unwrap();
                // Re-check under the write lock: another thread may have pushed
                // `key` after the read lock above was released.
                if !w_outer_keys.contains(key) {
                    w_outer_keys.push(*key);
                }
            }
        }

        // Report when more than 1000 timestamp units have elapsed since the last
        // report (presumably milliseconds; confirm against
        // `solana_sdk::timing::timestamp`). The compare-exchange lets only one
        // of several racing threads win the right to report.
        let now = solana_sdk::timing::timestamp();
        let last = self.stats.last_report.load(Ordering::Relaxed);
        let should_report = now.saturating_sub(last) > 1000
            && self.stats.last_report.compare_exchange(
                last,
                now,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) == Ok(last);

        if should_report {
            datapoint_info!(
                self.metrics_name,
                ("num_secondary_keys", self.index.len() as i64, i64),
                (
                    "num_inner_keys",
                    self.stats.num_inner_keys.load(Ordering::Relaxed) as i64,
                    i64
                ),
                (
                    "num_reverse_index_keys",
                    self.reverse_index.len() as i64,
                    i64
                ),
            );
        }
    }

    // Only safe to call from `remove_by_inner_key()` due to asserts
    //
    // Removes `removed_inner_key` from the entry stored under `outer_key`, then
    // drops that entry from `index` if it ended up empty.
    //
    // Panics if `outer_key` has no entry in `index`, or if that entry does not
    // contain `removed_inner_key`.
    fn remove_index_entries(&self, outer_key: &Pubkey, removed_inner_key: &Pubkey) {
        let is_outer_key_empty = {
            let inner_key_map = self
                .index
                .get_mut(outer_key)
                .expect("If we're removing a key, then it must have an entry in the map");
            // If we deleted a pubkey from the reverse_index, then the corresponding entry
            // better exist in this index as well or the two indexes are out of sync!
            assert!(inner_key_map.value().remove_inner_key(removed_inner_key));
            inner_key_map.is_empty()
        };

        // Delete the `key` if the set of inner keys is empty
        if is_outer_key_empty {
            // Other threads may have interleaved writes to this `key`,
            // so double-check again for its emptiness
            if let Occupied(key_entry) = self.index.entry(*outer_key) {
                if key_entry.get().is_empty() {
                    key_entry.remove();
                }
            }
        }
    }

    /// Removes `inner_key` from every key it was inserted under.
    ///
    /// The reverse-index entry for `inner_key` is removed first; each key it
    /// listed then has `inner_key` removed from its entry in `index`. Does
    /// nothing if `inner_key` has no reverse-index entry.
    ///
    /// # Panics
    ///
    /// Panics if the reverse-index lock is poisoned, or if `index` and
    /// `reverse_index` are out of sync (see `remove_index_entries()`).
    pub fn remove_by_inner_key(&self, inner_key: &Pubkey) {
        // Take the whole reverse-index entry for `inner_key` (if any) and save off
        // which keys in `self.index` referenced it, so they can be cleaned up
        // after the reverse index has been purged.
        let removed_outer_keys: HashSet<Pubkey> = self
            .reverse_index
            .remove(inner_key)
            .map(|(_, outer_keys)| outer_keys.into_inner().unwrap().into_iter().collect())
            .unwrap_or_default();

        // Remove this value from those keys
        for outer_key in &removed_outer_keys {
            self.remove_index_entries(outer_key, inner_key);
        }

        // Safe to `fetch_sub()` here because a dead key cannot be removed more than once,
        // and the `num_inner_keys` must have been incremented by exactly removed_outer_keys.len()
        // in previous unique insertions of `inner_key` into `self.index` for each key
        // in `removed_outer_keys`
        self.stats
            .num_inner_keys
            .fetch_sub(removed_outer_keys.len() as u64, Ordering::Relaxed);
    }

    /// Returns a copy of the inner keys stored under `key`, or an empty vector
    /// if `key` is not in the index.
    pub fn get(&self, key: &Pubkey) -> Vec<Pubkey> {
        self.index
            .get(key)
            .map_or_else(Vec::new, |inner_keys_map| inner_keys_map.keys())
    }
}