jetstreamer_plugin/plugins/
instruction_tracking.rs1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_message::VersionedMessage;
9
10use crate::{Plugin, PluginFuture};
11use jetstreamer_firehose::firehose::{BlockData, TransactionData};
12
13static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotInstructionEvent>> = Lazy::new(DashMap::new);
14
15#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
16struct SlotInstructionEvent {
17 slot: u32,
18 timestamp: u32,
20 instruction_count: u64,
21 transaction_count: u32,
22}
23
/// Stateless plugin handle; all accumulated data lives in `PENDING_BY_SLOT`.
#[derive(Clone, Debug, Default)]
pub struct InstructionTrackingPlugin;
27
28impl InstructionTrackingPlugin {
29 fn take_slot_event(slot: u64, block_time: Option<i64>) -> Option<SlotInstructionEvent> {
30 let timestamp = clamp_block_time(block_time);
31 PENDING_BY_SLOT.remove(&slot).map(|(_, mut event)| {
32 event.timestamp = timestamp;
33 event
34 })
35 }
36
37 fn drain_all_pending(block_time: Option<i64>) -> Vec<SlotInstructionEvent> {
38 let timestamp = clamp_block_time(block_time);
39 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
40 let mut rows = Vec::new();
41 for slot in slots {
42 if let Some((_, mut event)) = PENDING_BY_SLOT.remove(&slot) {
43 event.timestamp = timestamp;
44 rows.push(event);
45 }
46 }
47 rows
48 }
49}
50
51impl Plugin for InstructionTrackingPlugin {
52 #[inline(always)]
53 fn name(&self) -> &'static str {
54 "Instruction Tracking"
55 }
56
57 #[inline(always)]
58 fn on_transaction<'a>(
59 &'a self,
60 _thread_id: usize,
61 _db: Option<Arc<Client>>,
62 transaction: &'a TransactionData,
63 ) -> PluginFuture<'a> {
64 async move {
65 let instruction_count = total_instruction_count(transaction);
66
67 let slot = transaction.slot;
68 let mut entry = PENDING_BY_SLOT
69 .entry(slot)
70 .or_insert_with(|| SlotInstructionEvent {
71 slot: slot.min(u32::MAX as u64) as u32,
72 timestamp: 0,
73 instruction_count: 0,
74 transaction_count: 0,
75 });
76 entry.instruction_count = entry.instruction_count.saturating_add(instruction_count);
77 entry.transaction_count = entry.transaction_count.saturating_add(1);
78
79 Ok(())
80 }
81 .boxed()
82 }
83
84 #[inline(always)]
85 fn on_block(
86 &self,
87 _thread_id: usize,
88 db: Option<Arc<Client>>,
89 block: &BlockData,
90 ) -> PluginFuture<'_> {
91 let slot = block.slot();
92 let block_time = block.block_time();
93 let was_skipped = block.was_skipped();
94
95 async move {
96 if was_skipped {
97 return Ok(());
98 }
99
100 let rows = Self::take_slot_event(slot, block_time)
101 .into_iter()
102 .collect::<Vec<_>>();
103
104 if let Some(db_client) = db.as_ref()
105 && !rows.is_empty()
106 {
107 write_instruction_events(Arc::clone(db_client), rows)
108 .await
109 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
110 }
111
112 Ok(())
113 }
114 .boxed()
115 }
116
117 #[inline(always)]
118 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
119 async move {
120 log::info!("Instruction Tracking Plugin loaded.");
121 if let Some(db) = db {
122 log::info!("Ensuring slot_instructions table exists...");
123 db.query(
124 r#"
125 CREATE TABLE IF NOT EXISTS slot_instructions (
126 slot UInt32,
127 timestamp DateTime('UTC'),
128 instruction_count UInt64,
129 transaction_count UInt32
130 )
131 ENGINE = ReplacingMergeTree(timestamp)
132 ORDER BY slot
133 "#,
134 )
135 .execute()
136 .await?;
137 log::info!("done.");
138 } else {
139 log::warn!(
140 "Instruction Tracking Plugin running without ClickHouse; data will not be persisted."
141 );
142 }
143 Ok(())
144 }
145 .boxed()
146 }
147
148 #[inline(always)]
149 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
150 async move {
151 if let Some(db_client) = db {
152 let rows = Self::drain_all_pending(None);
153 if !rows.is_empty() {
154 write_instruction_events(Arc::clone(&db_client), rows)
155 .await
156 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
157 Box::new(err)
158 })?;
159 }
160 backfill_instruction_timestamps(db_client)
161 .await
162 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
163 }
164 Ok(())
165 }
166 .boxed()
167 }
168}
169
170async fn write_instruction_events(
171 db: Arc<Client>,
172 rows: Vec<SlotInstructionEvent>,
173) -> Result<(), clickhouse::error::Error> {
174 if rows.is_empty() {
175 return Ok(());
176 }
177 let mut insert = db
178 .insert::<SlotInstructionEvent>("slot_instructions")
179 .await?;
180 for row in rows {
181 insert.write(&row).await?;
182 }
183 insert.end().await?;
184 Ok(())
185}
186
/// Converts an optional signed block time into a `u32`, saturating at both ends.
///
/// `None` and negative values map to `0`; values above `u32::MAX` map to
/// `u32::MAX`; everything else is returned unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    match block_time {
        None => 0,
        Some(secs) if secs < 0 => 0,
        // Negatives are handled above, so the conversion can only fail on overflow.
        Some(secs) => u32::try_from(secs).unwrap_or(u32::MAX),
    }
}
199
200fn total_instruction_count(transaction: &TransactionData) -> u64 {
201 let message_instructions = match &transaction.transaction.message {
202 VersionedMessage::Legacy(msg) => msg.instructions.len() as u64,
203 VersionedMessage::V0(msg) => msg.instructions.len() as u64,
204 };
205 let inner_instruction_count = transaction
206 .transaction_status_meta
207 .inner_instructions
208 .as_ref()
209 .map(|sets| {
210 sets.iter()
211 .map(|set| set.instructions.len() as u64)
212 .sum::<u64>()
213 })
214 .unwrap_or(0);
215 message_instructions.saturating_add(inner_instruction_count)
216}
217
218async fn backfill_instruction_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
219 db.query(
220 r#"
221 INSERT INTO slot_instructions
222 SELECT si.slot,
223 ss.block_time,
224 si.instruction_count,
225 si.transaction_count
226 FROM slot_instructions AS si
227 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
228 WHERE si.timestamp = toDateTime(0)
229 AND ss.block_time > toDateTime(0)
230 "#,
231 )
232 .execute()
233 .await?;
234
235 Ok(())
236}