jetstreamer_plugin/plugins/
instruction_tracking.rs1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_message::VersionedMessage;
9use solana_sdk_ids::vote::id as vote_program_id;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotInstructionEvent, ahash::RandomState>> =
15 Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
16
17#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
18struct SlotInstructionEvent {
19 slot: u32,
20 timestamp: u32,
22 vote_instruction_count: u64,
23 non_vote_instruction_count: u64,
24 vote_transaction_count: u32,
25 non_vote_transaction_count: u32,
26}
27
28#[derive(Debug, Clone)]
29pub struct InstructionTrackingPlugin;
31
32impl InstructionTrackingPlugin {
33 pub const fn new() -> Self {
35 Self
36 }
37
38 fn take_slot_event(slot: u64, block_time: Option<i64>) -> Option<SlotInstructionEvent> {
39 let timestamp = clamp_block_time(block_time);
40 PENDING_BY_SLOT.remove(&slot).map(|(_, mut event)| {
41 event.timestamp = timestamp;
42 event
43 })
44 }
45
46 fn drain_all_pending(block_time: Option<i64>) -> Vec<SlotInstructionEvent> {
47 let timestamp = clamp_block_time(block_time);
48 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
49 let mut rows = Vec::new();
50 for slot in slots {
51 if let Some((_, mut event)) = PENDING_BY_SLOT.remove(&slot) {
52 event.timestamp = timestamp;
53 rows.push(event);
54 }
55 }
56 rows
57 }
58}
59
60impl Default for InstructionTrackingPlugin {
61 fn default() -> Self {
62 Self::new()
63 }
64}
65
66impl Plugin for InstructionTrackingPlugin {
67 #[inline(always)]
68 fn name(&self) -> &'static str {
69 "Instruction Tracking"
70 }
71
72 #[inline(always)]
73 fn on_transaction<'a>(
74 &'a self,
75 _thread_id: usize,
76 _db: Option<Arc<Client>>,
77 transaction: &'a TransactionData,
78 ) -> PluginFuture<'a> {
79 async move {
80 let (vote_instruction_count, non_vote_instruction_count) =
81 instruction_vote_counts(transaction);
82
83 let slot = transaction.slot;
84 let mut entry = PENDING_BY_SLOT
85 .entry(slot)
86 .or_insert_with(|| SlotInstructionEvent {
87 slot: slot.min(u32::MAX as u64) as u32,
88 timestamp: 0,
89 vote_instruction_count: 0,
90 non_vote_instruction_count: 0,
91 vote_transaction_count: 0,
92 non_vote_transaction_count: 0,
93 });
94 entry.vote_instruction_count = entry
95 .vote_instruction_count
96 .saturating_add(vote_instruction_count);
97 entry.non_vote_instruction_count = entry
98 .non_vote_instruction_count
99 .saturating_add(non_vote_instruction_count);
100 if vote_instruction_count > 0 {
101 entry.vote_transaction_count = entry.vote_transaction_count.saturating_add(1);
102 } else {
103 entry.non_vote_transaction_count =
104 entry.non_vote_transaction_count.saturating_add(1);
105 }
106
107 Ok(())
108 }
109 .boxed()
110 }
111
112 #[inline(always)]
113 fn on_block(
114 &self,
115 _thread_id: usize,
116 db: Option<Arc<Client>>,
117 block: &BlockData,
118 ) -> PluginFuture<'_> {
119 let slot = block.slot();
120 let block_time = block.block_time();
121 let was_skipped = block.was_skipped();
122
123 async move {
124 if was_skipped {
125 return Ok(());
126 }
127
128 let rows = Self::take_slot_event(slot, block_time)
129 .into_iter()
130 .collect::<Vec<_>>();
131
132 if let Some(db_client) = db
133 && !rows.is_empty()
134 {
135 tokio::spawn(async move {
136 if let Err(err) = write_instruction_events(db_client, rows).await {
137 log::error!("failed to write instruction events: {}", err);
138 }
139 });
140 }
141
142 Ok(())
143 }
144 .boxed()
145 }
146
147 #[inline(always)]
148 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
149 async move {
150 log::info!("Instruction Tracking Plugin loaded.");
151 if let Some(db) = db {
152 log::info!("Ensuring slot_instructions table exists...");
153 db.query(
154 r#"
155 CREATE TABLE IF NOT EXISTS slot_instructions (
156 slot UInt32,
157 timestamp DateTime('UTC'),
158 vote_instruction_count UInt64,
159 non_vote_instruction_count UInt64,
160 vote_transaction_count UInt32,
161 non_vote_transaction_count UInt32
162 )
163 ENGINE = ReplacingMergeTree(timestamp)
164 ORDER BY slot
165 "#,
166 )
167 .execute()
168 .await?;
169 log::info!("done.");
170 } else {
171 log::warn!(
172 "Instruction Tracking Plugin running without ClickHouse; data will not be persisted."
173 );
174 }
175 Ok(())
176 }
177 .boxed()
178 }
179
180 #[inline(always)]
181 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
182 async move {
183 if let Some(db_client) = db {
184 let rows = Self::drain_all_pending(None);
185 if !rows.is_empty() {
186 write_instruction_events(Arc::clone(&db_client), rows)
187 .await
188 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
189 Box::new(err)
190 })?;
191 }
192 backfill_instruction_timestamps(db_client)
193 .await
194 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
195 }
196 Ok(())
197 }
198 .boxed()
199 }
200}
201
202async fn write_instruction_events(
203 db: Arc<Client>,
204 rows: Vec<SlotInstructionEvent>,
205) -> Result<(), clickhouse::error::Error> {
206 if rows.is_empty() {
207 return Ok(());
208 }
209 let mut insert = db
210 .insert::<SlotInstructionEvent>("slot_instructions")
211 .await?;
212 for row in rows {
213 insert.write(&row).await?;
214 }
215 insert.end().await?;
216 Ok(())
217}
218
219fn clamp_block_time(block_time: Option<i64>) -> u32 {
220 let Some(raw_ts) = block_time else {
221 return 0;
222 };
223 if raw_ts < 0 {
224 0
225 } else if raw_ts > u32::MAX as i64 {
226 u32::MAX
227 } else {
228 raw_ts as u32
229 }
230}
231
232fn instruction_vote_counts(transaction: &TransactionData) -> (u64, u64) {
233 let static_keys = transaction.transaction.message.static_account_keys();
234 let vote_program = vote_program_id();
235 let mut vote_count: u64 = 0;
236 let mut non_vote_count: u64 = 0;
237
238 let classify = |program_index: usize, vote_count: &mut u64, non_vote_count: &mut u64| {
239 if let Some(pid) = static_keys.get(program_index) {
240 if pid == &vote_program {
241 *vote_count = vote_count.saturating_add(1);
242 } else {
243 *non_vote_count = non_vote_count.saturating_add(1);
244 }
245 } else {
246 *non_vote_count = non_vote_count.saturating_add(1);
247 }
248 };
249
250 match &transaction.transaction.message {
251 VersionedMessage::Legacy(msg) => {
252 for ix in &msg.instructions {
253 classify(
254 ix.program_id_index as usize,
255 &mut vote_count,
256 &mut non_vote_count,
257 );
258 }
259 }
260 VersionedMessage::V0(msg) => {
261 for ix in &msg.instructions {
262 classify(
263 ix.program_id_index as usize,
264 &mut vote_count,
265 &mut non_vote_count,
266 );
267 }
268 }
269 }
270
271 if let Some(inner_sets) = transaction
272 .transaction_status_meta
273 .inner_instructions
274 .as_ref()
275 {
276 for set in inner_sets {
277 for ix in &set.instructions {
278 classify(
279 ix.instruction.program_id_index as usize,
280 &mut vote_count,
281 &mut non_vote_count,
282 );
283 }
284 }
285 }
286
287 (vote_count, non_vote_count)
288}
289
290async fn backfill_instruction_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
291 db.query(
292 r#"
293 INSERT INTO slot_instructions
294 SELECT si.slot,
295 ss.block_time,
296 si.vote_instruction_count,
297 si.non_vote_instruction_count,
298 si.vote_transaction_count,
299 si.non_vote_transaction_count
300 FROM slot_instructions AS si
301 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
302 WHERE si.timestamp = toDateTime(0)
303 AND ss.block_time > toDateTime(0)
304 "#,
305 )
306 .execute()
307 .await?;
308
309 Ok(())
310}