jetstreamer_plugin/plugins/
program_tracking.rs1use ahash::RandomState;
2use std::{collections::HashMap, sync::Arc};
3
4use clickhouse::{Client, Row};
5use dashmap::DashMap;
6use futures_util::FutureExt;
7use once_cell::sync::Lazy;
8use serde::{Deserialize, Serialize};
9use solana_address::Address;
10use solana_message::VersionedMessage;
11
12use crate::{Plugin, PluginFuture};
13use jetstreamer_firehose::firehose::{BlockData, TransactionData};
14
15type SlotProgramKey = (Address, bool);
16type SlotProgramEvents = HashMap<SlotProgramKey, ProgramEvent, RandomState>;
17
18static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotProgramEvents, RandomState>> =
19 Lazy::new(|| DashMap::with_hasher(RandomState::new()));
20
21#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
22struct ProgramEvent {
23 pub slot: u32,
24 pub timestamp: u32,
26 pub program_id: Address,
27 pub is_vote: bool,
28 pub count: u32,
29 pub error_count: u32,
30 pub min_cus: u32,
31 pub max_cus: u32,
32 pub total_cus: u32,
33}
34
35#[derive(Debug, Clone)]
36pub struct ProgramTrackingPlugin;
38
39impl ProgramTrackingPlugin {
40 pub const fn new() -> Self {
42 Self
43 }
44
45 fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
46 let timestamp = clamp_block_time(block_time);
47 if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
48 return events_by_program
49 .into_values()
50 .map(|mut event| {
51 event.timestamp = timestamp;
52 event
53 })
54 .collect();
55 }
56 Vec::new()
57 }
58
59 fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
60 let timestamp = clamp_block_time(block_time);
61 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
62 let mut rows = Vec::new();
63 for slot in slots {
64 if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
65 rows.extend(events_by_program.into_values().map(|mut event| {
66 event.timestamp = timestamp;
67 event
68 }));
69 }
70 }
71 rows
72 }
73}
74
75impl Default for ProgramTrackingPlugin {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl Plugin for ProgramTrackingPlugin {
82 #[inline(always)]
83 fn name(&self) -> &'static str {
84 "Program Tracking"
85 }
86
87 #[inline(always)]
88 fn on_transaction<'a>(
89 &'a self,
90 _thread_id: usize,
91 _db: Option<Arc<Client>>,
92 transaction: &'a TransactionData,
93 ) -> PluginFuture<'a> {
94 async move {
95 let message = &transaction.transaction.message;
96 let (account_keys, instructions) = match message {
97 VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
98 VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
99 };
100 if instructions.is_empty() {
101 return Ok(());
102 }
103 let program_ids = instructions
104 .iter()
105 .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
106 .cloned()
107 .collect::<Vec<_>>();
108 if program_ids.is_empty() {
109 return Ok(());
110 }
111 let total_cu = transaction
112 .transaction_status_meta
113 .compute_units_consumed
114 .unwrap_or(0) as u32;
115 let program_count = program_ids.len() as u32;
116 let errored = transaction.transaction_status_meta.status.is_err();
117 let slot = transaction.slot;
118 let is_vote = transaction.is_vote;
119
120 let mut slot_entry = PENDING_BY_SLOT
121 .entry(slot)
122 .or_insert_with(|| HashMap::with_hasher(RandomState::new()));
123 for program_id in program_ids.iter() {
124 let this_program_cu = if program_count == 0 {
125 0
126 } else {
127 total_cu / program_count
128 };
129 let event =
130 slot_entry
131 .entry((*program_id, is_vote))
132 .or_insert_with(|| ProgramEvent {
133 slot: slot.min(u32::MAX as u64) as u32,
134 timestamp: 0,
135 program_id: *program_id,
136 is_vote,
137 count: 0,
138 error_count: 0,
139 min_cus: u32::MAX,
140 max_cus: 0,
141 total_cus: 0,
142 });
143 event.min_cus = event.min_cus.min(this_program_cu);
144 event.max_cus = event.max_cus.max(this_program_cu);
145 event.total_cus = event.total_cus.saturating_add(this_program_cu);
146 event.count = event.count.saturating_add(1);
147 if errored {
148 event.error_count = event.error_count.saturating_add(1);
149 }
150 }
151
152 Ok(())
153 }
154 .boxed()
155 }
156
157 #[inline(always)]
158 fn on_block(
159 &self,
160 _thread_id: usize,
161 db: Option<Arc<Client>>,
162 block: &BlockData,
163 ) -> PluginFuture<'_> {
164 let slot = block.slot();
165 let block_time = block.block_time();
166 let was_skipped = block.was_skipped();
167 async move {
168 if was_skipped {
169 return Ok(());
170 }
171
172 let rows = Self::take_slot_events(slot, block_time);
173
174 if let Some(db_client) = db
175 && !rows.is_empty()
176 {
177 tokio::spawn(async move {
178 if let Err(err) = write_program_events(db_client, rows).await {
179 log::error!("failed to write program events: {}", err);
180 }
181 });
182 }
183
184 Ok(())
185 }
186 .boxed()
187 }
188
189 #[inline(always)]
190 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
191 async move {
192 log::info!("Program Tracking Plugin loaded.");
193 if let Some(db) = db {
194 log::info!("Creating program_invocations table if it does not exist...");
195 db.query(
196 r#"
197 CREATE TABLE IF NOT EXISTS program_invocations (
198 slot UInt32,
199 timestamp DateTime('UTC'),
200 program_id FixedString(32),
201 is_vote UInt8,
202 count UInt32,
203 error_count UInt32,
204 min_cus UInt32,
205 max_cus UInt32,
206 total_cus UInt32
207 )
208 ENGINE = ReplacingMergeTree(timestamp)
209 ORDER BY (slot, program_id, is_vote)
210 "#,
211 )
212 .execute()
213 .await?;
214 log::info!("done.");
215 } else {
216 log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
217 }
218 Ok(())
219 }
220 .boxed()
221 }
222
223 #[inline(always)]
224 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
225 async move {
226 if let Some(db_client) = db {
227 let rows = Self::drain_all_pending(None);
228 if !rows.is_empty() {
229 write_program_events(Arc::clone(&db_client), rows)
230 .await
231 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
232 Box::new(err)
233 })?;
234 }
235 backfill_program_timestamps(db_client)
236 .await
237 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
238 }
239 Ok(())
240 }
241 .boxed()
242 }
243}
244
245async fn write_program_events(
246 db: Arc<Client>,
247 rows: Vec<ProgramEvent>,
248) -> Result<(), clickhouse::error::Error> {
249 if rows.is_empty() {
250 return Ok(());
251 }
252 let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
253 for row in rows {
254 insert.write(&row).await?;
255 }
256 insert.end().await?;
257 Ok(())
258}
259
260fn clamp_block_time(block_time: Option<i64>) -> u32 {
261 let raw_ts = block_time.unwrap_or(0);
262 if raw_ts < 0 {
263 0
264 } else if raw_ts > u32::MAX as i64 {
265 u32::MAX
266 } else {
267 raw_ts as u32
268 }
269}
270
271async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
272 db.query(
273 r#"
274 INSERT INTO program_invocations
275 SELECT pi.slot,
276 ss.block_time,
277 pi.program_id,
278 pi.is_vote,
279 pi.count,
280 pi.error_count,
281 pi.min_cus,
282 pi.max_cus,
283 pi.total_cus
284 FROM program_invocations AS pi
285 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
286 WHERE pi.timestamp = toDateTime(0)
287 AND ss.block_time > toDateTime(0)
288 "#,
289 )
290 .execute()
291 .await?;
292
293 Ok(())
294}