jetstreamer_plugin/plugins/
program_tracking.rs1use std::{cell::RefCell, collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use futures_util::FutureExt;
5use log::error;
6use serde::{Deserialize, Serialize};
7use solana_sdk::{message::VersionedMessage, pubkey::Pubkey};
8
9use crate::{Plugin, PluginFuture};
10use jetstreamer_firehose::firehose::{BlockData, TransactionData};
11
/// Number of processed blocks a thread accumulates before its pending rows are
/// handed off to ClickHouse.
const DB_WRITE_INTERVAL_SLOTS: u64 = 1_000;
13
14#[derive(Default)]
15struct ThreadLocalData {
16 slot_stats: HashMap<u64, HashMap<Pubkey, ProgramStats>>,
17 pending_rows: Vec<ProgramEvent>,
18 slots_since_flush: u64,
19}
20
21thread_local! {
22 static DATA: RefCell<ThreadLocalData> = RefCell::new(ThreadLocalData::default());
23}
24
25#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
26struct ProgramEvent {
27 pub slot: u32,
28 pub timestamp: u32,
30 pub program_id: Pubkey,
31 pub count: u32,
32 pub error_count: u32,
33 pub min_cus: u32,
34 pub max_cus: u32,
35 pub total_cus: u32,
36}
37
/// Running statistics for one program inside one slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ProgramStats {
    /// Number of instructions that invoked the program.
    pub count: u32,
    /// How many of those invocations belonged to a failed transaction.
    pub error_count: u32,
    /// Smallest compute-unit share attributed to a single invocation.
    pub min_cus: u32,
    /// Largest compute-unit share attributed to a single invocation.
    pub max_cus: u32,
    /// Sum of the compute-unit shares attributed to the program.
    pub total_cus: u32,
}
46
/// Stateless plugin handle; all accumulated data lives in thread-local storage.
#[derive(Clone, Debug, Default)]
pub struct ProgramTrackingPlugin;
50
51impl Plugin for ProgramTrackingPlugin {
52 #[inline(always)]
53 fn name(&self) -> &'static str {
54 "Program Tracking"
55 }
56
57 #[inline(always)]
58 fn on_transaction<'a>(
59 &'a self,
60 _thread_id: usize,
61 _db: Option<Arc<Client>>,
62 transaction: &'a TransactionData,
63 ) -> PluginFuture<'a> {
64 async move {
65 let message = &transaction.transaction.message;
66 let (account_keys, instructions) = match message {
67 VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
68 VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
69 };
70 if instructions.is_empty() {
71 return Ok(());
72 }
73 let program_ids = instructions
74 .iter()
75 .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
76 .cloned()
77 .collect::<Vec<_>>();
78 if program_ids.is_empty() {
79 return Ok(());
80 }
81 let total_cu = transaction
82 .transaction_status_meta
83 .compute_units_consumed
84 .unwrap_or(0) as u32;
85 let program_count = program_ids.len() as u32;
86
87 DATA.with(|data| {
88 let mut data = data.borrow_mut();
89 let slot_data = data.slot_stats.entry(transaction.slot).or_default();
90
91 for program_id in program_ids.iter() {
92 let this_program_cu = if program_count == 0 {
93 0
94 } else {
95 total_cu / program_count
96 };
97 let stats = slot_data.entry(*program_id).or_insert(ProgramStats {
98 min_cus: u32::MAX,
99 max_cus: 0,
100 total_cus: 0,
101 count: 0,
102 error_count: 0,
103 });
104 stats.min_cus = stats.min_cus.min(this_program_cu);
105 stats.max_cus = stats.max_cus.max(this_program_cu);
106 stats.total_cus += this_program_cu;
107 stats.count += 1;
108 if transaction.transaction_status_meta.status.is_err() {
109 stats.error_count += 1;
110 }
111 }
112 });
113
114 Ok(())
115 }
116 .boxed()
117 }
118
119 #[inline(always)]
120 fn on_block(
121 &self,
122 _thread_id: usize,
123 db: Option<Arc<Client>>,
124 block: &BlockData,
125 ) -> PluginFuture<'_> {
126 let slot_info = match block {
127 BlockData::Block {
128 slot, block_time, ..
129 } => Some((*slot, *block_time)),
130 BlockData::LeaderSkipped { .. } => None,
131 };
132 async move {
133 let Some((slot, block_time)) = slot_info else {
134 return Ok(());
135 };
136
137 let flush_rows = DATA.with(|data| {
138 let mut data = data.borrow_mut();
139 if let Some(slot_data) = data.slot_stats.remove(&slot) {
140 let slot_rows = events_from_slot(slot, block_time, &slot_data);
141 data.pending_rows.extend(slot_rows);
142 }
143 data.slots_since_flush = data.slots_since_flush.saturating_add(1);
144 if data.slots_since_flush >= DB_WRITE_INTERVAL_SLOTS {
145 data.slots_since_flush = 0;
146 if data.pending_rows.is_empty() {
147 None
148 } else {
149 Some(data.pending_rows.drain(..).collect::<Vec<_>>())
150 }
151 } else {
152 None
153 }
154 });
155
156 if let (Some(db_client), Some(rows)) = (db, flush_rows) {
157 tokio::spawn(async move {
158 if let Err(err) = write_program_events(db_client, rows).await {
159 error!("failed to flush program rows: {}", err);
160 }
161 });
162 }
163
164 Ok(())
165 }
166 .boxed()
167 }
168
169 #[inline(always)]
170 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
171 DATA.with(|_| {});
173 async move {
175 log::info!("Program Tracking Plugin loaded.");
176 if let Some(db) = db {
177 log::info!("Creating program_invocations table if it does not exist...");
178 db.query(
179 r#"
180 CREATE TABLE IF NOT EXISTS program_invocations (
181 slot UInt32,
182 timestamp DateTime('UTC'),
183 program_id FixedString(32),
184 count UInt32,
185 error_count UInt32,
186 min_cus UInt32,
187 max_cus UInt32,
188 total_cus UInt32
189 )
190 ENGINE = ReplacingMergeTree(slot)
191 ORDER BY (slot, program_id)
192 "#,
193 )
194 .execute()
195 .await?;
196 log::info!("done.");
197 } else {
198 log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
199 }
200 Ok(())
201 }
202 .boxed()
203 }
204
205 #[inline(always)]
206 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
207 async move {
208 if let Some(db_client) = db {
209 let rows = DATA.with(|data| {
210 let mut data = data.borrow_mut();
211 let mut rows = std::mem::take(&mut data.pending_rows);
212 for (slot, stats) in data.slot_stats.drain() {
213 rows.extend(events_from_slot(slot, None, &stats));
214 }
215 rows
216 });
217 if !rows.is_empty()
218 && let Err(err) = write_program_events(db_client, rows).await
219 {
220 error!("failed to flush program rows on exit: {}", err);
221 }
222 }
223 Ok(())
224 }
225 .boxed()
226 }
227}
228
229async fn write_program_events(
230 db: Arc<Client>,
231 rows: Vec<ProgramEvent>,
232) -> Result<(), clickhouse::error::Error> {
233 if rows.is_empty() {
234 return Ok(());
235 }
236 let mut insert = db.insert("program_invocations")?;
237 for row in rows {
238 insert.write(&row).await?;
239 }
240 insert.end().await?;
241 Ok(())
242}
243
244fn events_from_slot(
245 slot: u64,
246 block_time: Option<i64>,
247 slot_data: &HashMap<Pubkey, ProgramStats>,
248) -> Vec<ProgramEvent> {
249 let raw_ts = block_time.unwrap_or(0);
250 let timestamp: u32 = if raw_ts < 0 {
251 0
252 } else if raw_ts > u32::MAX as i64 {
253 u32::MAX
254 } else {
255 raw_ts as u32
256 };
257
258 slot_data
259 .iter()
260 .map(|(program_id, stats)| ProgramEvent {
261 slot: slot.min(u32::MAX as u64) as u32,
262 program_id: *program_id,
263 count: stats.count,
264 error_count: stats.error_count,
265 min_cus: stats.min_cus,
266 max_cus: stats.max_cus,
267 total_cus: stats.total_cus,
268 timestamp,
269 })
270 .collect()
271}