Skip to main content

jetstreamer_plugin/plugins/
pubkey_stats.rs

1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14/// Per-slot accumulator: maps each pubkey to its mention count within that slot.
15static PENDING_BY_SLOT: Lazy<
16    DashMap<u64, DashMap<Address, u32, ahash::RandomState>, ahash::RandomState>,
17> = Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
18
19#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
20struct PubkeyMention {
21    slot: u32,
22    timestamp: u32,
23    pubkey: Address,
24    num_mentions: u32,
25}
26
/// Tracks per-slot pubkey mention counts and writes them to ClickHouse.
///
/// For every transaction, the account keys it references are counted: the static keys in the
/// message plus the keys loaded from address lookup tables. A ClickHouse `pubkey_mentions`
/// table stores the aggregated count per `(slot, pubkey)` pair using
/// `ReplacingMergeTree(timestamp)`, so re-inserting the same `(slot, pubkey)` collapses to the
/// row with the newest timestamp once parts are merged.
///
/// A companion `pubkeys` table maps each pubkey to a 64-bit id computed as
/// `sipHash64(pubkey)`. It is populated by the `pubkeys_mv` materialized view on every insert
/// into `pubkey_mentions`. The id is hash-derived, so it is not guaranteed to be unique.
#[derive(Debug, Clone)]
pub struct PubkeyStatsPlugin;
37
38impl PubkeyStatsPlugin {
39    /// Creates a new instance.
40    pub const fn new() -> Self {
41        Self
42    }
43
44    fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<PubkeyMention> {
45        let timestamp = clamp_block_time(block_time);
46        if let Some((_, pubkey_counts)) = PENDING_BY_SLOT.remove(&slot) {
47            return pubkey_counts
48                .into_iter()
49                .map(|(pubkey, num_mentions)| PubkeyMention {
50                    slot: slot.min(u32::MAX as u64) as u32,
51                    timestamp,
52                    pubkey,
53                    num_mentions,
54                })
55                .collect();
56        }
57        Vec::new()
58    }
59
60    fn drain_all_pending(block_time: Option<i64>) -> Vec<PubkeyMention> {
61        let timestamp = clamp_block_time(block_time);
62        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
63        let mut rows = Vec::new();
64        for slot in slots {
65            if let Some((_, pubkey_counts)) = PENDING_BY_SLOT.remove(&slot) {
66                rows.extend(pubkey_counts.into_iter().map(|(pubkey, num_mentions)| {
67                    PubkeyMention {
68                        slot: slot.min(u32::MAX as u64) as u32,
69                        timestamp,
70                        pubkey,
71                        num_mentions,
72                    }
73                }));
74            }
75        }
76        rows
77    }
78}
79
80impl Default for PubkeyStatsPlugin {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Plugin for PubkeyStatsPlugin {
87    #[inline(always)]
88    fn name(&self) -> &'static str {
89        "Pubkey Stats"
90    }
91
92    #[inline(always)]
93    fn on_transaction<'a>(
94        &'a self,
95        _thread_id: usize,
96        _db: Option<Arc<Client>>,
97        transaction: &'a TransactionData,
98    ) -> PluginFuture<'a> {
99        async move {
100            let account_keys = match &transaction.transaction.message {
101                VersionedMessage::Legacy(msg) => &msg.account_keys,
102                VersionedMessage::V0(msg) => &msg.account_keys,
103            };
104            if account_keys.is_empty() {
105                return Ok(());
106            }
107
108            let slot = transaction.slot;
109            let slot_entry = PENDING_BY_SLOT
110                .entry(slot)
111                .or_insert_with(|| DashMap::with_hasher(ahash::RandomState::new()));
112            for pubkey in account_keys {
113                *slot_entry.entry(*pubkey).or_insert(0) += 1;
114            }
115
116            Ok(())
117        }
118        .boxed()
119    }
120
121    #[inline(always)]
122    fn on_block(
123        &self,
124        _thread_id: usize,
125        db: Option<Arc<Client>>,
126        block: &BlockData,
127    ) -> PluginFuture<'_> {
128        let slot = block.slot();
129        let block_time = block.block_time();
130        let was_skipped = block.was_skipped();
131        async move {
132            if was_skipped {
133                return Ok(());
134            }
135
136            let rows = Self::take_slot_events(slot, block_time);
137
138            if let Some(db_client) = db
139                && !rows.is_empty()
140            {
141                tokio::spawn(async move {
142                    if let Err(err) = write_pubkey_mentions(db_client, rows).await {
143                        log::error!("failed to write pubkey mentions: {}", err);
144                    }
145                });
146            }
147
148            Ok(())
149        }
150        .boxed()
151    }
152
153    #[inline(always)]
154    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
155        async move {
156            log::info!("Pubkey Stats Plugin loaded.");
157            if let Some(db) = db {
158                log::info!("Creating pubkey_mentions table if it does not exist...");
159                db.query(
160                    r#"
161                    CREATE TABLE IF NOT EXISTS pubkey_mentions (
162                        slot          UInt32,
163                        timestamp     DateTime('UTC'),
164                        pubkey        FixedString(32),
165                        num_mentions  UInt32
166                    )
167                    ENGINE = ReplacingMergeTree(timestamp)
168                    ORDER BY (slot, pubkey)
169                    "#,
170                )
171                .execute()
172                .await?;
173
174                log::info!("Creating pubkeys table if it does not exist...");
175                db.query(
176                    r#"
177                    CREATE TABLE IF NOT EXISTS pubkeys (
178                        pubkey  FixedString(32),
179                        id      UInt64
180                    )
181                    ENGINE = ReplacingMergeTree()
182                    ORDER BY pubkey
183                    "#,
184                )
185                .execute()
186                .await?;
187
188                log::info!("Creating pubkeys materialised view if it does not exist...");
189                db.query(
190                    r#"
191                    CREATE MATERIALIZED VIEW IF NOT EXISTS pubkeys_mv TO pubkeys AS
192                    SELECT
193                        pubkey,
194                        sipHash64(pubkey) AS id
195                    FROM pubkey_mentions
196                    GROUP BY pubkey
197                    "#,
198                )
199                .execute()
200                .await?;
201
202                log::info!("done.");
203            } else {
204                log::warn!(
205                    "Pubkey Stats Plugin running without ClickHouse; data will not be persisted."
206                );
207            }
208            Ok(())
209        }
210        .boxed()
211    }
212
213    #[inline(always)]
214    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
215        async move {
216            if let Some(db_client) = db {
217                let rows = Self::drain_all_pending(None);
218                if !rows.is_empty() {
219                    write_pubkey_mentions(Arc::clone(&db_client), rows)
220                        .await
221                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
222                            Box::new(err)
223                        })?;
224                }
225                backfill_pubkey_timestamps(db_client)
226                    .await
227                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
228            }
229            Ok(())
230        }
231        .boxed()
232    }
233}
234
235async fn write_pubkey_mentions(
236    db: Arc<Client>,
237    rows: Vec<PubkeyMention>,
238) -> Result<(), clickhouse::error::Error> {
239    if rows.is_empty() {
240        return Ok(());
241    }
242    let mut insert = db.insert::<PubkeyMention>("pubkey_mentions").await?;
243    for row in rows {
244        insert.write(&row).await?;
245    }
246    insert.end().await?;
247    Ok(())
248}
249
/// Converts an optional Unix block time (seconds) into the `u32` stored in the `DateTime`
/// column.
///
/// `None` and negative values become `0`. Values above `u32::MAX` saturate to `u32::MAX`.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    block_time.map_or(0, |ts| {
        // After clamping, the value lies in 0..=u32::MAX, so the cast cannot truncate.
        ts.clamp(0, i64::from(u32::MAX)) as u32
    })
}
262
263async fn backfill_pubkey_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
264    db.query(
265        r#"
266        INSERT INTO pubkey_mentions
267        SELECT pm.slot,
268               ss.block_time,
269               pm.pubkey,
270               pm.num_mentions
271        FROM pubkey_mentions AS pm
272        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
273        WHERE pm.timestamp = toDateTime(0)
274          AND ss.block_time > toDateTime(0)
275        "#,
276    )
277    .execute()
278    .await?;
279
280    Ok(())
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Plugin;
    use jetstreamer_firehose::firehose::{BlockData, TransactionData};
    use serial_test::serial;
    use solana_hash::Hash;
    use solana_message::VersionedMessage;
    use solana_message::legacy::Message as LegacyMessage;
    use solana_runtime::bank::KeyedRewardsAndNumPartitions;
    use solana_transaction::versioned::VersionedTransaction;
    use solana_transaction_status::TransactionStatusMeta;

    /// Builds a legacy-message transaction in `slot` that references `account_keys`.
    fn make_tx(slot: u64, account_keys: Vec<Address>) -> TransactionData {
        let message = LegacyMessage {
            account_keys,
            ..LegacyMessage::default()
        };
        TransactionData {
            slot,
            transaction_slot_index: 0,
            signature: Default::default(),
            message_hash: Hash::default(),
            is_vote: false,
            transaction_status_meta: TransactionStatusMeta {
                status: Ok(()),
                fee: 0,
                pre_balances: vec![],
                post_balances: vec![],
                inner_instructions: None,
                log_messages: None,
                pre_token_balances: None,
                post_token_balances: None,
                rewards: None,
                loaded_addresses: Default::default(),
                return_data: None,
                compute_units_consumed: Some(0),
                cost_units: None,
            },
            transaction: VersionedTransaction {
                signatures: vec![],
                message: VersionedMessage::Legacy(message),
            },
        }
    }

    /// Builds a non-skipped block for `slot` with the given block time.
    fn make_block(slot: u64, block_time: Option<i64>) -> BlockData {
        BlockData::Block {
            slot,
            parent_slot: slot.saturating_sub(1),
            blockhash: Hash::default(),
            parent_blockhash: Hash::default(),
            rewards: KeyedRewardsAndNumPartitions {
                keyed_rewards: vec![],
                num_partitions: None,
            },
            block_time,
            block_height: Some(slot),
            executed_transaction_count: 0,
            entry_count: 0,
        }
    }

    /// Resets the shared accumulator; every test touching it is `#[serial]`.
    fn clear_pending() {
        PENDING_BY_SLOT.clear();
    }

    #[test]
    fn clamp_block_time_none_returns_zero() {
        assert_eq!(clamp_block_time(None), 0);
    }

    #[test]
    fn clamp_block_time_negative_returns_zero() {
        assert_eq!(clamp_block_time(Some(-100)), 0);
    }

    #[test]
    fn clamp_block_time_overflow_returns_max() {
        assert_eq!(clamp_block_time(Some(u32::MAX as i64 + 1)), u32::MAX);
    }

    #[test]
    fn clamp_block_time_normal() {
        assert_eq!(clamp_block_time(Some(1_700_000_000)), 1_700_000_000);
    }

    #[test]
    fn clamp_block_time_boundaries_are_inclusive() {
        assert_eq!(clamp_block_time(Some(0)), 0);
        assert_eq!(clamp_block_time(Some(u32::MAX as i64)), u32::MAX);
    }

    #[serial]
    #[tokio::test]
    async fn single_transaction_counts_all_account_keys() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key_a = Address::from([1u8; 32]);
        let key_b = Address::from([2u8; 32]);
        let key_c = Address::from([3u8; 32]);
        let tx = make_tx(100, vec![key_a, key_b, key_c]);

        plugin.on_transaction(0, None, &tx).await.unwrap();

        let events = PubkeyStatsPlugin::take_slot_events(100, Some(1_700_000_000));
        assert_eq!(events.len(), 3);
        for event in &events {
            assert_eq!(event.num_mentions, 1);
            assert_eq!(event.slot, 100);
            assert_eq!(event.timestamp, 1_700_000_000);
        }
    }

    #[serial]
    #[tokio::test]
    async fn duplicate_keys_in_single_transaction_accumulate() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key_a = Address::from([1u8; 32]);
        let tx = make_tx(200, vec![key_a, key_a, key_a]);

        plugin.on_transaction(0, None, &tx).await.unwrap();

        let events = PubkeyStatsPlugin::take_slot_events(200, None);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].num_mentions, 3);
        assert_eq!(events[0].pubkey, key_a);
    }

    #[serial]
    #[tokio::test]
    async fn multiple_transactions_same_slot_accumulate() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key_a = Address::from([10u8; 32]);
        let key_b = Address::from([20u8; 32]);

        let tx1 = make_tx(300, vec![key_a, key_b]);
        let tx2 = make_tx(300, vec![key_a]);

        plugin.on_transaction(0, None, &tx1).await.unwrap();
        plugin.on_transaction(0, None, &tx2).await.unwrap();

        let events = PubkeyStatsPlugin::take_slot_events(300, None);
        assert_eq!(events.len(), 2);
        let a_event = events.iter().find(|e| e.pubkey == key_a).unwrap();
        let b_event = events.iter().find(|e| e.pubkey == key_b).unwrap();
        assert_eq!(a_event.num_mentions, 2);
        assert_eq!(b_event.num_mentions, 1);
    }

    #[serial]
    #[tokio::test]
    async fn different_slots_are_independent() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key = Address::from([42u8; 32]);

        let tx1 = make_tx(400, vec![key]);
        let tx2 = make_tx(401, vec![key]);

        plugin.on_transaction(0, None, &tx1).await.unwrap();
        plugin.on_transaction(0, None, &tx2).await.unwrap();

        let events_400 = PubkeyStatsPlugin::take_slot_events(400, None);
        let events_401 = PubkeyStatsPlugin::take_slot_events(401, None);
        assert_eq!(events_400.len(), 1);
        assert_eq!(events_401.len(), 1);
        assert_eq!(events_400[0].num_mentions, 1);
        assert_eq!(events_401[0].num_mentions, 1);
    }

    #[serial]
    #[tokio::test]
    async fn take_slot_events_drains_slot() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(500, vec![Address::from([1u8; 32])]);
        plugin.on_transaction(0, None, &tx).await.unwrap();

        let first = PubkeyStatsPlugin::take_slot_events(500, None);
        let second = PubkeyStatsPlugin::take_slot_events(500, None);
        assert_eq!(first.len(), 1);
        assert!(second.is_empty());
    }

    #[serial]
    #[tokio::test]
    async fn drain_all_pending_collects_all_slots() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();

        let tx1 = make_tx(600, vec![Address::from([1u8; 32])]);
        let tx2 = make_tx(601, vec![Address::from([2u8; 32])]);
        let tx3 = make_tx(602, vec![Address::from([3u8; 32])]);

        plugin.on_transaction(0, None, &tx1).await.unwrap();
        plugin.on_transaction(0, None, &tx2).await.unwrap();
        plugin.on_transaction(0, None, &tx3).await.unwrap();

        let events = PubkeyStatsPlugin::drain_all_pending(Some(1_000));
        assert_eq!(events.len(), 3);
        assert!(PENDING_BY_SLOT.is_empty());

        // Every drained row carries its own slot and the shared timestamp.
        assert!(events.iter().all(|e| e.timestamp == 1_000));
        let mut slots: Vec<u32> = events.iter().map(|e| e.slot).collect();
        slots.sort_unstable();
        assert_eq!(slots, vec![600, 601, 602]);
    }

    #[serial]
    #[tokio::test]
    async fn empty_account_keys_produces_no_events() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(700, vec![]);
        plugin.on_transaction(0, None, &tx).await.unwrap();
        assert!(PENDING_BY_SLOT.is_empty());
    }

    #[serial]
    #[tokio::test]
    async fn on_block_drains_pending_slot() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(800, vec![Address::from([1u8; 32])]);
        plugin.on_transaction(0, None, &tx).await.unwrap();
        assert!(!PENDING_BY_SLOT.is_empty());

        let block = make_block(800, Some(1_700_000_000));
        plugin.on_block(0, None, &block).await.unwrap();

        // on_block without db just drains, doesn't write
        assert!(!PENDING_BY_SLOT.contains_key(&800));
    }

    #[serial]
    #[tokio::test]
    async fn skipped_block_does_not_drain() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(900, vec![Address::from([1u8; 32])]);
        plugin.on_transaction(0, None, &tx).await.unwrap();

        let skipped = BlockData::PossibleLeaderSkipped { slot: 900 };
        plugin.on_block(0, None, &skipped).await.unwrap();

        assert!(PENDING_BY_SLOT.contains_key(&900));
        clear_pending();
    }

    #[test]
    fn plugin_name() {
        assert_eq!(PubkeyStatsPlugin::new().name(), "Pubkey Stats");
    }

    #[serial]
    #[test]
    fn slot_clamped_to_u32_max() {
        let slot = u64::from(u32::MAX) + 100;
        clear_pending();
        let inner = DashMap::with_hasher(ahash::RandomState::new());
        inner.insert(Address::from([1u8; 32]), 5);
        PENDING_BY_SLOT.insert(slot, inner);

        let events = PubkeyStatsPlugin::take_slot_events(slot, None);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].slot, u32::MAX);
        assert_eq!(events[0].num_mentions, 5);
    }
}
542}