solana_runtime/accounts_db/
geyser_plugin_utils.rs

1use {
2    crate::{
3        accounts_db::AccountsDb,
4        append_vec::{StoredAccountMeta, StoredMeta},
5    },
6    safecoin_measure::measure::Measure,
7    solana_metrics::*,
8    solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey, signature::Signature},
9    std::collections::{hash_map::Entry, HashMap, HashSet},
10};
11
/// Counters and timings accumulated while accounts restored from a snapshot
/// are streamed to the accounts update notifier.
///
/// All values start at zero (`Default`) and are only ever added to while the
/// slots are processed; durations are accumulated in microseconds.
#[derive(Debug, Default)]
pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
    /// Number of stored account entries read from the storage entries of all
    /// processed slots.
    pub total_accounts: usize,
    /// Number of stored account entries counted as skipped instead of being
    /// notified.
    pub skipped_accounts: usize,
    /// Number of accounts handed to the notifier.
    pub notified_accounts: usize,
    /// Time spent selecting which accounts to notify, in microseconds.
    pub elapsed_filtering_us: usize,
    /// Time spent inside the notifier calls themselves, in microseconds.
    pub total_pure_notify: usize,
    /// Time spent recording which accounts were already notified, in
    /// microseconds.
    pub total_pure_bookeeping: usize,
    /// Total time spent in the notification loop (notifier calls plus
    /// bookkeeping), in microseconds.
    pub elapsed_notifying_us: usize,
}
22
23impl GeyserPluginNotifyAtSnapshotRestoreStats {
24    pub fn report(&self) {
25        datapoint_info!(
26            "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
27            ("total_accounts", self.total_accounts, i64),
28            ("skipped_accounts", self.skipped_accounts, i64),
29            ("notified_accounts", self.notified_accounts, i64),
30            ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
31            ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
32            ("total_pure_notify_us", self.total_pure_notify, i64),
33            ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
34        );
35    }
36}
37
38impl AccountsDb {
39    /// Notify the plugins of of account data when AccountsDb is restored from a snapshot. The data is streamed
40    /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
41    /// multiple times only the last write (with highest write_version) is notified.
42    pub fn notify_account_restore_from_snapshot(&self) {
43        if self.accounts_update_notifier.is_none() {
44            return;
45        }
46
47        let mut slots = self.storage.all_slots();
48        let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
49        let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
50
51        slots.sort_by(|a, b| b.cmp(a));
52        for slot in slots {
53            self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
54        }
55
56        let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
57        let notifier = &accounts_update_notifier.read().unwrap();
58        notifier.notify_end_of_restore_from_snapshot();
59        notify_stats.report();
60    }
61
62    pub fn notify_account_at_accounts_update(
63        &self,
64        slot: Slot,
65        meta: &StoredMeta,
66        account: &AccountSharedData,
67        txn_signature: &Option<&Signature>,
68    ) {
69        if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
70            let notifier = &accounts_update_notifier.read().unwrap();
71            notifier.notify_account_update(slot, meta, account, txn_signature);
72        }
73    }
74
75    fn notify_accounts_in_slot(
76        &self,
77        slot: Slot,
78        notified_accounts: &mut HashSet<Pubkey>,
79        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
80    ) {
81        let slot_stores = self.storage.get_slot_stores(slot).unwrap();
82
83        let slot_stores = slot_stores.read().unwrap();
84        let mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta> = HashMap::default();
85        let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
86        for (_, storage_entry) in slot_stores.iter() {
87            let mut accounts = storage_entry.all_accounts();
88            let account_len = accounts.len();
89            notify_stats.total_accounts += account_len;
90            accounts.drain(..).into_iter().for_each(|account| {
91                if notified_accounts.contains(&account.meta.pubkey) {
92                    notify_stats.skipped_accounts += 1;
93                    return;
94                }
95                match accounts_to_stream.entry(account.meta.pubkey) {
96                    Entry::Occupied(mut entry) => {
97                        let existing_account = entry.get();
98                        if account.meta.write_version > existing_account.meta.write_version {
99                            entry.insert(account);
100                        } else {
101                            notify_stats.skipped_accounts += 1;
102                        }
103                    }
104                    Entry::Vacant(entry) => {
105                        entry.insert(account);
106                    }
107                }
108            });
109        }
110        measure_filter.stop();
111        notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
112
113        self.notify_filtered_accounts(slot, notified_accounts, &accounts_to_stream, notify_stats);
114    }
115
116    fn notify_filtered_accounts(
117        &self,
118        slot: Slot,
119        notified_accounts: &mut HashSet<Pubkey>,
120        accounts_to_stream: &HashMap<Pubkey, StoredAccountMeta>,
121        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
122    ) {
123        let notifier = self
124            .accounts_update_notifier
125            .as_ref()
126            .unwrap()
127            .read()
128            .unwrap();
129
130        let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
131        for account in accounts_to_stream.values() {
132            let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
133            notifier.notify_account_restore_from_snapshot(slot, account);
134            measure_pure_notify.stop();
135
136            notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
137
138            let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
139            notified_accounts.insert(account.meta.pubkey);
140            measure_bookkeep.stop();
141            notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
142        }
143        notify_stats.notified_accounts += accounts_to_stream.len();
144        measure_notify.stop();
145        notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
146    }
147}
148
#[cfg(test)]
pub mod tests {
    use {
        crate::{
            accounts_db::AccountsDb,
            accounts_update_notifier_interface::{
                AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
            },
            append_vec::{StoredAccountMeta, StoredMeta},
        },
        dashmap::DashMap,
        solana_sdk::{
            account::{AccountSharedData, ReadableAccount},
            clock::Slot,
            pubkey::Pubkey,
            signature::Signature,
        },
        std::sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
    };

    impl AccountsDb {
        /// Test-only hook that installs the accounts update notifier, or clears it with `None`.
        pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
            self.accounts_update_notifier = notifier;
        }
    }

    /// Notifier double that records every notification it receives.
    #[derive(Debug, Default)]
    struct GeyserTestPlugin {
        /// Every `(slot, account)` notification received, grouped by account key in arrival
        /// order.
        pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
        /// Set once the end-of-restore notification has been received.
        pub is_startup_done: AtomicBool,
    }

    impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
        /// Notified when an account is updated at runtime, due to transaction activities
        fn notify_account_update(
            &self,
            slot: Slot,
            meta: &StoredMeta,
            account: &AccountSharedData,
            _txn_signature: &Option<&Signature>,
        ) {
            self.accounts_notified
                .entry(meta.pubkey)
                .or_default()
                .push((slot, account.clone()));
        }

        /// Notified when the AccountsDb is initialized at start when restored
        /// from a snapshot.
        fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
            self.accounts_notified
                .entry(account.meta.pubkey)
                .or_default()
                .push((slot, account.clone_account()));
        }

        fn notify_end_of_restore_from_snapshot(&self) {
            self.is_startup_done.store(true, Ordering::Relaxed);
        }
    }

    /// Builds a one-byte account with the given balance, owned by the default owner.
    fn new_test_account(lamports: u64) -> AccountSharedData {
        AccountSharedData::new(lamports, 1, AccountSharedData::default().owner())
    }

    /// Installs a fresh recording plugin on `accounts` and returns a handle for inspecting it.
    fn install_test_plugin(accounts: &mut AccountsDb) -> Arc<RwLock<GeyserTestPlugin>> {
        let plugin = Arc::new(RwLock::new(GeyserTestPlugin::default()));
        // `.clone()` as a method call (not `Arc::clone(&plugin)`) so the resulting concrete
        // `Arc` can be coerced to the trait-object notifier type.
        accounts.set_geyser_plugin_notifer(Some(plugin.clone()));
        plugin
    }

    /// Asserts that `key` received exactly the `expected` `(slot, lamports)` notifications, in
    /// that order. Panics if `key` was never notified.
    fn assert_notifications(plugin: &GeyserTestPlugin, key: &Pubkey, expected: &[(Slot, u64)]) {
        let recorded = plugin.accounts_notified.get(key).unwrap();
        let actual: Vec<(Slot, u64)> = recorded
            .iter()
            .map(|(slot, account)| (*slot, account.lamports()))
            .collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_notify_account_restore_from_snapshot_once_per_slot() {
        let mut accounts = AccountsDb::new_single_for_tests();
        let slot0: Slot = 0;

        // Account with key1 is updated twice in the store -- should only get notified once.
        let key1 = solana_sdk::pubkey::new_rand();
        let account1 = new_test_account(1);
        accounts.store_uncached(slot0, &[(&key1, &account1)]);

        let account1_lamports: u64 = 2;
        let account1 = new_test_account(account1_lamports);
        accounts.store_uncached(slot0, &[(&key1, &account1)]);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 100;
        let account2 = new_test_account(account2_lamports);
        accounts.store_uncached(slot0, &[(&key2, &account2)]);

        let notifier = install_test_plugin(&mut accounts);

        accounts.notify_account_restore_from_snapshot();

        let plugin = notifier.read().unwrap();
        assert_notifications(&plugin, &key1, &[(slot0, account1_lamports)]);
        assert_notifications(&plugin, &key2, &[(slot0, account2_lamports)]);

        assert!(plugin.is_startup_done.load(Ordering::Relaxed));
    }

    #[test]
    fn test_notify_account_restore_from_snapshot_once_across_slots() {
        let mut accounts = AccountsDb::new_single_for_tests();
        let slot0: Slot = 0;
        let slot1: Slot = 1;

        // Account with key1 is updated twice in two different slots -- should only get notified once.
        // Account with key2 is updated slot0, should get notified once
        // Account with key3 is updated in slot1, should get notified once
        let key1 = solana_sdk::pubkey::new_rand();
        let account1 = new_test_account(1);
        accounts.store_uncached(slot0, &[(&key1, &account1)]);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 200;
        let account2 = new_test_account(account2_lamports);
        accounts.store_uncached(slot0, &[(&key2, &account2)]);

        let account1_lamports: u64 = 2;
        let account1 = new_test_account(account1_lamports);
        accounts.store_uncached(slot1, &[(&key1, &account1)]);

        let key3 = solana_sdk::pubkey::new_rand();
        let account3_lamports: u64 = 300;
        let account3 = new_test_account(account3_lamports);
        accounts.store_uncached(slot1, &[(&key3, &account3)]);

        let notifier = install_test_plugin(&mut accounts);

        accounts.notify_account_restore_from_snapshot();

        let plugin = notifier.read().unwrap();
        assert_notifications(&plugin, &key1, &[(slot1, account1_lamports)]);
        assert_notifications(&plugin, &key2, &[(slot0, account2_lamports)]);
        assert_notifications(&plugin, &key3, &[(slot1, account3_lamports)]);
        assert!(plugin.is_startup_done.load(Ordering::Relaxed));
    }

    #[test]
    fn test_notify_account_at_accounts_update() {
        let mut accounts = AccountsDb::new_single_for_tests_with_caching();
        let slot0: Slot = 0;
        let slot1: Slot = 1;

        // The plugin is installed before any store so that every update below is observed.
        let notifier = install_test_plugin(&mut accounts);

        // Account with key1 is updated in two different slots -- should get notified twice.
        // Account with key2 is updated slot0, should get notified once
        // Account with key3 is updated in slot1, should get notified once
        let key1 = solana_sdk::pubkey::new_rand();
        let account1_lamports1: u64 = 1;
        let account1 = new_test_account(account1_lamports1);
        accounts.store_cached((slot0, &[(&key1, &account1)][..]), None);

        let key2 = solana_sdk::pubkey::new_rand();
        let account2_lamports: u64 = 200;
        let account2 = new_test_account(account2_lamports);
        accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);

        let account1_lamports2: u64 = 2;
        let account1 = new_test_account(account1_lamports2);
        accounts.store_cached((slot1, &[(&key1, &account1)][..]), None);

        let key3 = solana_sdk::pubkey::new_rand();
        let account3_lamports: u64 = 300;
        let account3 = new_test_account(account3_lamports);
        accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);

        let plugin = notifier.read().unwrap();
        assert_notifications(
            &plugin,
            &key1,
            &[(slot0, account1_lamports1), (slot1, account1_lamports2)],
        );
        assert_notifications(&plugin, &key2, &[(slot0, account2_lamports)]);
        assert_notifications(&plugin, &key3, &[(slot1, account3_lamports)]);
    }
}