solana_runtime/accounts_db/
geyser_plugin_utils.rs

1use {
2    crate::{
3        accounts_db::AccountsDb,
4        append_vec::{StoredAccountMeta, StoredMeta},
5    },
6    solana_measure::measure::Measure,
7    solana_metrics::*,
8    solana_sdk::{
9        account::AccountSharedData, clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction,
10    },
11    std::collections::{HashMap, HashSet},
12};
13
/// Counters and timings gathered while accounts restored from a snapshot are streamed to the
/// geyser plugin notifier. Reported as one metrics datapoint by `report()`.
#[derive(Debug, Default)]
pub struct GeyserPluginNotifyAtSnapshotRestoreStats {
    /// Number of stored account entries visited across all processed slots.
    pub total_accounts: usize,
    /// Number of visited entries skipped because their pubkey had already been notified.
    pub skipped_accounts: usize,
    /// Counter for the accounts that were streamed to the notifier.
    pub notified_accounts: usize,
    /// Accumulated time, in microseconds, spent selecting which accounts to stream.
    pub elapsed_filtering_us: usize,
    /// Accumulated time, in microseconds, spent inside the notifier callback itself
    /// (reported as `total_pure_notify_us`).
    pub total_pure_notify: usize,
    /// Accumulated time, in microseconds, spent recording notified pubkeys
    /// (reported as `total_pure_bookeeping_us`).
    pub total_pure_bookeeping: usize,
    /// Accumulated time, in microseconds, of the whole notification phase
    /// (callback plus bookkeeping).
    pub elapsed_notifying_us: usize,
}
24
25impl GeyserPluginNotifyAtSnapshotRestoreStats {
26    pub fn report(&self) {
27        datapoint_info!(
28            "accountsdb_plugin_notify_account_restore_from_snapshot_summary",
29            ("total_accounts", self.total_accounts, i64),
30            ("skipped_accounts", self.skipped_accounts, i64),
31            ("notified_accounts", self.notified_accounts, i64),
32            ("elapsed_filtering_us", self.elapsed_filtering_us, i64),
33            ("elapsed_notifying_us", self.elapsed_notifying_us, i64),
34            ("total_pure_notify_us", self.total_pure_notify, i64),
35            ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
36        );
37    }
38}
39
40impl AccountsDb {
41    /// Notify the plugins of of account data when AccountsDb is restored from a snapshot. The data is streamed
42    /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
43    /// multiple times only the last write (with highest write_version) is notified.
44    pub fn notify_account_restore_from_snapshot(&self) {
45        if self.accounts_update_notifier.is_none() {
46            return;
47        }
48
49        let mut slots = self.storage.all_slots();
50        let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
51        let mut notify_stats = GeyserPluginNotifyAtSnapshotRestoreStats::default();
52
53        slots.sort_by(|a, b| b.cmp(a));
54        for slot in slots {
55            self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
56        }
57
58        let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
59        let notifier = &accounts_update_notifier.read().unwrap();
60        notifier.notify_end_of_restore_from_snapshot();
61        notify_stats.report();
62    }
63
64    pub fn notify_account_at_accounts_update<P>(
65        &self,
66        slot: Slot,
67        account: &AccountSharedData,
68        txn: &Option<&SanitizedTransaction>,
69        pubkey: &Pubkey,
70        write_version_producer: &mut P,
71    ) where
72        P: Iterator<Item = u64>,
73    {
74        if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
75            let notifier = &accounts_update_notifier.read().unwrap();
76            notifier.notify_account_update(
77                slot,
78                account,
79                txn,
80                pubkey,
81                write_version_producer.next().unwrap(),
82            );
83        }
84    }
85
86    fn notify_accounts_in_slot(
87        &self,
88        slot: Slot,
89        notified_accounts: &mut HashSet<Pubkey>,
90        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
91    ) {
92        let storage_entry = self.storage.get_slot_storage_entry(slot).unwrap();
93
94        let mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta> = HashMap::default();
95        let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
96        let accounts = storage_entry.accounts.account_iter();
97        let mut account_len = 0;
98        accounts.for_each(|account| {
99            account_len += 1;
100            if notified_accounts.contains(&account.meta.pubkey) {
101                notify_stats.skipped_accounts += 1;
102                return;
103            }
104
105            // later entries in the same slot are more recent and override earlier accounts for the same pubkey
106            // We can pass an incrementing number here for write_version in the future, if the storage does not have a write_version.
107            // As long as all accounts for this slot are in 1 append vec that can be itereated olest to newest.
108            accounts_to_stream.insert(account.meta.pubkey, account);
109        });
110        notify_stats.total_accounts += account_len;
111        measure_filter.stop();
112        notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
113
114        self.notify_filtered_accounts(slot, notified_accounts, accounts_to_stream, notify_stats);
115    }
116
117    fn notify_filtered_accounts(
118        &self,
119        slot: Slot,
120        notified_accounts: &mut HashSet<Pubkey>,
121        mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta>,
122        notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
123    ) {
124        let notifier = self
125            .accounts_update_notifier
126            .as_ref()
127            .unwrap()
128            .read()
129            .unwrap();
130
131        let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
132        let local_write_version = 0;
133        for (_, mut account) in accounts_to_stream.drain() {
134            // We do not need to rely on the specific write_version read from the append vec.
135            // So, overwrite the write_version with something that works.
136            // 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey.
137            // write_version is only used to order multiple entries with the same pubkey, so it doesn't matter what value it gets here.
138            // Passing 0 for everyone's write_version is sufficiently correct.
139            let meta = StoredMeta {
140                write_version_obsolete: local_write_version,
141                ..*account.meta
142            };
143            account.meta = &meta;
144            let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
145            notifier.notify_account_restore_from_snapshot(slot, &account);
146            measure_pure_notify.stop();
147
148            notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
149
150            let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
151            notified_accounts.insert(account.meta.pubkey);
152            measure_bookkeep.stop();
153            notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
154        }
155        notify_stats.notified_accounts += accounts_to_stream.len();
156        measure_notify.stop();
157        notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
158    }
159}
160
161#[cfg(test)]
162pub mod tests {
163    use {
164        crate::{
165            accounts_db::AccountsDb,
166            accounts_update_notifier_interface::{
167                AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
168            },
169            append_vec::StoredAccountMeta,
170        },
171        dashmap::DashMap,
172        solana_sdk::{
173            account::{AccountSharedData, ReadableAccount},
174            clock::Slot,
175            pubkey::Pubkey,
176            transaction::SanitizedTransaction,
177        },
178        std::sync::{
179            atomic::{AtomicBool, Ordering},
180            Arc, RwLock,
181        },
182    };
183
    impl AccountsDb {
        /// Test-only helper: replaces the accounts update notifier of this `AccountsDb`.
        /// Passing `None` removes the notifier.
        pub fn set_geyser_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
            self.accounts_update_notifier = notifier;
        }
    }
189
    /// Notifier used by the tests: it records every notification it receives so that the
    /// tests can inspect them afterwards.
    #[derive(Debug, Default)]
    struct GeyserTestPlugin {
        /// For each pubkey, the `(slot, account)` pairs received, in arrival order.
        pub accounts_notified: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
        /// Set to `true` once the end of the restore from snapshot has been notified.
        pub is_startup_done: AtomicBool,
    }
195
196    impl AccountsUpdateNotifierInterface for GeyserTestPlugin {
197        /// Notified when an account is updated at runtime, due to transaction activities
198        fn notify_account_update(
199            &self,
200            slot: Slot,
201            account: &AccountSharedData,
202            _txn: &Option<&SanitizedTransaction>,
203            pubkey: &Pubkey,
204            _write_version: u64,
205        ) {
206            self.accounts_notified
207                .entry(*pubkey)
208                .or_default()
209                .push((slot, account.clone()));
210        }
211
212        /// Notified when the AccountsDb is initialized at start when restored
213        /// from a snapshot.
214        fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
215            self.accounts_notified
216                .entry(account.meta.pubkey)
217                .or_default()
218                .push((slot, account.clone_account()));
219        }
220
221        fn notify_end_of_restore_from_snapshot(&self) {
222            self.is_startup_done.store(true, Ordering::Relaxed);
223        }
224    }
225
226    #[test]
227    fn test_notify_account_restore_from_snapshot_once_per_slot() {
228        let mut accounts = AccountsDb::new_single_for_tests();
229        // Account with key1 is updated twice in the store -- should only get notified once.
230        let key1 = solana_sdk::pubkey::new_rand();
231        let mut account1_lamports: u64 = 1;
232        let account1 =
233            AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
234        let slot0 = 0;
235        accounts.store_uncached(slot0, &[(&key1, &account1)]);
236
237        account1_lamports = 2;
238        let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
239        accounts.store_uncached(slot0, &[(&key1, &account1)]);
240        let notifier = GeyserTestPlugin::default();
241
242        let key2 = solana_sdk::pubkey::new_rand();
243        let account2_lamports: u64 = 100;
244        let account2 =
245            AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
246
247        accounts.store_uncached(slot0, &[(&key2, &account2)]);
248
249        let notifier = Arc::new(RwLock::new(notifier));
250        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
251
252        accounts.notify_account_restore_from_snapshot();
253
254        let notifier = notifier.write().unwrap();
255        assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
256        assert_eq!(
257            notifier.accounts_notified.get(&key1).unwrap()[0]
258                .1
259                .lamports(),
260            account1_lamports
261        );
262        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
263        assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
264        assert_eq!(
265            notifier.accounts_notified.get(&key2).unwrap()[0]
266                .1
267                .lamports(),
268            account2_lamports
269        );
270        assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
271
272        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
273    }
274
275    #[test]
276    fn test_notify_account_restore_from_snapshot_once_across_slots() {
277        let mut accounts = AccountsDb::new_single_for_tests();
278        // Account with key1 is updated twice in two different slots -- should only get notified once.
279        // Account with key2 is updated slot0, should get notified once
280        // Account with key3 is updated in slot1, should get notified once
281        let key1 = solana_sdk::pubkey::new_rand();
282        let mut account1_lamports: u64 = 1;
283        let account1 =
284            AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
285        let slot0 = 0;
286        accounts.store_uncached(slot0, &[(&key1, &account1)]);
287
288        let key2 = solana_sdk::pubkey::new_rand();
289        let account2_lamports: u64 = 200;
290        let account2 =
291            AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
292        accounts.store_uncached(slot0, &[(&key2, &account2)]);
293
294        account1_lamports = 2;
295        let slot1 = 1;
296        let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
297        accounts.store_uncached(slot1, &[(&key1, &account1)]);
298        let notifier = GeyserTestPlugin::default();
299
300        let key3 = solana_sdk::pubkey::new_rand();
301        let account3_lamports: u64 = 300;
302        let account3 =
303            AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
304        accounts.store_uncached(slot1, &[(&key3, &account3)]);
305
306        let notifier = Arc::new(RwLock::new(notifier));
307        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
308
309        accounts.notify_account_restore_from_snapshot();
310
311        let notifier = notifier.write().unwrap();
312        assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 1);
313        assert_eq!(
314            notifier.accounts_notified.get(&key1).unwrap()[0]
315                .1
316                .lamports(),
317            account1_lamports
318        );
319        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot1);
320        assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
321        assert_eq!(
322            notifier.accounts_notified.get(&key2).unwrap()[0]
323                .1
324                .lamports(),
325            account2_lamports
326        );
327        assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
328        assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
329        assert_eq!(
330            notifier.accounts_notified.get(&key3).unwrap()[0]
331                .1
332                .lamports(),
333            account3_lamports
334        );
335        assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
336        assert!(notifier.is_startup_done.load(Ordering::Relaxed));
337    }
338
339    #[test]
340    fn test_notify_account_at_accounts_update() {
341        let mut accounts = AccountsDb::new_single_for_tests_with_caching();
342
343        let notifier = GeyserTestPlugin::default();
344
345        let notifier = Arc::new(RwLock::new(notifier));
346        accounts.set_geyser_plugin_notifer(Some(notifier.clone()));
347
348        // Account with key1 is updated twice in two different slots -- should only get notified twice.
349        // Account with key2 is updated slot0, should get notified once
350        // Account with key3 is updated in slot1, should get notified once
351        let key1 = solana_sdk::pubkey::new_rand();
352        let account1_lamports1: u64 = 1;
353        let account1 =
354            AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner());
355        let slot0 = 0;
356        accounts.store_cached((slot0, &[(&key1, &account1)][..]), None);
357
358        let key2 = solana_sdk::pubkey::new_rand();
359        let account2_lamports: u64 = 200;
360        let account2 =
361            AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
362        accounts.store_cached((slot0, &[(&key2, &account2)][..]), None);
363
364        let account1_lamports2 = 2;
365        let slot1 = 1;
366        let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner());
367        accounts.store_cached((slot1, &[(&key1, &account1)][..]), None);
368
369        let key3 = solana_sdk::pubkey::new_rand();
370        let account3_lamports: u64 = 300;
371        let account3 =
372            AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
373        accounts.store_cached((slot1, &[(&key3, &account3)][..]), None);
374
375        let notifier = notifier.write().unwrap();
376        assert_eq!(notifier.accounts_notified.get(&key1).unwrap().len(), 2);
377        assert_eq!(
378            notifier.accounts_notified.get(&key1).unwrap()[0]
379                .1
380                .lamports(),
381            account1_lamports1
382        );
383        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[0].0, slot0);
384        assert_eq!(
385            notifier.accounts_notified.get(&key1).unwrap()[1]
386                .1
387                .lamports(),
388            account1_lamports2
389        );
390        assert_eq!(notifier.accounts_notified.get(&key1).unwrap()[1].0, slot1);
391
392        assert_eq!(notifier.accounts_notified.get(&key2).unwrap().len(), 1);
393        assert_eq!(
394            notifier.accounts_notified.get(&key2).unwrap()[0]
395                .1
396                .lamports(),
397            account2_lamports
398        );
399        assert_eq!(notifier.accounts_notified.get(&key2).unwrap()[0].0, slot0);
400        assert_eq!(notifier.accounts_notified.get(&key3).unwrap().len(), 1);
401        assert_eq!(
402            notifier.accounts_notified.get(&key3).unwrap()[0]
403                .1
404                .lamports(),
405            account3_lamports
406        );
407        assert_eq!(notifier.accounts_notified.get(&key3).unwrap()[0].0, slot1);
408    }
409}