// jetstreamer_plugin/plugins/program_tracking.rs

1use std::{cell::RefCell, collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use futures_util::FutureExt;
5use log::error;
6use serde::{Deserialize, Serialize};
7use solana_sdk::{message::VersionedMessage, pubkey::Pubkey};
8
9use crate::{Plugin, PluginFuture};
10use jetstreamer_firehose::firehose::{BlockData, TransactionData};
11
/// Number of produced blocks a thread handles between attempts to flush its
/// buffered rows to ClickHouse.
const DB_WRITE_INTERVAL_SLOTS: u64 = 1_000;
13
14#[derive(Default)]
15struct ThreadLocalData {
16    slot_stats: HashMap<u64, HashMap<Pubkey, ProgramStats>>,
17    pending_rows: Vec<ProgramEvent>,
18    slots_since_flush: u64,
19}
20
21thread_local! {
22    static DATA: RefCell<ThreadLocalData> = RefCell::new(ThreadLocalData::default());
23}
24
25#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
26struct ProgramEvent {
27    pub slot: u32,
28    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; we clamp Solana i64.
29    pub timestamp: u32,
30    pub program_id: Pubkey,
31    pub count: u32,
32    pub error_count: u32,
33    pub min_cus: u32,
34    pub max_cus: u32,
35    pub total_cus: u32,
36}
37
/// Running per-program, per-slot aggregate built up by `on_transaction`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct ProgramStats {
    /// Invocations seen so far.
    pub count: u32,
    /// Invocations that came from failed transactions.
    pub error_count: u32,
    /// Minimum compute-unit share (starts at `u32::MAX` before the first update).
    pub min_cus: u32,
    /// Maximum compute-unit share.
    pub max_cus: u32,
    /// Accumulated compute-unit shares.
    pub total_cus: u32,
}
46
/// Tracks per-program invocation counts and writes them to ClickHouse.
///
/// The plugin value itself carries no state; accumulated statistics live in
/// the thread-local `DATA` buffer.
#[derive(Clone, Debug, Default)]
pub struct ProgramTrackingPlugin;
50
51impl Plugin for ProgramTrackingPlugin {
52    #[inline(always)]
53    fn name(&self) -> &'static str {
54        "Program Tracking"
55    }
56
57    #[inline(always)]
58    fn on_transaction<'a>(
59        &'a self,
60        _thread_id: usize,
61        _db: Option<Arc<Client>>,
62        transaction: &'a TransactionData,
63    ) -> PluginFuture<'a> {
64        async move {
65            let message = &transaction.transaction.message;
66            let (account_keys, instructions) = match message {
67                VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
68                VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
69            };
70            if instructions.is_empty() {
71                return Ok(());
72            }
73            let program_ids = instructions
74                .iter()
75                .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
76                .cloned()
77                .collect::<Vec<_>>();
78            if program_ids.is_empty() {
79                return Ok(());
80            }
81            let total_cu = transaction
82                .transaction_status_meta
83                .compute_units_consumed
84                .unwrap_or(0) as u32;
85            let program_count = program_ids.len() as u32;
86
87            DATA.with(|data| {
88                let mut data = data.borrow_mut();
89                let slot_data = data.slot_stats.entry(transaction.slot).or_default();
90
91                for program_id in program_ids.iter() {
92                    let this_program_cu = if program_count == 0 {
93                        0
94                    } else {
95                        total_cu / program_count
96                    };
97                    let stats = slot_data.entry(*program_id).or_insert(ProgramStats {
98                        min_cus: u32::MAX,
99                        max_cus: 0,
100                        total_cus: 0,
101                        count: 0,
102                        error_count: 0,
103                    });
104                    stats.min_cus = stats.min_cus.min(this_program_cu);
105                    stats.max_cus = stats.max_cus.max(this_program_cu);
106                    stats.total_cus += this_program_cu;
107                    stats.count += 1;
108                    if transaction.transaction_status_meta.status.is_err() {
109                        stats.error_count += 1;
110                    }
111                }
112            });
113
114            Ok(())
115        }
116        .boxed()
117    }
118
119    #[inline(always)]
120    fn on_block(
121        &self,
122        _thread_id: usize,
123        db: Option<Arc<Client>>,
124        block: &BlockData,
125    ) -> PluginFuture<'_> {
126        let slot_info = match block {
127            BlockData::Block {
128                slot, block_time, ..
129            } => Some((*slot, *block_time)),
130            BlockData::LeaderSkipped { .. } => None,
131        };
132        async move {
133            let Some((slot, block_time)) = slot_info else {
134                return Ok(());
135            };
136
137            let flush_rows = DATA.with(|data| {
138                let mut data = data.borrow_mut();
139                if let Some(slot_data) = data.slot_stats.remove(&slot) {
140                    let slot_rows = events_from_slot(slot, block_time, &slot_data);
141                    data.pending_rows.extend(slot_rows);
142                }
143                data.slots_since_flush = data.slots_since_flush.saturating_add(1);
144                if data.slots_since_flush >= DB_WRITE_INTERVAL_SLOTS {
145                    data.slots_since_flush = 0;
146                    if data.pending_rows.is_empty() {
147                        None
148                    } else {
149                        Some(data.pending_rows.drain(..).collect::<Vec<_>>())
150                    }
151                } else {
152                    None
153                }
154            });
155
156            if let (Some(db_client), Some(rows)) = (db, flush_rows) {
157                tokio::spawn(async move {
158                    if let Err(err) = write_program_events(db_client, rows).await {
159                        error!("failed to flush program rows: {}", err);
160                    }
161                });
162            }
163
164            Ok(())
165        }
166        .boxed()
167    }
168
169    #[inline(always)]
170    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
171        // Remove invalid `get_or_init` call in `on_load`
172        DATA.with(|_| {});
173        // SLOT_TIMESTAMPS is a Lazy global, nothing to initialize
174        async move {
175            log::info!("Program Tracking Plugin loaded.");
176            if let Some(db) = db {
177                log::info!("Creating program_invocations table if it does not exist...");
178                db.query(
179                    r#"
180                    CREATE TABLE IF NOT EXISTS program_invocations (
181                        slot        UInt32,
182                        timestamp   DateTime('UTC'),
183                        program_id  FixedString(32),
184                        count       UInt32,
185                        error_count UInt32,
186                        min_cus     UInt32,
187                        max_cus     UInt32,
188                        total_cus   UInt32
189                    )
190                    ENGINE = ReplacingMergeTree(slot)
191                    ORDER BY (slot, program_id)
192                    "#,
193                )
194                .execute()
195                .await?;
196                log::info!("done.");
197            } else {
198                log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
199            }
200            Ok(())
201        }
202        .boxed()
203    }
204
205    #[inline(always)]
206    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
207        async move {
208            if let Some(db_client) = db {
209                let rows = DATA.with(|data| {
210                    let mut data = data.borrow_mut();
211                    let mut rows = std::mem::take(&mut data.pending_rows);
212                    for (slot, stats) in data.slot_stats.drain() {
213                        rows.extend(events_from_slot(slot, None, &stats));
214                    }
215                    rows
216                });
217                if !rows.is_empty()
218                    && let Err(err) = write_program_events(db_client, rows).await
219                {
220                    error!("failed to flush program rows on exit: {}", err);
221                }
222            }
223            Ok(())
224        }
225        .boxed()
226    }
227}
228
229async fn write_program_events(
230    db: Arc<Client>,
231    rows: Vec<ProgramEvent>,
232) -> Result<(), clickhouse::error::Error> {
233    if rows.is_empty() {
234        return Ok(());
235    }
236    let mut insert = db.insert("program_invocations")?;
237    for row in rows {
238        insert.write(&row).await?;
239    }
240    insert.end().await?;
241    Ok(())
242}
243
244fn events_from_slot(
245    slot: u64,
246    block_time: Option<i64>,
247    slot_data: &HashMap<Pubkey, ProgramStats>,
248) -> Vec<ProgramEvent> {
249    let raw_ts = block_time.unwrap_or(0);
250    let timestamp: u32 = if raw_ts < 0 {
251        0
252    } else if raw_ts > u32::MAX as i64 {
253        u32::MAX
254    } else {
255        raw_ts as u32
256    };
257
258    slot_data
259        .iter()
260        .map(|(program_id, stats)| ProgramEvent {
261            slot: slot.min(u32::MAX as u64) as u32,
262            program_id: *program_id,
263            count: stats.count,
264            error_count: stats.error_count,
265            min_cus: stats.min_cus,
266            max_cus: stats.max_cus,
267            total_cus: stats.total_cus,
268            timestamp,
269        })
270        .collect()
271}