solana_accounts_db/accounts_index/
secondary.rs1use {
2 dashmap::{mapref::entry::Entry::Occupied, DashMap},
3 log::*,
4 solana_pubkey::Pubkey,
5 solana_time_utils::AtomicInterval,
6 std::{
7 collections::HashSet,
8 fmt::Debug,
9 sync::{
10 atomic::{AtomicU64, Ordering},
11 RwLock,
12 },
13 },
14};
15
16#[derive(Debug, Default, Clone, PartialEq)]
17pub struct AccountSecondaryIndexes {
18 pub keys: Option<AccountSecondaryIndexesIncludeExclude>,
19 pub indexes: HashSet<AccountIndex>,
20}
21
22impl AccountSecondaryIndexes {
23 pub fn is_empty(&self) -> bool {
24 self.indexes.is_empty()
25 }
26 pub fn contains(&self, index: &AccountIndex) -> bool {
27 self.indexes.contains(index)
28 }
29 pub fn include_key(&self, key: &Pubkey) -> bool {
30 match &self.keys {
31 Some(options) => options.exclude ^ options.keys.contains(key),
32 None => true, }
34 }
35}
36
37#[derive(Debug, PartialEq, Eq, Clone)]
38pub struct AccountSecondaryIndexesIncludeExclude {
39 pub exclude: bool,
40 pub keys: HashSet<Pubkey>,
41}
42
/// The kinds of secondary index that can appear in
/// `AccountSecondaryIndexes::indexes`.
///
/// Derives `Eq` and `Hash` so values can be stored in a `HashSet`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum AccountIndex {
    /// Index kind named after a program id.
    ProgramId,
    /// Index kind named after an SPL token mint.
    SplTokenMint,
    /// Index kind named after an SPL token owner.
    SplTokenOwner,
}
49
50#[derive(Debug, Clone, Copy)]
51pub enum IndexKey {
52 ProgramId(Pubkey),
53 SplTokenMint(Pubkey),
54 SplTokenOwner(Pubkey),
55}
56
57type SecondaryReverseIndexEntry = RwLock<Vec<Pubkey>>;
62
63pub trait SecondaryIndexEntry: Debug {
64 fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64);
65 fn remove_inner_key(&self, key: &Pubkey) -> bool;
67 fn is_empty(&self) -> bool;
68 fn keys(&self) -> Vec<Pubkey>;
69 fn len(&self) -> usize;
70}
71
72#[derive(Debug, Default)]
73struct SecondaryIndexStats {
74 last_report: AtomicInterval,
75 num_inner_keys: AtomicU64,
76}
77
78#[derive(Debug, Default)]
79pub struct DashMapSecondaryIndexEntry {
80 account_keys: DashMap<Pubkey, ()>,
81}
82
83impl SecondaryIndexEntry for DashMapSecondaryIndexEntry {
84 fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
85 if self.account_keys.get(key).is_none() {
86 self.account_keys.entry(*key).or_insert_with(|| {
87 inner_keys_count.fetch_add(1, Ordering::Relaxed);
88 });
89 }
90 }
91
92 fn remove_inner_key(&self, key: &Pubkey) -> bool {
93 self.account_keys.remove(key).is_some()
94 }
95
96 fn is_empty(&self) -> bool {
97 self.account_keys.is_empty()
98 }
99
100 fn keys(&self) -> Vec<Pubkey> {
101 self.account_keys
102 .iter()
103 .map(|entry_ref| *entry_ref.key())
104 .collect()
105 }
106
107 fn len(&self) -> usize {
108 self.account_keys.len()
109 }
110}
111
112#[derive(Debug, Default)]
113pub struct RwLockSecondaryIndexEntry {
114 account_keys: RwLock<HashSet<Pubkey>>,
115}
116
117impl SecondaryIndexEntry for RwLockSecondaryIndexEntry {
118 fn insert_if_not_exists(&self, key: &Pubkey, inner_keys_count: &AtomicU64) {
119 if self.account_keys.read().unwrap().contains(key) {
120 return;
122 }
123
124 let was_newly_inserted = self.account_keys.write().unwrap().insert(*key);
125 if was_newly_inserted {
126 inner_keys_count.fetch_add(1, Ordering::Relaxed);
127 }
128 }
129
130 fn remove_inner_key(&self, key: &Pubkey) -> bool {
131 self.account_keys.write().unwrap().remove(key)
132 }
133
134 fn is_empty(&self) -> bool {
135 self.account_keys.read().unwrap().is_empty()
136 }
137
138 fn keys(&self) -> Vec<Pubkey> {
139 self.account_keys.read().unwrap().iter().cloned().collect()
140 }
141
142 fn len(&self) -> usize {
143 self.account_keys.read().unwrap().len()
144 }
145}
146
147#[derive(Debug, Default)]
148pub struct SecondaryIndex<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send> {
149 metrics_name: &'static str,
150 pub index: DashMap<Pubkey, SecondaryIndexEntryType>,
152 pub reverse_index: DashMap<Pubkey, SecondaryReverseIndexEntry>,
153 stats: SecondaryIndexStats,
154}
155
156impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
157 SecondaryIndex<SecondaryIndexEntryType>
158{
159 pub fn new(metrics_name: &'static str) -> Self {
160 Self {
161 metrics_name,
162 ..Self::default()
163 }
164 }
165
166 pub fn insert(&self, key: &Pubkey, inner_key: &Pubkey) {
167 {
168 let pubkeys_map = self
169 .index
170 .get(key)
171 .unwrap_or_else(|| self.index.entry(*key).or_default().downgrade());
172
173 pubkeys_map.insert_if_not_exists(inner_key, &self.stats.num_inner_keys);
174 }
175
176 {
177 let outer_keys = self.reverse_index.get(inner_key).unwrap_or_else(|| {
178 self.reverse_index
179 .entry(*inner_key)
180 .or_insert(RwLock::new(Vec::with_capacity(1)))
181 .downgrade()
182 });
183
184 let should_insert = !outer_keys.read().unwrap().contains(key);
185 if should_insert {
186 let mut w_outer_keys = outer_keys.write().unwrap();
187 if !w_outer_keys.contains(key) {
188 w_outer_keys.push(*key);
189 }
190 }
191 }
192
193 if self.stats.last_report.should_update(1000) {
194 datapoint_info!(
195 self.metrics_name,
196 ("num_secondary_keys", self.index.len() as i64, i64),
197 (
198 "num_inner_keys",
199 self.stats.num_inner_keys.load(Ordering::Relaxed) as i64,
200 i64
201 ),
202 (
203 "num_reverse_index_keys",
204 self.reverse_index.len() as i64,
205 i64
206 ),
207 );
208 }
209 }
210
211 fn remove_index_entries(&self, outer_key: &Pubkey, removed_inner_key: &Pubkey) {
213 let is_outer_key_empty = {
214 let inner_key_map = self
215 .index
216 .get_mut(outer_key)
217 .expect("If we're removing a key, then it must have an entry in the map");
218 assert!(inner_key_map.value().remove_inner_key(removed_inner_key));
221 inner_key_map.is_empty()
222 };
223
224 if is_outer_key_empty {
226 if let Occupied(key_entry) = self.index.entry(*outer_key) {
229 if key_entry.get().is_empty() {
230 key_entry.remove();
231 }
232 }
233 }
234 }
235
236 pub fn remove_by_inner_key(&self, inner_key: &Pubkey) {
237 let mut removed_outer_keys: HashSet<Pubkey> = HashSet::new();
240
241 if let Some((_, outer_keys_set)) = self.reverse_index.remove(inner_key) {
244 for removed_outer_key in outer_keys_set.into_inner().unwrap().into_iter() {
245 removed_outer_keys.insert(removed_outer_key);
246 }
247 }
248
249 for outer_key in &removed_outer_keys {
251 self.remove_index_entries(outer_key, inner_key);
252 }
253
254 self.stats
259 .num_inner_keys
260 .fetch_sub(removed_outer_keys.len() as u64, Ordering::Relaxed);
261 }
262
263 pub fn get(&self, key: &Pubkey) -> Vec<Pubkey> {
264 if let Some(inner_keys_map) = self.index.get(key) {
265 inner_keys_map.keys()
266 } else {
267 vec![]
268 }
269 }
270
271 pub fn log_contents(&self) {
273 let mut entries = self
274 .index
275 .iter()
276 .map(|entry| (entry.value().len(), *entry.key()))
277 .collect::<Vec<_>>();
278 entries.sort_unstable();
279 entries
280 .iter()
281 .rev()
282 .take(20)
283 .for_each(|(v, k)| info!("owner: {k}, accounts: {v}"));
284 }
285}