1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<
16 DashMap<u64, DashMap<Address, u32, ahash::RandomState>, ahash::RandomState>,
17> = Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
18
/// One row of the `pubkey_mentions` ClickHouse table: how many times `pubkey`
/// appeared among transaction account keys within `slot`.
///
/// NOTE(review): the `Row` derive maps these fields onto the `pubkey_mentions`
/// columns created in `on_load` (slot, timestamp, pubkey, num_mentions); keep
/// names, types and order aligned with that DDL — confirm before changing.
#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
struct PubkeyMention {
    /// Slot number, saturated to `u32::MAX` when the real slot does not fit.
    slot: u32,
    /// Block time written into the `DateTime('UTC')` column; 0 when the block
    /// time was unknown, which `backfill_pubkey_timestamps` later replaces.
    timestamp: u32,
    /// The account key being counted.
    pubkey: Address,
    /// Number of occurrences of `pubkey` in account-key lists for this slot.
    num_mentions: u32,
}
26
/// Plugin that counts, per slot, how often each account key appears in
/// transactions and persists those counts to the ClickHouse `pubkey_mentions`
/// table.
///
/// The type itself carries no state; pending counts live in the module-level
/// `PENDING_BY_SLOT` map.
#[derive(Clone, Debug)]
pub struct PubkeyStatsPlugin;
37
38impl PubkeyStatsPlugin {
39 pub const fn new() -> Self {
41 Self
42 }
43
44 fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<PubkeyMention> {
45 let timestamp = clamp_block_time(block_time);
46 if let Some((_, pubkey_counts)) = PENDING_BY_SLOT.remove(&slot) {
47 return pubkey_counts
48 .into_iter()
49 .map(|(pubkey, num_mentions)| PubkeyMention {
50 slot: slot.min(u32::MAX as u64) as u32,
51 timestamp,
52 pubkey,
53 num_mentions,
54 })
55 .collect();
56 }
57 Vec::new()
58 }
59
60 fn drain_all_pending(block_time: Option<i64>) -> Vec<PubkeyMention> {
61 let timestamp = clamp_block_time(block_time);
62 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
63 let mut rows = Vec::new();
64 for slot in slots {
65 if let Some((_, pubkey_counts)) = PENDING_BY_SLOT.remove(&slot) {
66 rows.extend(pubkey_counts.into_iter().map(|(pubkey, num_mentions)| {
67 PubkeyMention {
68 slot: slot.min(u32::MAX as u64) as u32,
69 timestamp,
70 pubkey,
71 num_mentions,
72 }
73 }));
74 }
75 }
76 rows
77 }
78}
79
80impl Default for PubkeyStatsPlugin {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Plugin for PubkeyStatsPlugin {
87 #[inline(always)]
88 fn name(&self) -> &'static str {
89 "Pubkey Stats"
90 }
91
92 #[inline(always)]
93 fn on_transaction<'a>(
94 &'a self,
95 _thread_id: usize,
96 _db: Option<Arc<Client>>,
97 transaction: &'a TransactionData,
98 ) -> PluginFuture<'a> {
99 async move {
100 let account_keys = match &transaction.transaction.message {
101 VersionedMessage::Legacy(msg) => &msg.account_keys,
102 VersionedMessage::V0(msg) => &msg.account_keys,
103 };
104 if account_keys.is_empty() {
105 return Ok(());
106 }
107
108 let slot = transaction.slot;
109 let slot_entry = PENDING_BY_SLOT
110 .entry(slot)
111 .or_insert_with(|| DashMap::with_hasher(ahash::RandomState::new()));
112 for pubkey in account_keys {
113 *slot_entry.entry(*pubkey).or_insert(0) += 1;
114 }
115
116 Ok(())
117 }
118 .boxed()
119 }
120
121 #[inline(always)]
122 fn on_block(
123 &self,
124 _thread_id: usize,
125 db: Option<Arc<Client>>,
126 block: &BlockData,
127 ) -> PluginFuture<'_> {
128 let slot = block.slot();
129 let block_time = block.block_time();
130 let was_skipped = block.was_skipped();
131 async move {
132 if was_skipped {
133 return Ok(());
134 }
135
136 let rows = Self::take_slot_events(slot, block_time);
137
138 if let Some(db_client) = db
139 && !rows.is_empty()
140 {
141 tokio::spawn(async move {
142 if let Err(err) = write_pubkey_mentions(db_client, rows).await {
143 log::error!("failed to write pubkey mentions: {}", err);
144 }
145 });
146 }
147
148 Ok(())
149 }
150 .boxed()
151 }
152
153 #[inline(always)]
154 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
155 async move {
156 log::info!("Pubkey Stats Plugin loaded.");
157 if let Some(db) = db {
158 log::info!("Creating pubkey_mentions table if it does not exist...");
159 db.query(
160 r#"
161 CREATE TABLE IF NOT EXISTS pubkey_mentions (
162 slot UInt32,
163 timestamp DateTime('UTC'),
164 pubkey FixedString(32),
165 num_mentions UInt32
166 )
167 ENGINE = ReplacingMergeTree(timestamp)
168 ORDER BY (slot, pubkey)
169 "#,
170 )
171 .execute()
172 .await?;
173
174 log::info!("Creating pubkeys table if it does not exist...");
175 db.query(
176 r#"
177 CREATE TABLE IF NOT EXISTS pubkeys (
178 pubkey FixedString(32),
179 id UInt64
180 )
181 ENGINE = ReplacingMergeTree()
182 ORDER BY pubkey
183 "#,
184 )
185 .execute()
186 .await?;
187
188 log::info!("Creating pubkeys materialised view if it does not exist...");
189 db.query(
190 r#"
191 CREATE MATERIALIZED VIEW IF NOT EXISTS pubkeys_mv TO pubkeys AS
192 SELECT
193 pubkey,
194 sipHash64(pubkey) AS id
195 FROM pubkey_mentions
196 GROUP BY pubkey
197 "#,
198 )
199 .execute()
200 .await?;
201
202 log::info!("done.");
203 } else {
204 log::warn!(
205 "Pubkey Stats Plugin running without ClickHouse; data will not be persisted."
206 );
207 }
208 Ok(())
209 }
210 .boxed()
211 }
212
213 #[inline(always)]
214 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
215 async move {
216 if let Some(db_client) = db {
217 let rows = Self::drain_all_pending(None);
218 if !rows.is_empty() {
219 write_pubkey_mentions(Arc::clone(&db_client), rows)
220 .await
221 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
222 Box::new(err)
223 })?;
224 }
225 backfill_pubkey_timestamps(db_client)
226 .await
227 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
228 }
229 Ok(())
230 }
231 .boxed()
232 }
233}
234
235async fn write_pubkey_mentions(
236 db: Arc<Client>,
237 rows: Vec<PubkeyMention>,
238) -> Result<(), clickhouse::error::Error> {
239 if rows.is_empty() {
240 return Ok(());
241 }
242 let mut insert = db.insert::<PubkeyMention>("pubkey_mentions").await?;
243 for row in rows {
244 insert.write(&row).await?;
245 }
246 insert.end().await?;
247 Ok(())
248}
249
/// Converts an optional signed block time into the `u32` stored in the
/// `pubkey_mentions.timestamp` column.
///
/// `None` and non-positive values map to 0 (the "unknown" marker the exit
/// backfill looks for). Values above `u32::MAX` saturate to `u32::MAX`.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    match block_time {
        None => 0,
        Some(ts) if ts <= 0 => 0,
        Some(ts) => u32::try_from(ts).unwrap_or(u32::MAX),
    }
}
262
263async fn backfill_pubkey_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
264 db.query(
265 r#"
266 INSERT INTO pubkey_mentions
267 SELECT pm.slot,
268 ss.block_time,
269 pm.pubkey,
270 pm.num_mentions
271 FROM pubkey_mentions AS pm
272 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
273 WHERE pm.timestamp = toDateTime(0)
274 AND ss.block_time > toDateTime(0)
275 "#,
276 )
277 .execute()
278 .await?;
279
280 Ok(())
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Plugin;
    use jetstreamer_firehose::firehose::{BlockData, TransactionData};
    use serial_test::serial;
    use solana_hash::Hash;
    use solana_message::VersionedMessage;
    use solana_message::legacy::Message as LegacyMessage;
    use solana_runtime::bank::KeyedRewardsAndNumPartitions;
    use solana_transaction::versioned::VersionedTransaction;
    use solana_transaction_status::TransactionStatusMeta;

    /// Builds a minimal legacy-message transaction in `slot` whose static
    /// account keys are exactly `account_keys`.
    fn make_tx(slot: u64, account_keys: Vec<Address>) -> TransactionData {
        let message = LegacyMessage {
            account_keys,
            ..LegacyMessage::default()
        };
        TransactionData {
            slot,
            transaction_slot_index: 0,
            signature: Default::default(),
            message_hash: Hash::default(),
            is_vote: false,
            transaction_status_meta: TransactionStatusMeta {
                status: Ok(()),
                fee: 0,
                pre_balances: vec![],
                post_balances: vec![],
                inner_instructions: None,
                log_messages: None,
                pre_token_balances: None,
                post_token_balances: None,
                rewards: None,
                loaded_addresses: Default::default(),
                return_data: None,
                compute_units_consumed: Some(0),
                cost_units: None,
            },
            transaction: VersionedTransaction {
                signatures: vec![],
                message: VersionedMessage::Legacy(message),
            },
        }
    }

    /// Builds a non-skipped block for `slot` with the given block time.
    fn make_block(slot: u64, block_time: Option<i64>) -> BlockData {
        BlockData::Block {
            slot,
            parent_slot: slot.saturating_sub(1),
            blockhash: Hash::default(),
            parent_blockhash: Hash::default(),
            rewards: KeyedRewardsAndNumPartitions {
                keyed_rewards: vec![],
                num_partitions: None,
            },
            block_time,
            block_height: Some(slot),
            executed_transaction_count: 0,
            entry_count: 0,
        }
    }

    /// Resets the process-wide pending map. Tests that touch it are `#[serial]`
    /// because the map is a shared static.
    fn clear_pending() {
        PENDING_BY_SLOT.clear();
    }

    #[test]
    fn clamp_block_time_none_returns_zero() {
        assert_eq!(clamp_block_time(None), 0);
    }

    #[test]
    fn clamp_block_time_negative_returns_zero() {
        assert_eq!(clamp_block_time(Some(-100)), 0);
    }

    #[test]
    fn clamp_block_time_overflow_returns_max() {
        assert_eq!(clamp_block_time(Some(u32::MAX as i64 + 1)), u32::MAX);
    }

    #[test]
    fn clamp_block_time_normal() {
        assert_eq!(clamp_block_time(Some(1_700_000_000)), 1_700_000_000);
    }

    #[serial]
    #[tokio::test]
    async fn single_transaction_counts_all_account_keys() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key_a = Address::from([1u8; 32]);
        let key_b = Address::from([2u8; 32]);
        let key_c = Address::from([3u8; 32]);
        let tx = make_tx(100, vec![key_a, key_b, key_c]);

        plugin.on_transaction(0, None, &tx).await.unwrap();

        let events = PubkeyStatsPlugin::take_slot_events(100, Some(1_700_000_000));
        assert_eq!(events.len(), 3);
        for event in &events {
            assert_eq!(event.num_mentions, 1);
            assert_eq!(event.slot, 100);
            assert_eq!(event.timestamp, 1_700_000_000);
        }
    }

    #[serial]
    #[tokio::test]
    async fn duplicate_keys_in_single_transaction_accumulate() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key_a = Address::from([1u8; 32]);
        let tx = make_tx(200, vec![key_a, key_a, key_a]);

        plugin.on_transaction(0, None, &tx).await.unwrap();

        let events = PubkeyStatsPlugin::take_slot_events(200, None);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].num_mentions, 3);
        assert_eq!(events[0].pubkey, key_a);
    }

    #[serial]
    #[tokio::test]
    async fn multiple_transactions_same_slot_accumulate() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key_a = Address::from([10u8; 32]);
        let key_b = Address::from([20u8; 32]);

        let tx1 = make_tx(300, vec![key_a, key_b]);
        let tx2 = make_tx(300, vec![key_a]);

        plugin.on_transaction(0, None, &tx1).await.unwrap();
        plugin.on_transaction(0, None, &tx2).await.unwrap();

        let events = PubkeyStatsPlugin::take_slot_events(300, None);
        assert_eq!(events.len(), 2);
        let a_event = events.iter().find(|e| e.pubkey == key_a).unwrap();
        let b_event = events.iter().find(|e| e.pubkey == key_b).unwrap();
        assert_eq!(a_event.num_mentions, 2);
        assert_eq!(b_event.num_mentions, 1);
    }

    #[serial]
    #[tokio::test]
    async fn different_slots_are_independent() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key = Address::from([42u8; 32]);

        let tx1 = make_tx(400, vec![key]);
        let tx2 = make_tx(401, vec![key]);

        plugin.on_transaction(0, None, &tx1).await.unwrap();
        plugin.on_transaction(0, None, &tx2).await.unwrap();

        let events_400 = PubkeyStatsPlugin::take_slot_events(400, None);
        let events_401 = PubkeyStatsPlugin::take_slot_events(401, None);
        assert_eq!(events_400.len(), 1);
        assert_eq!(events_401.len(), 1);
        assert_eq!(events_400[0].num_mentions, 1);
        assert_eq!(events_401[0].num_mentions, 1);
    }

    #[serial]
    #[tokio::test]
    async fn take_slot_events_drains_slot() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(500, vec![Address::from([1u8; 32])]);
        plugin.on_transaction(0, None, &tx).await.unwrap();

        let first = PubkeyStatsPlugin::take_slot_events(500, None);
        let second = PubkeyStatsPlugin::take_slot_events(500, None);
        assert_eq!(first.len(), 1);
        assert!(second.is_empty());
    }

    #[serial]
    #[tokio::test]
    async fn drain_all_pending_collects_all_slots() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();

        let tx1 = make_tx(600, vec![Address::from([1u8; 32])]);
        let tx2 = make_tx(601, vec![Address::from([2u8; 32])]);
        let tx3 = make_tx(602, vec![Address::from([3u8; 32])]);

        plugin.on_transaction(0, None, &tx1).await.unwrap();
        plugin.on_transaction(0, None, &tx2).await.unwrap();
        plugin.on_transaction(0, None, &tx3).await.unwrap();

        let events = PubkeyStatsPlugin::drain_all_pending(Some(1_000));
        assert_eq!(events.len(), 3);
        assert!(PENDING_BY_SLOT.is_empty());
    }

    #[serial]
    #[tokio::test]
    async fn drain_all_pending_stamps_timestamp_and_preserves_counts() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let key = Address::from([7u8; 32]);

        let tx1 = make_tx(1_000, vec![key, key]);
        let tx2 = make_tx(1_001, vec![key]);
        plugin.on_transaction(0, None, &tx1).await.unwrap();
        plugin.on_transaction(0, None, &tx2).await.unwrap();

        let mut events = PubkeyStatsPlugin::drain_all_pending(Some(1_234));
        events.sort_unstable_by_key(|e| e.slot);
        assert_eq!(events.len(), 2);
        assert_eq!(
            (events[0].slot, events[0].num_mentions, events[0].timestamp),
            (1_000, 2, 1_234)
        );
        assert_eq!(
            (events[1].slot, events[1].num_mentions, events[1].timestamp),
            (1_001, 1, 1_234)
        );
        assert!(events.iter().all(|e| e.pubkey == key));
        assert!(PENDING_BY_SLOT.is_empty());
    }

    #[serial]
    #[tokio::test]
    async fn empty_account_keys_produces_no_events() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(700, vec![]);
        plugin.on_transaction(0, None, &tx).await.unwrap();
        assert!(PENDING_BY_SLOT.is_empty());
    }

    #[serial]
    #[tokio::test]
    async fn on_block_drains_pending_slot() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(800, vec![Address::from([1u8; 32])]);
        plugin.on_transaction(0, None, &tx).await.unwrap();
        assert!(!PENDING_BY_SLOT.is_empty());

        let block = make_block(800, Some(1_700_000_000));
        plugin.on_block(0, None, &block).await.unwrap();

        assert!(!PENDING_BY_SLOT.contains_key(&800));
    }

    #[serial]
    #[tokio::test]
    async fn skipped_block_does_not_drain() {
        clear_pending();
        let plugin = PubkeyStatsPlugin::new();
        let tx = make_tx(900, vec![Address::from([1u8; 32])]);
        plugin.on_transaction(0, None, &tx).await.unwrap();

        let skipped = BlockData::PossibleLeaderSkipped { slot: 900 };
        plugin.on_block(0, None, &skipped).await.unwrap();

        assert!(PENDING_BY_SLOT.contains_key(&900));
        clear_pending();
    }

    #[test]
    fn plugin_name() {
        assert_eq!(PubkeyStatsPlugin::new().name(), "Pubkey Stats");
    }

    #[serial]
    #[test]
    fn slot_clamped_to_u32_max() {
        clear_pending();
        let slot = u64::from(u32::MAX) + 100;
        let inner = DashMap::with_hasher(ahash::RandomState::new());
        inner.insert(Address::from([1u8; 32]), 5);
        PENDING_BY_SLOT.insert(slot, inner);

        let events = PubkeyStatsPlugin::take_slot_events(slot, None);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].slot, u32::MAX);
    }
}