1use {
2 crate::{
3 accounts_db::AccountsDb,
4 append_vec::{StoredAccountMeta, StoredMeta},
5 },
6 safecoin_measure::measure::Measure,
7 solana_metrics::*,
8 solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey, signature::Signature},
9 std::collections::{hash_map::Entry, HashMap, HashSet},
10};
11
12#[derive(Default)]
13pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
14 pub total_accounts: usize,
15 pub skipped_accounts: usize,
16 pub notified_accounts: usize,
17 pub elapsed_filtering_us: usize,
18 pub total_pure_notify: usize,
19 pub total_pure_bookeeping: usize,
20 pub elapsed_notifying_us: usize,
21}
22
23impl GeyserPluginNotifyAtSnapshotRestoreStats {
24 pub fn report(&self) {
25 datapoint_info!(
26 "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
27 ("total_accounts", self.total_accounts, i64),
28 ("skipped_accounts", self.skipped_accounts, i64),
29 ("notified_accounts", self.notified_accounts, i64),
30 ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
31 ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
32 ("total_pure_notify_us", self.total_pure_notify, i64),
33 ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
34 );
35 }
36}
37
38impl AccountsDb {
39 pub fn notify_account_restore_from_snapshot(&self) {
43 if self.accounts_update_notifier.is_none() {
44 return;
45 }
46
47 let mut slots = self.storage.all_slots();
48 let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
49 let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
50
51 slots.sort_by(|a, b| b.cmp(a));
52 for slot in slots {
53 self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
54 }
55
56 let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
57 let notifier = &accounts_update_notifier.read().unwrap();
58 notifier.notify_end_of_restore_from_snapshot();
59 notify_stats.report();
60 }
61
62 pub fn notify_account_at_accounts_update(
63 &self,
64 slot: Slot,
65 meta: &StoredMeta,
66 account: &AccountSharedData,
67 txn_signature: &Option<&Signature>,
68 ) {
69 if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
70 let notifier = &accounts_update_notifier.read().unwrap();
71 notifier.notify_account_update(slot, meta, account, txn_signature);
72 }
73 }
74
75 fn notify_accounts_in_slot(
76 &self,
77 slot: Slot,
78 notified_accounts: &mut HashSet<Pubkey>,
79 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
80 ) {
81 let slot_stores = self.storage.get_slot_stores(slot).unwrap();
82
83 let slot_stores = slot_stores.read().unwrap();
84 let mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta> = HashMap::default();
85 let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
86 for (_, storage_entry) in slot_stores.iter() {
87 let mut accounts = storage_entry.all_accounts();
88 let account_len = accounts.len();
89 notify_stats.total_accounts += account_len;
90 accounts.drain(..).into_iter().for_each(|account| {
91 if notified_accounts.contains(&account.meta.pubkey) {
92 notify_stats.skipped_accounts += 1;
93 return;
94 }
95 match accounts_to_stream.entry(account.meta.pubkey) {
96 Entry::Occupied(mut entry) => {
97 let existing_account = entry.get();
98 if account.meta.write_version > existing_account.meta.write_version {
99 entry.insert(account);
100 } else {
101 notify_stats.skipped_accounts += 1;
102 }
103 }
104 Entry::Vacant(entry) => {
105 entry.insert(account);
106 }
107 }
108 });
109 }
110 measure_filter.stop();
111 notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
112
113 self.notify_filtered_accounts(slot, notified_accounts, &accounts_to_stream, notify_stats);
114 }
115
116 fn notify_filtered_accounts(
117 &self,
118 slot: Slot,
119 notified_accounts: &mut HashSet<Pubkey>,
120 accounts_to_stream: &HashMap<Pubkey, StoredAccountMeta>,
121 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
122 ) {
123 let notifier = self
124 .accounts_update_notifier
125 .as_ref()
126 .unwrap()
127 .read()
128 .unwrap();
129
130 let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
131 for account in accounts_to_stream.values() {
132 let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
133 notifier.notify_account_restore_from_snapshot(slot, account);
134 measure_pure_notify.stop();
135
136 notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
137
138 let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
139 notified_accounts.insert(account.meta.pubkey);
140 measure_bookkeep.stop();
141 notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
142 }
143 notify_stats.notified_accounts += accounts_to_stream.len();
144 measure_notify.stop();
145 notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
146 }
147}
148
149#[cfg(test)]
150pub mod tests {
151 use {
152 crate::{
153 accounts_db::AccountsDb,
154 accounts_update_notifier_interface::{
155 AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
156 },
157 append_vec::{StoredAccountMeta, StoredMeta},
158 },
159 dashmap::DashMap,
160 solana_sdk::{
161 account::{AccountSharedData, ReadableAccount},
162 clock::Slot,
163 pubkey::Pubkey,
164 signature::Signature,
165 },
166 std::sync::{
167 atomic::{AtomicBool, Ordering},
168 Arc, RwLock,
169 },
170 };
171
172 impl AccountsDb {
173 pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
174 self.accounts_update_notifier = notifier;
175 }
176 }
177
178 #[derive(Debug, Default)]
179 struct GeyserTestPlugin {
180 pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
181 pub is_startup_done: AtomicBool,
182 }
183
184 impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
185 fn notify_account_update(
187 &self,
188 slot: Slot,
189 meta: &StoredMeta,
190 account: &AccountSharedData,
191 _txn_signature: &Option<&Signature>,
192 ) {
193 self.accounts_notified
194 .entry(meta.pubkey)
195 .or_default()
196 .push((slot, account.clone()));
197 }
198
199 fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
202 self.accounts_notified
203 .entry(account.meta.pubkey)
204 .or_default()
205 .push((slot, account.clone_account()));
206 }
207
208 fn notify_end_of_restore_from_snapshot(&self) {
209 self.is_startup_done.store(true, Ordering::Relaxed);
210 }
211 }
212
213 #[test]
214 fn test_notify_account_restore_from_snapshot_once_per_slot() {
215 let mut accounts = AccountsDb::new_single_for_tests();
216 let key1 = solana_sdk::pubkey::new_rand();
218 let mut account1_lamports: u64 = 1;
219 let account1 =
220 AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
221 let slot0 = 0;
222 accounts.store_uncached(slot0, &[(&key1, &account1)]);
223
224 account1_lamports = 2;
225 let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
226 accounts.store_uncached(slot0, &[(&key1, &account1)]);
227 let notifier = GeyserTestPlugin::default();
228
229 let key2 = solana_sdk::pubkey::new_rand();
230 let account2_lamports: u64 = 100;
231 let account2 =
232 AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
233
234 accounts.store_uncached(slot0, &[(&key2, &account2)]);
235
236 let notifier = Arc::new(RwLock::new(notifier));
237 accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
238
239 accounts.notify_account_restore_from_snapshot();
240
241 let notifier = notifier.write().unwrap();
242 assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
243 assert_eq!(
244 notifier.accounts_notified.get(&key1).unwrap()[0]
245 .1
246 .lamports(),
247 account1_lamports
248 );
249 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
250 assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
251 assert_eq!(
252 notifier.accounts_notified.get(&key2).unwrap()[0]
253 .1
254 .lamports(),
255 account2_lamports
256 );
257 assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
258
259 assert!(notifier.is_startup_done.load(Ordering::Relaxed));
260 }
261
262 #[test]
263 fn test_notify_account_restore_from_snapshot_once_across_slots() {
264 let mut accounts = AccountsDb::new_single_for_tests();
265 let key1 = solana_sdk::pubkey::new_rand();
269 let mut account1_lamports: u64 = 1;
270 let account1 =
271 AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
272 let slot0 = 0;
273 accounts.store_uncached(slot0, &[(&key1, &account1)]);
274
275 let key2 = solana_sdk::pubkey::new_rand();
276 let account2_lamports: u64 = 200;
277 let account2 =
278 AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
279 accounts.store_uncached(slot0, &[(&key2, &account2)]);
280
281 account1_lamports = 2;
282 let slot1 = 1;
283 let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
284 accounts.store_uncached(slot1, &[(&key1, &account1)]);
285 let notifier = GeyserTestPlugin::default();
286
287 let key3 = solana_sdk::pubkey::new_rand();
288 let account3_lamports: u64 = 300;
289 let account3 =
290 AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
291 accounts.store_uncached(slot1, &[(&key3, &account3)]);
292
293 let notifier = Arc::new(RwLock::new(notifier));
294 accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
295
296 accounts.notify_account_restore_from_snapshot();
297
298 let notifier = notifier.write().unwrap();
299 assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
300 assert_eq!(
301 notifier.accounts_notified.get(&key1).unwrap()[0]
302 .1
303 .lamports(),
304 account1_lamports
305 );
306 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot1);
307 assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
308 assert_eq!(
309 notifier.accounts_notified.get(&key2).unwrap()[0]
310 .1
311 .lamports(),
312 account2_lamports
313 );
314 assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
315 assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
316 assert_eq!(
317 notifier.accounts_notified.get(&key3).unwrap()[0]
318 .1
319 .lamports(),
320 account3_lamports
321 );
322 assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
323 assert!(notifier.is_startup_done.load(Ordering::Relaxed));
324 }
325
326 #[test]
327 fn test_notify_account_at_accounts_update() {
328 let mut accounts = AccountsDb::new_single_for_tests_with_caching();
329
330 let notifier = GeyserTestPlugin::default();
331
332 let notifier = Arc::new(RwLock::new(notifier));
333 accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
334
335 let key1 = solana_sdk::pubkey::new_rand();
339 let account1_lamports1: u64 = 1;
340 let account1 =
341 AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner());
342 let slot0 = 0;
343 accounts.store_cached((slot0, &[(&key1, &account1)][..]), None);
344
345 let key2 = solana_sdk::pubkey::new_rand();
346 let account2_lamports: u64 = 200;
347 let account2 =
348 AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
349 accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);
350
351 let account1_lamports2 = 2;
352 let slot1 = 1;
353 let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner());
354 accounts.store_cached((slot1, &[(&key1, &account1)][..]), None);
355
356 let key3 = solana_sdk::pubkey::new_rand();
357 let account3_lamports: u64 = 300;
358 let account3 =
359 AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
360 accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);
361
362 let notifier = notifier.write().unwrap();
363 assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 2);
364 assert_eq!(
365 notifier.accounts_notified.get(&key1).unwrap()[0]
366 .1
367 .lamports(),
368 account1_lamports1
369 );
370 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
371 assert_eq!(
372 notifier.accounts_notified.get(&key1).unwrap()[1]
373 .1
374 .lamports(),
375 account1_lamports2
376 );
377 assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[1].0, slot1);
378
379 assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
380 assert_eq!(
381 notifier.accounts_notified.get(&key2).unwrap()[0]
382 .1
383 .lamports(),
384 account2_lamports
385 );
386 assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
387 assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
388 assert_eq!(
389 notifier.accounts_notified.get(&key3).unwrap()[0]
390 .1
391 .lamports(),
392 account3_lamports
393 );
394 assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
395 }
396}