jetstreamer_plugin/plugins/
program_tracking.rs1use std::{cell::RefCell, collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use futures_util::FutureExt;
5use log::error;
6use serde::{Deserialize, Serialize};
7use solana_address::Address;
8use solana_message::VersionedMessage;
9
10use crate::{Plugin, PluginFuture};
11use jetstreamer_firehose::firehose::{BlockData, TransactionData};
12
/// Number of `BlockData::Block` callbacks a thread handles between two
/// ClickHouse flushes of its buffered rows (see `on_block`).
const DB_WRITE_INTERVAL_SLOTS: u64 = 1_000;
14
15#[derive(Default)]
16struct ThreadLocalData {
17 slot_stats: HashMap<u64, HashMap<Address, ProgramStats>>,
18 pending_rows: Vec<ProgramEvent>,
19 slots_since_flush: u64,
20}
21
22thread_local! {
23 static DATA: RefCell<ThreadLocalData> = RefCell::new(ThreadLocalData::default());
24}
25
26#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
27struct ProgramEvent {
28 pub slot: u32,
29 pub timestamp: u32,
31 pub program_id: Address,
32 pub count: u32,
33 pub error_count: u32,
34 pub min_cus: u32,
35 pub max_cus: u32,
36 pub total_cus: u32,
37}
38
/// Running per-program statistics for one slot, updated by `on_transaction`
/// and converted into a `ProgramEvent` by `events_from_slot`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct ProgramStats {
    /// Number of instructions seen for the program.
    pub count: u32,
    /// Number of those instructions that belonged to a failed transaction.
    pub error_count: u32,
    /// Smallest compute-unit share attributed to a single instruction.
    pub min_cus: u32,
    /// Largest compute-unit share attributed to a single instruction.
    pub max_cus: u32,
    /// Sum of the attributed compute-unit shares.
    pub total_cus: u32,
}
47
/// Plugin that aggregates per-program invocation statistics for every slot
/// and writes them to the `program_invocations` ClickHouse table.
///
/// The type itself is stateless; all working data lives in the thread-local
/// `DATA`.
#[derive(Clone, Debug, Default)]
pub struct ProgramTrackingPlugin;
51
52impl Plugin for ProgramTrackingPlugin {
53 #[inline(always)]
54 fn name(&self) -> &'static str {
55 "Program Tracking"
56 }
57
58 #[inline(always)]
59 fn on_transaction<'a>(
60 &'a self,
61 _thread_id: usize,
62 _db: Option<Arc<Client>>,
63 transaction: &'a TransactionData,
64 ) -> PluginFuture<'a> {
65 async move {
66 let message = &transaction.transaction.message;
67 let (account_keys, instructions) = match message {
68 VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
69 VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
70 };
71 if instructions.is_empty() {
72 return Ok(());
73 }
74 let program_ids = instructions
75 .iter()
76 .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
77 .cloned()
78 .collect::<Vec<_>>();
79 if program_ids.is_empty() {
80 return Ok(());
81 }
82 let total_cu = transaction
83 .transaction_status_meta
84 .compute_units_consumed
85 .unwrap_or(0) as u32;
86 let program_count = program_ids.len() as u32;
87
88 DATA.with(|data| {
89 let mut data = data.borrow_mut();
90 let slot_data = data.slot_stats.entry(transaction.slot).or_default();
91
92 for program_id in program_ids.iter() {
93 let this_program_cu = if program_count == 0 {
94 0
95 } else {
96 total_cu / program_count
97 };
98 let stats = slot_data.entry(*program_id).or_insert(ProgramStats {
99 min_cus: u32::MAX,
100 max_cus: 0,
101 total_cus: 0,
102 count: 0,
103 error_count: 0,
104 });
105 stats.min_cus = stats.min_cus.min(this_program_cu);
106 stats.max_cus = stats.max_cus.max(this_program_cu);
107 stats.total_cus += this_program_cu;
108 stats.count += 1;
109 if transaction.transaction_status_meta.status.is_err() {
110 stats.error_count += 1;
111 }
112 }
113 });
114
115 Ok(())
116 }
117 .boxed()
118 }
119
120 #[inline(always)]
121 fn on_block(
122 &self,
123 _thread_id: usize,
124 db: Option<Arc<Client>>,
125 block: &BlockData,
126 ) -> PluginFuture<'_> {
127 let slot_info = match block {
128 BlockData::Block {
129 slot, block_time, ..
130 } => Some((*slot, *block_time)),
131 BlockData::LeaderSkipped { .. } => None,
132 };
133 async move {
134 let Some((slot, block_time)) = slot_info else {
135 return Ok(());
136 };
137
138 let flush_rows = DATA.with(|data| {
139 let mut data = data.borrow_mut();
140 if let Some(slot_data) = data.slot_stats.remove(&slot) {
141 let slot_rows = events_from_slot(slot, block_time, &slot_data);
142 data.pending_rows.extend(slot_rows);
143 }
144 data.slots_since_flush = data.slots_since_flush.saturating_add(1);
145 if data.slots_since_flush >= DB_WRITE_INTERVAL_SLOTS {
146 data.slots_since_flush = 0;
147 if data.pending_rows.is_empty() {
148 None
149 } else {
150 Some(data.pending_rows.drain(..).collect::<Vec<_>>())
151 }
152 } else {
153 None
154 }
155 });
156
157 if let (Some(db_client), Some(rows)) = (db, flush_rows) {
158 tokio::spawn(async move {
159 if let Err(err) = write_program_events(db_client, rows).await {
160 error!("failed to flush program rows: {}", err);
161 }
162 });
163 }
164
165 Ok(())
166 }
167 .boxed()
168 }
169
170 #[inline(always)]
171 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
172 DATA.with(|_| {});
174 async move {
176 log::info!("Program Tracking Plugin loaded.");
177 if let Some(db) = db {
178 log::info!("Creating program_invocations table if it does not exist...");
179 db.query(
180 r#"
181 CREATE TABLE IF NOT EXISTS program_invocations (
182 slot UInt32,
183 timestamp DateTime('UTC'),
184 program_id FixedString(32),
185 count UInt32,
186 error_count UInt32,
187 min_cus UInt32,
188 max_cus UInt32,
189 total_cus UInt32
190 )
191 ENGINE = ReplacingMergeTree(slot)
192 ORDER BY (slot, program_id)
193 "#,
194 )
195 .execute()
196 .await?;
197 log::info!("done.");
198 } else {
199 log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
200 }
201 Ok(())
202 }
203 .boxed()
204 }
205
206 #[inline(always)]
207 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
208 async move {
209 if let Some(db_client) = db {
210 let rows = DATA.with(|data| {
211 let mut data = data.borrow_mut();
212 let mut rows = std::mem::take(&mut data.pending_rows);
213 for (slot, stats) in data.slot_stats.drain() {
214 rows.extend(events_from_slot(slot, None, &stats));
215 }
216 rows
217 });
218 if !rows.is_empty()
219 && let Err(err) = write_program_events(db_client, rows).await
220 {
221 error!("failed to flush program rows on exit: {}", err);
222 }
223 }
224 Ok(())
225 }
226 .boxed()
227 }
228}
229
230async fn write_program_events(
231 db: Arc<Client>,
232 rows: Vec<ProgramEvent>,
233) -> Result<(), clickhouse::error::Error> {
234 if rows.is_empty() {
235 return Ok(());
236 }
237 let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
238 for row in rows {
239 insert.write(&row).await?;
240 }
241 insert.end().await?;
242 Ok(())
243}
244
245fn events_from_slot(
246 slot: u64,
247 block_time: Option<i64>,
248 slot_data: &HashMap<Address, ProgramStats>,
249) -> Vec<ProgramEvent> {
250 let raw_ts = block_time.unwrap_or(0);
251 let timestamp: u32 = if raw_ts < 0 {
252 0
253 } else if raw_ts > u32::MAX as i64 {
254 u32::MAX
255 } else {
256 raw_ts as u32
257 };
258
259 slot_data
260 .iter()
261 .map(|(program_id, stats)| ProgramEvent {
262 slot: slot.min(u32::MAX as u64) as u32,
263 program_id: *program_id,
264 count: stats.count,
265 error_count: stats.error_count,
266 min_cus: stats.min_cus,
267 max_cus: stats.max_cus,
268 total_cus: stats.total_cus,
269 timestamp,
270 })
271 .collect()
272}