jetstreamer_plugin/plugins/
instruction_tracking.rs1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_message::VersionedMessage;
9use solana_sdk_ids::vote::id as vote_program_id;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotInstructionEvent>> = Lazy::new(DashMap::new);
15
16#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
17struct SlotInstructionEvent {
18 slot: u32,
19 timestamp: u32,
21 vote_instruction_count: u64,
22 non_vote_instruction_count: u64,
23 vote_transaction_count: u32,
24 non_vote_transaction_count: u32,
25}
26
/// Stateless plugin handle that tallies vote and non-vote instructions and
/// transactions per slot.
///
/// All working state lives in a process-wide map, so the type itself is a
/// unit struct.
#[derive(Clone, Debug)]
pub struct InstructionTrackingPlugin;
30
31impl InstructionTrackingPlugin {
32 pub const fn new() -> Self {
34 Self
35 }
36
37 fn take_slot_event(slot: u64, block_time: Option<i64>) -> Option<SlotInstructionEvent> {
38 let timestamp = clamp_block_time(block_time);
39 PENDING_BY_SLOT.remove(&slot).map(|(_, mut event)| {
40 event.timestamp = timestamp;
41 event
42 })
43 }
44
45 fn drain_all_pending(block_time: Option<i64>) -> Vec<SlotInstructionEvent> {
46 let timestamp = clamp_block_time(block_time);
47 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
48 let mut rows = Vec::new();
49 for slot in slots {
50 if let Some((_, mut event)) = PENDING_BY_SLOT.remove(&slot) {
51 event.timestamp = timestamp;
52 rows.push(event);
53 }
54 }
55 rows
56 }
57}
58
59impl Default for InstructionTrackingPlugin {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl Plugin for InstructionTrackingPlugin {
66 #[inline(always)]
67 fn name(&self) -> &'static str {
68 "Instruction Tracking"
69 }
70
71 #[inline(always)]
72 fn on_transaction<'a>(
73 &'a self,
74 _thread_id: usize,
75 _db: Option<Arc<Client>>,
76 transaction: &'a TransactionData,
77 ) -> PluginFuture<'a> {
78 async move {
79 let (vote_instruction_count, non_vote_instruction_count) =
80 instruction_vote_counts(transaction);
81
82 let slot = transaction.slot;
83 let mut entry = PENDING_BY_SLOT
84 .entry(slot)
85 .or_insert_with(|| SlotInstructionEvent {
86 slot: slot.min(u32::MAX as u64) as u32,
87 timestamp: 0,
88 vote_instruction_count: 0,
89 non_vote_instruction_count: 0,
90 vote_transaction_count: 0,
91 non_vote_transaction_count: 0,
92 });
93 entry.vote_instruction_count = entry
94 .vote_instruction_count
95 .saturating_add(vote_instruction_count);
96 entry.non_vote_instruction_count = entry
97 .non_vote_instruction_count
98 .saturating_add(non_vote_instruction_count);
99 if vote_instruction_count > 0 {
100 entry.vote_transaction_count = entry.vote_transaction_count.saturating_add(1);
101 } else {
102 entry.non_vote_transaction_count =
103 entry.non_vote_transaction_count.saturating_add(1);
104 }
105
106 Ok(())
107 }
108 .boxed()
109 }
110
111 #[inline(always)]
112 fn on_block(
113 &self,
114 _thread_id: usize,
115 db: Option<Arc<Client>>,
116 block: &BlockData,
117 ) -> PluginFuture<'_> {
118 let slot = block.slot();
119 let block_time = block.block_time();
120 let was_skipped = block.was_skipped();
121
122 async move {
123 if was_skipped {
124 return Ok(());
125 }
126
127 let rows = Self::take_slot_event(slot, block_time)
128 .into_iter()
129 .collect::<Vec<_>>();
130
131 if let Some(db_client) = db.as_ref()
132 && !rows.is_empty()
133 {
134 write_instruction_events(Arc::clone(db_client), rows)
135 .await
136 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
137 }
138
139 Ok(())
140 }
141 .boxed()
142 }
143
144 #[inline(always)]
145 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
146 async move {
147 log::info!("Instruction Tracking Plugin loaded.");
148 if let Some(db) = db {
149 log::info!("Ensuring slot_instructions table exists...");
150 db.query(
151 r#"
152 CREATE TABLE IF NOT EXISTS slot_instructions (
153 slot UInt32,
154 timestamp DateTime('UTC'),
155 vote_instruction_count UInt64,
156 non_vote_instruction_count UInt64,
157 vote_transaction_count UInt32,
158 non_vote_transaction_count UInt32
159 )
160 ENGINE = ReplacingMergeTree(timestamp)
161 ORDER BY slot
162 "#,
163 )
164 .execute()
165 .await?;
166 log::info!("done.");
167 } else {
168 log::warn!(
169 "Instruction Tracking Plugin running without ClickHouse; data will not be persisted."
170 );
171 }
172 Ok(())
173 }
174 .boxed()
175 }
176
177 #[inline(always)]
178 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
179 async move {
180 if let Some(db_client) = db {
181 let rows = Self::drain_all_pending(None);
182 if !rows.is_empty() {
183 write_instruction_events(Arc::clone(&db_client), rows)
184 .await
185 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
186 Box::new(err)
187 })?;
188 }
189 backfill_instruction_timestamps(db_client)
190 .await
191 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
192 }
193 Ok(())
194 }
195 .boxed()
196 }
197}
198
199async fn write_instruction_events(
200 db: Arc<Client>,
201 rows: Vec<SlotInstructionEvent>,
202) -> Result<(), clickhouse::error::Error> {
203 if rows.is_empty() {
204 return Ok(());
205 }
206 let mut insert = db
207 .insert::<SlotInstructionEvent>("slot_instructions")
208 .await?;
209 for row in rows {
210 insert.write(&row).await?;
211 }
212 insert.end().await?;
213 Ok(())
214}
215
/// Converts an optional signed block time into the `u32` stored in the row.
///
/// * `None` and negative values map to `0`.
/// * Values above `u32::MAX` saturate to `u32::MAX`.
/// * Everything else is returned unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    block_time.map_or(0, |raw_ts| {
        // After `max(0)` the only way the conversion can fail is overflow,
        // which saturates to the upper bound.
        u32::try_from(raw_ts.max(0)).unwrap_or(u32::MAX)
    })
}
228
229fn instruction_vote_counts(transaction: &TransactionData) -> (u64, u64) {
230 let static_keys = transaction.transaction.message.static_account_keys();
231 let vote_program = vote_program_id();
232 let mut vote_count: u64 = 0;
233 let mut non_vote_count: u64 = 0;
234
235 let classify = |program_index: usize, vote_count: &mut u64, non_vote_count: &mut u64| {
236 if let Some(pid) = static_keys.get(program_index) {
237 if pid == &vote_program {
238 *vote_count = vote_count.saturating_add(1);
239 } else {
240 *non_vote_count = non_vote_count.saturating_add(1);
241 }
242 } else {
243 *non_vote_count = non_vote_count.saturating_add(1);
244 }
245 };
246
247 match &transaction.transaction.message {
248 VersionedMessage::Legacy(msg) => {
249 for ix in &msg.instructions {
250 classify(
251 ix.program_id_index as usize,
252 &mut vote_count,
253 &mut non_vote_count,
254 );
255 }
256 }
257 VersionedMessage::V0(msg) => {
258 for ix in &msg.instructions {
259 classify(
260 ix.program_id_index as usize,
261 &mut vote_count,
262 &mut non_vote_count,
263 );
264 }
265 }
266 }
267
268 if let Some(inner_sets) = transaction
269 .transaction_status_meta
270 .inner_instructions
271 .as_ref()
272 {
273 for set in inner_sets {
274 for ix in &set.instructions {
275 classify(
276 ix.instruction.program_id_index as usize,
277 &mut vote_count,
278 &mut non_vote_count,
279 );
280 }
281 }
282 }
283
284 (vote_count, non_vote_count)
285}
286
287async fn backfill_instruction_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
288 db.query(
289 r#"
290 INSERT INTO slot_instructions
291 SELECT si.slot,
292 ss.block_time,
293 si.vote_instruction_count,
294 si.non_vote_instruction_count,
295 si.vote_transaction_count,
296 si.non_vote_transaction_count
297 FROM slot_instructions AS si
298 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
299 WHERE si.timestamp = toDateTime(0)
300 AND ss.block_time > toDateTime(0)
301 "#,
302 )
303 .execute()
304 .await?;
305
306 Ok(())
307}