jetstreamer_plugin/plugins/
program_tracking.rs1use std::{collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<DashMap<u64, HashMap<Address, ProgramEvent>>> =
15 Lazy::new(DashMap::new);
16
17#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
18struct ProgramEvent {
19 pub slot: u32,
20 pub timestamp: u32,
22 pub program_id: Address,
23 pub count: u32,
24 pub error_count: u32,
25 pub min_cus: u32,
26 pub max_cus: u32,
27 pub total_cus: u32,
28}
29
/// Plugin that aggregates per-program invocation and compute-unit statistics per slot.
///
/// The type itself carries no state; accumulation happens in the module-level
/// `PENDING_BY_SLOT` map.
#[derive(Clone, Debug, Default)]
pub struct ProgramTrackingPlugin;
33
34impl ProgramTrackingPlugin {
35 fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
36 let timestamp = clamp_block_time(block_time);
37 if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
38 return events_by_program
39 .into_values()
40 .map(|mut event| {
41 event.timestamp = timestamp;
42 event
43 })
44 .collect();
45 }
46 Vec::new()
47 }
48
49 fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
50 let timestamp = clamp_block_time(block_time);
51 let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
52 let mut rows = Vec::new();
53 for slot in slots {
54 if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
55 rows.extend(events_by_program.into_values().map(|mut event| {
56 event.timestamp = timestamp;
57 event
58 }));
59 }
60 }
61 rows
62 }
63}
64
65impl Plugin for ProgramTrackingPlugin {
66 #[inline(always)]
67 fn name(&self) -> &'static str {
68 "Program Tracking"
69 }
70
71 #[inline(always)]
72 fn on_transaction<'a>(
73 &'a self,
74 _thread_id: usize,
75 _db: Option<Arc<Client>>,
76 transaction: &'a TransactionData,
77 ) -> PluginFuture<'a> {
78 async move {
79 let message = &transaction.transaction.message;
80 let (account_keys, instructions) = match message {
81 VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
82 VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
83 };
84 if instructions.is_empty() {
85 return Ok(());
86 }
87 let program_ids = instructions
88 .iter()
89 .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
90 .cloned()
91 .collect::<Vec<_>>();
92 if program_ids.is_empty() {
93 return Ok(());
94 }
95 let total_cu = transaction
96 .transaction_status_meta
97 .compute_units_consumed
98 .unwrap_or(0) as u32;
99 let program_count = program_ids.len() as u32;
100 let errored = transaction.transaction_status_meta.status.is_err();
101 let slot = transaction.slot;
102
103 let mut slot_entry = PENDING_BY_SLOT.entry(slot).or_default();
104 for program_id in program_ids.iter() {
105 let this_program_cu = if program_count == 0 {
106 0
107 } else {
108 total_cu / program_count
109 };
110 let event = slot_entry
111 .entry(*program_id)
112 .or_insert_with(|| ProgramEvent {
113 slot: slot.min(u32::MAX as u64) as u32,
114 timestamp: 0,
115 program_id: *program_id,
116 count: 0,
117 error_count: 0,
118 min_cus: u32::MAX,
119 max_cus: 0,
120 total_cus: 0,
121 });
122 event.min_cus = event.min_cus.min(this_program_cu);
123 event.max_cus = event.max_cus.max(this_program_cu);
124 event.total_cus = event.total_cus.saturating_add(this_program_cu);
125 event.count = event.count.saturating_add(1);
126 if errored {
127 event.error_count = event.error_count.saturating_add(1);
128 }
129 }
130
131 Ok(())
132 }
133 .boxed()
134 }
135
136 #[inline(always)]
137 fn on_block(
138 &self,
139 _thread_id: usize,
140 db: Option<Arc<Client>>,
141 block: &BlockData,
142 ) -> PluginFuture<'_> {
143 let slot = block.slot();
144 let block_time = block.block_time();
145 let was_skipped = block.was_skipped();
146 async move {
147 if was_skipped {
148 return Ok(());
149 }
150
151 let rows = Self::take_slot_events(slot, block_time);
152
153 if let Some(db_client) = db.as_ref()
154 && !rows.is_empty()
155 {
156 write_program_events(Arc::clone(db_client), rows)
157 .await
158 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
159 }
160
161 Ok(())
162 }
163 .boxed()
164 }
165
166 #[inline(always)]
167 fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
168 async move {
169 log::info!("Program Tracking Plugin loaded.");
170 if let Some(db) = db {
171 log::info!("Creating program_invocations table if it does not exist...");
172 db.query(
173 r#"
174 CREATE TABLE IF NOT EXISTS program_invocations (
175 slot UInt32,
176 timestamp DateTime('UTC'),
177 program_id FixedString(32),
178 count UInt32,
179 error_count UInt32,
180 min_cus UInt32,
181 max_cus UInt32,
182 total_cus UInt32
183 )
184 ENGINE = ReplacingMergeTree(timestamp)
185 ORDER BY (slot, program_id)
186 "#,
187 )
188 .execute()
189 .await?;
190 log::info!("done.");
191 } else {
192 log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
193 }
194 Ok(())
195 }
196 .boxed()
197 }
198
199 #[inline(always)]
200 fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
201 async move {
202 if let Some(db_client) = db {
203 let rows = Self::drain_all_pending(None);
204 if !rows.is_empty() {
205 write_program_events(Arc::clone(&db_client), rows)
206 .await
207 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
208 Box::new(err)
209 })?;
210 }
211 backfill_program_timestamps(db_client)
212 .await
213 .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
214 }
215 Ok(())
216 }
217 .boxed()
218 }
219}
220
221async fn write_program_events(
222 db: Arc<Client>,
223 rows: Vec<ProgramEvent>,
224) -> Result<(), clickhouse::error::Error> {
225 if rows.is_empty() {
226 return Ok(());
227 }
228 let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
229 for row in rows {
230 insert.write(&row).await?;
231 }
232 insert.end().await?;
233 Ok(())
234}
235
/// Converts an optional signed block time into a `u32`, saturating at both ends.
///
/// `None` and negative values map to `0`; values above `u32::MAX` map to `u32::MAX`.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    let bounded = block_time.unwrap_or(0).clamp(0, i64::from(u32::MAX));
    // Lossless: `bounded` is within `0..=u32::MAX` after the clamp above.
    bounded as u32
}
246
247async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
248 db.query(
249 r#"
250 INSERT INTO program_invocations
251 SELECT pi.slot,
252 ss.block_time,
253 pi.program_id,
254 pi.count,
255 pi.error_count,
256 pi.min_cus,
257 pi.max_cus,
258 pi.total_cus
259 FROM program_invocations AS pi
260 ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
261 WHERE pi.timestamp = toDateTime(0)
262 AND ss.block_time > toDateTime(0)
263 "#,
264 )
265 .execute()
266 .await?;
267
268 Ok(())
269}