jetstreamer_plugin/plugins/
pubkey_stats.rs1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<
16 DashMap<u64, DashMap<Address, u32, ahash::RandomState>, ahash::RandomState>,
17> = Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
18
19#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
20struct PubkeyMention {
21 slot: u32,
22 timestamp: u32,
23 pubkey: Address,
24 num_mentions: u32,
25}
26
/// Plugin that counts, per slot, how often each account key appears in the
/// transactions it is shown, and writes those counts to the `pubkey_mentions`
/// table.
///
/// The type carries no fields; pending counts live in the `PENDING_BY_SLOT`
/// static.
#[derive(Clone, Debug)]
pub struct PubkeyStatsPlugin;
37
38impl PubkeyStatsPlugin {
39 pub const fn new() -> Self {
41 Self
42 }
43
44 fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<PubkeyMention> {
45 let timestamp = clamp_block_time(block_time);
46 if let Some((_, pubkey_counts)) = PENDING_BY_SLOT.remove(&slot) {
47 return pubkey_counts
48 .into_iter()
49 .map(|(pubkey, num_mentions)| PubkeyMention {
50 slot: slot.min(u32::MAX as u64) as u32,
51 timestamp,
52 pubkey,
53 num_mentions,
54 })
55 .collect();
56 }
57 Vec::new()
58 }
59
60 fn drain_all_pending(block_time: Option<i64>) -> Vec<PubkeyMention> {
61 let timestamp = clamp_block_time(block_time);
62 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
63 let mut rows = Vec::new();
64 for slot in slots {
65 if let Some((_, pubkey_counts)) = PENDING_BY_SLOT.remove(&slot) {
66 rows.extend(pubkey_counts.into_iter().map(|(pubkey, num_mentions)| {
67 PubkeyMention {
68 slot: slot.min(u32::MAX as u64) as u32,
69 timestamp,
70 pubkey,
71 num_mentions,
72 }
73 }));
74 }
75 }
76 rows
77 }
78}
79
80impl Default for PubkeyStatsPlugin {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Plugin for PubkeyStatsPlugin {
87 #[inline(always)]
88 fn name(&self) -> &'static str {
89 "Pubkey Stats"
90 }
91
92 #[inline(always)]
93 fn on_transaction<'a>(
94 &'a self,
95 _thread_id: usize,
96 _db: Option<Arc<Client>>,
97 transaction: &'a TransactionData,
98 ) -> PluginFuture<'a> {
99 async move {
100 let account_keys = match &transaction.transaction.message {
101 VersionedMessage::Legacy(msg) => &msg.account_keys,
102 VersionedMessage::V0(msg) => &msg.account_keys,
103 };
104 if account_keys.is_empty() {
105 return Ok(());
106 }
107
108 let slot = transaction.slot;
109 let slot_entry = PENDING_BY_SLOT
110 .entry(slot)
111 .or_insert_with(|| DashMap::with_hasher(ahash::RandomState::new()));
112 for pubkey in account_keys {
113 *slot_entry.entry(*pubkey).or_insert(0) += 1;
114 }
115
116 Ok(())
117 }
118 .boxed()
119 }
120
121 #[inline(always)]
122 fn on_block(
123 &self,
124 _thread_id: usize,
125 db: Option<Arc<Client>>,
126 block: &BlockData,
127 ) -> PluginFuture<'_> {
128 let slot = block.slot();
129 let block_time = block.block_time();
130 let was_skipped = block.was_skipped();
131 async move {
132 if was_skipped {
133 return Ok(());
134 }
135
136 let rows = Self::take_slot_events(slot, block_time);
137
138 if let Some(db_client) = db
139 && !rows.is_empty()
140 {
141 tokio::spawn(async move {
142 if let Err(err) = write_pubkey_mentions(db_client, rows).await {
143 log::error!("failed to write pubkey mentions: {}", err);
144 }
145 });
146 }
147
148 Ok(())
149 }
150 .boxed()
151 }
152
153 #[inline(always)]
154 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
155 async move {
156 log::info!("Pubkey Stats Plugin loaded.");
157 if let Some(db) = db {
158 log::info!("Creating pubkey_mentions table if it does not exist...");
159 db.query(
160 r#"
161 CREATE TABLE IF NOT EXISTS pubkey_mentions (
162 slot UInt32,
163 timestamp DateTime('UTC'),
164 pubkey FixedString(32),
165 num_mentions UInt32
166 )
167 ENGINE = ReplacingMergeTree(timestamp)
168 ORDER BY (slot, pubkey)
169 "#,
170 )
171 .execute()
172 .await?;
173
174 log::info!("Creating pubkeys table if it does not exist...");
175 db.query(
176 r#"
177 CREATE TABLE IF NOT EXISTS pubkeys (
178 pubkey FixedString(32),
179 id UInt64
180 )
181 ENGINE = ReplacingMergeTree()
182 ORDER BY pubkey
183 "#,
184 )
185 .execute()
186 .await?;
187
188 log::info!("Creating pubkeys materialised view if it does not exist...");
189 db.query(
190 r#"
191 CREATE MATERIALIZED VIEW IF NOT EXISTS pubkeys_mv TO pubkeys AS
192 SELECT
193 pubkey,
194 sipHash64(pubkey) AS id
195 FROM pubkey_mentions
196 GROUP BY pubkey
197 "#,
198 )
199 .execute()
200 .await?;
201
202 log::info!("done.");
203 } else {
204 log::warn!(
205 "Pubkey Stats Plugin running without ClickHouse; data will not be persisted."
206 );
207 }
208 Ok(())
209 }
210 .boxed()
211 }
212
213 #[inline(always)]
214 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
215 async move {
216 if let Some(db_client) = db {
217 let rows = Self::drain_all_pending(None);
218 if !rows.is_empty() {
219 write_pubkey_mentions(Arc::clone(&db_client), rows)
220 .await
221 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
222 Box::new(err)
223 })?;
224 }
225 backfill_pubkey_timestamps(db_client)
226 .await
227 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
228 }
229 Ok(())
230 }
231 .boxed()
232 }
233}
234
235async fn write_pubkey_mentions(
236 db: Arc<Client>,
237 rows: Vec<PubkeyMention>,
238) -> Result<(), clickhouse::error::Error> {
239 if rows.is_empty() {
240 return Ok(());
241 }
242 let mut insert = db.insert::<PubkeyMention>("pubkey_mentions").await?;
243 for row in rows {
244 insert.write(&row).await?;
245 }
246 insert.end().await?;
247 Ok(())
248}
249
/// Converts an optional block time into a `u32`, saturating at both ends.
///
/// `None` and negative values become `0`; values above `u32::MAX` become
/// `u32::MAX`; everything else is returned unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    block_time.map_or(0, |secs| {
        // After `max(0)` the only way the conversion can fail is overflow.
        u32::try_from(secs.max(0)).unwrap_or(u32::MAX)
    })
}
262
263async fn backfill_pubkey_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
264 db.query(
265 r#"
266 INSERT INTO pubkey_mentions
267 SELECT pm.slot,
268 ss.block_time,
269 pm.pubkey,
270 pm.num_mentions
271 FROM pubkey_mentions AS pm
272 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
273 WHERE pm.timestamp = toDateTime(0)
274 AND ss.block_time > toDateTime(0)
275 "#,
276 )
277 .execute()
278 .await?;
279
280 Ok(())
281}