jetstreamer_plugin/plugins/
program_tracking.rs1use std::{collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14type SlotProgramKey = (Address, bool);
15type SlotProgramEvents = HashMap<SlotProgramKey, ProgramEvent>;
16
17static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotProgramEvents>> = Lazy::new(DashMap::new);
18
19#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
20struct ProgramEvent {
21 pub slot: u32,
22 pub timestamp: u32,
24 pub program_id: Address,
25 pub is_vote: bool,
26 pub count: u32,
27 pub error_count: u32,
28 pub min_cus: u32,
29 pub max_cus: u32,
30 pub total_cus: u32,
31}
32
/// Plugin that aggregates per-program invocation statistics for every slot.
///
/// The type itself is a stateless unit struct; buffered statistics live in the
/// module-level `PENDING_BY_SLOT` map.
#[derive(Debug, Clone)]
pub struct ProgramTrackingPlugin;
36
37impl ProgramTrackingPlugin {
38 pub const fn new() -> Self {
40 Self
41 }
42
43 fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
44 let timestamp = clamp_block_time(block_time);
45 if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
46 return events_by_program
47 .into_values()
48 .map(|mut event| {
49 event.timestamp = timestamp;
50 event
51 })
52 .collect();
53 }
54 Vec::new()
55 }
56
57 fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
58 let timestamp = clamp_block_time(block_time);
59 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
60 let mut rows = Vec::new();
61 for slot in slots {
62 if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
63 rows.extend(events_by_program.into_values().map(|mut event| {
64 event.timestamp = timestamp;
65 event
66 }));
67 }
68 }
69 rows
70 }
71}
72
73impl Default for ProgramTrackingPlugin {
74 fn default() -> Self {
75 Self::new()
76 }
77}
78
79impl Plugin for ProgramTrackingPlugin {
80 #[inline(always)]
81 fn name(&self) -> &'static str {
82 "Program Tracking"
83 }
84
85 #[inline(always)]
86 fn on_transaction<'a>(
87 &'a self,
88 _thread_id: usize,
89 _db: Option<Arc<Client>>,
90 transaction: &'a TransactionData,
91 ) -> PluginFuture<'a> {
92 async move {
93 let message = &transaction.transaction.message;
94 let (account_keys, instructions) = match message {
95 VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
96 VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
97 };
98 if instructions.is_empty() {
99 return Ok(());
100 }
101 let program_ids = instructions
102 .iter()
103 .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
104 .cloned()
105 .collect::<Vec<_>>();
106 if program_ids.is_empty() {
107 return Ok(());
108 }
109 let total_cu = transaction
110 .transaction_status_meta
111 .compute_units_consumed
112 .unwrap_or(0) as u32;
113 let program_count = program_ids.len() as u32;
114 let errored = transaction.transaction_status_meta.status.is_err();
115 let slot = transaction.slot;
116 let is_vote = transaction.is_vote;
117
118 let mut slot_entry = PENDING_BY_SLOT.entry(slot).or_default();
119 for program_id in program_ids.iter() {
120 let this_program_cu = if program_count == 0 {
121 0
122 } else {
123 total_cu / program_count
124 };
125 let event =
126 slot_entry
127 .entry((*program_id, is_vote))
128 .or_insert_with(|| ProgramEvent {
129 slot: slot.min(u32::MAX as u64) as u32,
130 timestamp: 0,
131 program_id: *program_id,
132 is_vote,
133 count: 0,
134 error_count: 0,
135 min_cus: u32::MAX,
136 max_cus: 0,
137 total_cus: 0,
138 });
139 event.min_cus = event.min_cus.min(this_program_cu);
140 event.max_cus = event.max_cus.max(this_program_cu);
141 event.total_cus = event.total_cus.saturating_add(this_program_cu);
142 event.count = event.count.saturating_add(1);
143 if errored {
144 event.error_count = event.error_count.saturating_add(1);
145 }
146 }
147
148 Ok(())
149 }
150 .boxed()
151 }
152
153 #[inline(always)]
154 fn on_block(
155 &self,
156 _thread_id: usize,
157 db: Option<Arc<Client>>,
158 block: &BlockData,
159 ) -> PluginFuture<'_> {
160 let slot = block.slot();
161 let block_time = block.block_time();
162 let was_skipped = block.was_skipped();
163 async move {
164 if was_skipped {
165 return Ok(());
166 }
167
168 let rows = Self::take_slot_events(slot, block_time);
169
170 if let Some(db_client) = db.as_ref()
171 && !rows.is_empty()
172 {
173 write_program_events(Arc::clone(db_client), rows)
174 .await
175 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
176 }
177
178 Ok(())
179 }
180 .boxed()
181 }
182
183 #[inline(always)]
184 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
185 async move {
186 log::info!("Program Tracking Plugin loaded.");
187 if let Some(db) = db {
188 log::info!("Creating program_invocations table if it does not exist...");
189 db.query(
190 r#"
191 CREATE TABLE IF NOT EXISTS program_invocations (
192 slot UInt32,
193 timestamp DateTime('UTC'),
194 program_id FixedString(32),
195 is_vote UInt8,
196 count UInt32,
197 error_count UInt32,
198 min_cus UInt32,
199 max_cus UInt32,
200 total_cus UInt32
201 )
202 ENGINE = ReplacingMergeTree(timestamp)
203 ORDER BY (slot, program_id, is_vote)
204 "#,
205 )
206 .execute()
207 .await?;
208 log::info!("done.");
209 } else {
210 log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
211 }
212 Ok(())
213 }
214 .boxed()
215 }
216
217 #[inline(always)]
218 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
219 async move {
220 if let Some(db_client) = db {
221 let rows = Self::drain_all_pending(None);
222 if !rows.is_empty() {
223 write_program_events(Arc::clone(&db_client), rows)
224 .await
225 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
226 Box::new(err)
227 })?;
228 }
229 backfill_program_timestamps(db_client)
230 .await
231 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
232 }
233 Ok(())
234 }
235 .boxed()
236 }
237}
238
239async fn write_program_events(
240 db: Arc<Client>,
241 rows: Vec<ProgramEvent>,
242) -> Result<(), clickhouse::error::Error> {
243 if rows.is_empty() {
244 return Ok(());
245 }
246 let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
247 for row in rows {
248 insert.write(&row).await?;
249 }
250 insert.end().await?;
251 Ok(())
252}
253
/// Converts an optional signed block time into the `u32` stored in the
/// `timestamp` column.
///
/// `None` and negative values become `0`; values above `u32::MAX` saturate at
/// `u32::MAX`; everything else is passed through unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    match block_time {
        None => 0,
        // After flooring at zero the only possible failure is "too large".
        Some(secs) => u32::try_from(secs.max(0)).unwrap_or(u32::MAX),
    }
}
264
265async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
266 db.query(
267 r#"
268 INSERT INTO program_invocations
269 SELECT pi.slot,
270 ss.block_time,
271 pi.program_id,
272 pi.is_vote,
273 pi.count,
274 pi.error_count,
275 pi.min_cus,
276 pi.max_cus,
277 pi.total_cus
278 FROM program_invocations AS pi
279 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
280 WHERE pi.timestamp = toDateTime(0)
281 AND ss.block_time > toDateTime(0)
282 "#,
283 )
284 .execute()
285 .await?;
286
287 Ok(())
288}