1use {
2 crate::{
3 accounts_db::AccountsDb,
4 append_vec::{StoredAccountMeta, StoredMeta},
5 },
6 solana_measure::measure::Measure,
7 solana_metrics::*,
8 solana_sdk::{
9 account::AccountSharedData, clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction,
10 },
11 std::collections::{HashMap, HashSet},
12};
13
/// Counters and timings accumulated while stored accounts are replayed to the
/// accounts-update notifier after a snapshot restore.
///
/// All fields start at zero (via `Default`) and are only ever added to.
#[derive(Default)]
pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
    /// Stored account entries visited while scanning slots.
    pub total_accounts: usize,
    /// Entries ignored because their pubkey had already been notified.
    pub skipped_accounts: usize,
    /// Running total of accounts counted as notified.
    pub notified_accounts: usize,
    /// Microseconds spent selecting which accounts of a slot to notify.
    pub elapsed_filtering_us: usize,
    /// Microseconds spent inside the notifier callback itself.
    pub total_pure_notify: usize,
    /// Microseconds spent recording pubkeys as already notified.
    pub total_pure_bookeeping: usize,
    /// Microseconds spent in the whole notify loop (callback plus bookkeeping).
    pub elapsed_notifying_us: usize,
}
24
25impl GeyserPluginNotifyAtSnapshotRestoreStats {
26 pub fn report(&self) {
27 datapoint_info!(
28 "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
29 ("total_accounts", self.total_accounts, i64),
30 ("skipped_accounts", self.skipped_accounts, i64),
31 ("notified_accounts", self.notified_accounts, i64),
32 ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
33 ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
34 ("total_pure_notify_us", self.total_pure_notify, i64),
35 ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
36 );
37 }
38}
39
40impl AccountsDb {
41 pub fn notify_account_restore_from_snapshot(&self) {
45 if self.accounts_update_notifier.is_none() {
46 return;
47 }
48
49 let mut slots = self.storage.all_slots();
50 let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
51 let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
52
53 slots.sort_by(|a, b| b.cmp(a));
54 for slot in slots {
55 self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
56 }
57
58 let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
59 let notifier = &accounts_update_notifier.read().unwrap();
60 notifier.notify_end_of_restore_from_snapshot();
61 notify_stats.report();
62 }
63
64 pub fn notify_account_at_accounts_update<P>(
65 &self,
66 slot: Slot,
67 account: &AccountSharedData,
68 txn: &Option<&SanitizedTransaction>,
69 pubkey: &Pubkey,
70 write_version_producer: &mut P,
71 ) where
72 P: Iterator<Item = u64>,
73 {
74 if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
75 let notifier = &accounts_update_notifier.read().unwrap();
76 notifier.notify_account_update(
77 slot,
78 account,
79 txn,
80 pubkey,
81 write_version_producer.next().unwrap(),
82 );
83 }
84 }
85
86 fn notify_accounts_in_slot(
87 &self,
88 slot: Slot,
89 notified_accounts: &mut HashSet<Pubkey>,
90 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
91 ) {
92 let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();
93
94 let mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta> = HashMap::default();
95 let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
96 let accounts = storage_entry.accounts.account_iter();
97 let mut account_len = 0;
98 accounts.for_each(|account| {
99 account_len += 1;
100 if notified_accounts.contains(&account.meta.pubkey) {
101 notify_stats.skipped_accounts += 1;
102 return;
103 }
104
105 accounts_to_stream.insert(account.meta.pubkey, account);
109 });
110 notify_stats.total_accounts += account_len;
111 measure_filter.stop();
112 notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
113
114 self.notify_filtered_accounts(slot, notified_accounts, accounts_to_stream, notify_stats);
115 }
116
117 fn notify_filtered_accounts(
118 &self,
119 slot: Slot,
120 notified_accounts: &mut HashSet<Pubkey>,
121 mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta>,
122 notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
123 ) {
124 let notifier = self
125 .accounts_update_notifier
126 .as_ref()
127 .unwrap()
128 .read()
129 .unwrap();
130
131 let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
132 let local_write_version = 0;
133 for (_, mut account) in accounts_to_stream.drain() {
134 let meta = StoredMeta {
140 write_version_obsolete: local_write_version,
141 ..*account.meta
142 };
143 account.meta = &meta;
144 let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
145 notifier.notify_account_restore_from_snapshot(slot, &account);
146 measure_pure_notify.stop();
147
148 notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
149
150 let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
151 notified_accounts.insert(account.meta.pubkey);
152 measure_bookkeep.stop();
153 notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
154 }
155 notify_stats.notified_accounts += accounts_to_stream.len();
156 measure_notify.stop();
157 notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
158 }
159}
160
161#[cfg(test)]
162pub mod tests {
163 use {
164 crate::{
165 accounts_db::AccountsDb,
166 accounts_update_notifier_interface::{
167 AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
168 },
169 append_vec::StoredAccountMeta,
170 },
171 dashmap::DashMap,
172 solana_sdk::{
173 account::{AccountSharedData, ReadableAccount},
174 clock::Slot,
175 pubkey::Pubkey,
176 transaction::SanitizedTransaction,
177 },
178 std::sync::{
179 atomic::{AtomicBool, Ordering},
180 Arc, RwLock,
181 },
182 };
183
184 impl AccountsDb {
185 pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
186 self.accounts_update_notifier = notifier;
187 }
188 }
189
190 #[derive(Debug, Default)]
191 struct GeyserTestPlugin {
192 pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
193 pub is_startup_done: AtomicBool,
194 }
195
196 impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
197 fn notify_account_update(
199 &self,
200 slot: Slot,
201 account: &AccountSharedData,
202 _txn: &Option<&SanitizedTransaction>,
203 pubkey: &Pubkey,
204 _write_version: u64,
205 ) {
206 self.accounts_notified
207 .entry(*pubkey)
208 .or_default()
209 .push((slot, account.clone()));
210 }
211
212 fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
215 self.accounts_notified
216 .entry(account.meta.pubkey)
217 .or_default()
218 .push((slot, account.clone_account()));
219 }
220
221 fn notify_end_of_restore_from_snapshot(&self) {
222 self.is_startup_done.store(true, Ordering::Relaxed);
223 }
224 }
225
/// key1 is stored twice in slot 0 and key2 once; the restore notification
/// must deliver each key exactly once, with key1 carrying its second value.
#[test]
fn test_notify_account_restore_from_snapshot_once_per_slot() {
    let mut accounts = AccountsDb::new_single_for_tests();
    let slot0 = 0;

    // First version of key1: 1 lamport.
    let key1 = solana_sdk::pubkey::new_rand();
    let account1 = AccountSharedData::new(1, 1, AccountSharedData::default().owner());
    accounts.store_uncached(slot0, &[(&key1, &account1)]);

    // Second version of key1 in the same slot: 2 lamports.
    let account1_lamports: u64 = 2;
    let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
    accounts.store_uncached(slot0, &[(&key1, &account1)]);

    let key2 = solana_sdk::pubkey::new_rand();
    let account2_lamports: u64 = 100;
    let account2 =
        AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
    accounts.store_uncached(slot0, &[(&key2, &account2)]);

    let notifier = Arc::new(RwLock::new(GeyserTestPlugin::default()));
    accounts.set_geyser_plugin_notifer(Some(notifier.clone()));

    accounts.notify_account_restore_from_snapshot();

    let notifier = notifier.write().unwrap();
    // Checks that `key` was notified exactly once with the given value and slot.
    let assert_notified_once = |key: &Pubkey, lamports: u64, slot: Slot| {
        let history = notifier.accounts_notified.get(key).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].1.lamports(), lamports);
        assert_eq!(history[0].0, slot);
    };
    assert_notified_once(&key1, account1_lamports, slot0);
    assert_notified_once(&key2, account2_lamports, slot0);

    assert!(notifier.is_startup_done.load(Ordering::Relaxed));
}
274
/// key1 is stored in slot 0 and again in slot 1, key2 only in slot 0 and
/// key3 only in slot 1; each key must be notified exactly once, and key1
/// must be reported with its slot 1 value.
#[test]
fn test_notify_account_restore_from_snapshot_once_across_slots() {
    let mut accounts = AccountsDb::new_single_for_tests();
    let slot0 = 0;
    let slot1 = 1;

    // Slot 0: first version of key1 (1 lamport) and key2.
    let key1 = solana_sdk::pubkey::new_rand();
    let account1 = AccountSharedData::new(1, 1, AccountSharedData::default().owner());
    accounts.store_uncached(slot0, &[(&key1, &account1)]);

    let key2 = solana_sdk::pubkey::new_rand();
    let account2_lamports: u64 = 200;
    let account2 =
        AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
    accounts.store_uncached(slot0, &[(&key2, &account2)]);

    // Slot 1: second version of key1 (2 lamports) and key3.
    let account1_lamports: u64 = 2;
    let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
    accounts.store_uncached(slot1, &[(&key1, &account1)]);

    let key3 = solana_sdk::pubkey::new_rand();
    let account3_lamports: u64 = 300;
    let account3 =
        AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
    accounts.store_uncached(slot1, &[(&key3, &account3)]);

    let notifier = Arc::new(RwLock::new(GeyserTestPlugin::default()));
    accounts.set_geyser_plugin_notifer(Some(notifier.clone()));

    accounts.notify_account_restore_from_snapshot();

    let notifier = notifier.write().unwrap();
    // Checks that `key` was notified exactly once with the given value and slot.
    let assert_notified_once = |key: &Pubkey, lamports: u64, slot: Slot| {
        let history = notifier.accounts_notified.get(key).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].1.lamports(), lamports);
        assert_eq!(history[0].0, slot);
    };
    assert_notified_once(&key1, account1_lamports, slot1);
    assert_notified_once(&key2, account2_lamports, slot0);
    assert_notified_once(&key3, account3_lamports, slot1);

    assert!(notifier.is_startup_done.load(Ordering::Relaxed));
}
338
/// With the notifier installed before any store, every cached store must
/// produce one notification: two for key1 (slot 0, then slot 1) and one each
/// for key2 and key3.
#[test]
fn test_notify_account_at_accounts_update() {
    let mut accounts = AccountsDb::new_single_for_tests_with_caching();

    let notifier = Arc::new(RwLock::new(GeyserTestPlugin::default()));
    accounts.set_geyser_plugin_notifer(Some(notifier.clone()));

    let slot0 = 0;
    let slot1 = 1;

    // Slot 0: first version of key1 and key2.
    let key1 = solana_sdk::pubkey::new_rand();
    let account1_lamports1: u64 = 1;
    let account1 =
        AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner());
    accounts.store_cached((slot0, &[(&key1, &account1)][..]), None);

    let key2 = solana_sdk::pubkey::new_rand();
    let account2_lamports: u64 = 200;
    let account2 =
        AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
    accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);

    // Slot 1: second version of key1 and key3.
    let account1_lamports2 = 2;
    let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner());
    accounts.store_cached((slot1, &[(&key1, &account1)][..]), None);

    let key3 = solana_sdk::pubkey::new_rand();
    let account3_lamports: u64 = 300;
    let account3 =
        AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
    accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);

    let notifier = notifier.write().unwrap();

    // key1 was stored twice, so both updates must show up in store order.
    {
        let key1_history = notifier.accounts_notified.get(&key1).unwrap();
        assert_eq!(key1_history.len(), 2);
        let expected = [(slot0, account1_lamports1), (slot1, account1_lamports2)];
        for ((slot, account), (expected_slot, expected_lamports)) in
            key1_history.iter().zip(expected.iter())
        {
            assert_eq!(account.lamports(), *expected_lamports);
            assert_eq!(slot, expected_slot);
        }
    }

    // Checks that `key` was notified exactly once with the given value and slot.
    let assert_notified_once = |key: &Pubkey, lamports: u64, slot: Slot| {
        let history = notifier.accounts_notified.get(key).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].1.lamports(), lamports);
        assert_eq!(history[0].0, slot);
    };
    assert_notified_once(&key2, account2_lamports, slot0);
    assert_notified_once(&key3, account3_lamports, slot1);
}
409}