// jetstreamer_plugin/plugins/program_tracking.rs

1use std::{cell::RefCell, collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use futures_util::FutureExt;
5use log::error;
6use serde::{Deserialize, Serialize};
7use solana_address::Address;
8use solana_message::VersionedMessage;
9
10use crate::{Plugin, PluginFuture};
11use jetstreamer_firehose::firehose::{BlockData, TransactionData};
12
/// Number of (non-skipped) blocks a thread handles in `on_block` between
/// attempts to flush its buffered rows to ClickHouse.
const DB_WRITE_INTERVAL_SLOTS: u64 = 1_000;
14
15#[derive(Default)]
16struct ThreadLocalData {
17    slot_stats: HashMap<u64, HashMap<Address, ProgramStats>>,
18    pending_rows: Vec<ProgramEvent>,
19    slots_since_flush: u64,
20}
21
22thread_local! {
23    static DATA: RefCell<ThreadLocalData> = RefCell::new(ThreadLocalData::default());
24}
25
26#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
27struct ProgramEvent {
28    pub slot: u32,
29    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; we clamp Solana i64.
30    pub timestamp: u32,
31    pub program_id: Address,
32    pub count: u32,
33    pub error_count: u32,
34    pub min_cus: u32,
35    pub max_cus: u32,
36    pub total_cus: u32,
37}
38
/// In-memory accumulator for one program within one slot.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct ProgramStats {
    /// Invocations recorded so far.
    pub count: u32,
    /// Invocations that came from failed transactions.
    pub error_count: u32,
    /// Minimum per-invocation CU share (starts at `u32::MAX` in the plugin).
    pub min_cus: u32,
    /// Maximum per-invocation CU share.
    pub max_cus: u32,
    /// Running total of per-invocation CU shares.
    pub total_cus: u32,
}
47
/// Tracks per-program invocation counts and writes them to ClickHouse.
#[derive(Clone, Debug, Default)]
pub struct ProgramTrackingPlugin;
51
52impl Plugin for ProgramTrackingPlugin {
53    #[inline(always)]
54    fn name(&self) -> &'static str {
55        "Program Tracking"
56    }
57
58    #[inline(always)]
59    fn on_transaction<'a>(
60        &'a self,
61        _thread_id: usize,
62        _db: Option<Arc<Client>>,
63        transaction: &'a TransactionData,
64    ) -> PluginFuture<'a> {
65        async move {
66            let message = &transaction.transaction.message;
67            let (account_keys, instructions) = match message {
68                VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
69                VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
70            };
71            if instructions.is_empty() {
72                return Ok(());
73            }
74            let program_ids = instructions
75                .iter()
76                .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
77                .cloned()
78                .collect::<Vec<_>>();
79            if program_ids.is_empty() {
80                return Ok(());
81            }
82            let total_cu = transaction
83                .transaction_status_meta
84                .compute_units_consumed
85                .unwrap_or(0) as u32;
86            let program_count = program_ids.len() as u32;
87
88            DATA.with(|data| {
89                let mut data = data.borrow_mut();
90                let slot_data = data.slot_stats.entry(transaction.slot).or_default();
91
92                for program_id in program_ids.iter() {
93                    let this_program_cu = if program_count == 0 {
94                        0
95                    } else {
96                        total_cu / program_count
97                    };
98                    let stats = slot_data.entry(*program_id).or_insert(ProgramStats {
99                        min_cus: u32::MAX,
100                        max_cus: 0,
101                        total_cus: 0,
102                        count: 0,
103                        error_count: 0,
104                    });
105                    stats.min_cus = stats.min_cus.min(this_program_cu);
106                    stats.max_cus = stats.max_cus.max(this_program_cu);
107                    stats.total_cus += this_program_cu;
108                    stats.count += 1;
109                    if transaction.transaction_status_meta.status.is_err() {
110                        stats.error_count += 1;
111                    }
112                }
113            });
114
115            Ok(())
116        }
117        .boxed()
118    }
119
120    #[inline(always)]
121    fn on_block(
122        &self,
123        _thread_id: usize,
124        db: Option<Arc<Client>>,
125        block: &BlockData,
126    ) -> PluginFuture<'_> {
127        let slot_info = match block {
128            BlockData::Block {
129                slot, block_time, ..
130            } => Some((*slot, *block_time)),
131            BlockData::LeaderSkipped { .. } => None,
132        };
133        async move {
134            let Some((slot, block_time)) = slot_info else {
135                return Ok(());
136            };
137
138            let flush_rows = DATA.with(|data| {
139                let mut data = data.borrow_mut();
140                if let Some(slot_data) = data.slot_stats.remove(&slot) {
141                    let slot_rows = events_from_slot(slot, block_time, &slot_data);
142                    data.pending_rows.extend(slot_rows);
143                }
144                data.slots_since_flush = data.slots_since_flush.saturating_add(1);
145                if data.slots_since_flush >= DB_WRITE_INTERVAL_SLOTS {
146                    data.slots_since_flush = 0;
147                    if data.pending_rows.is_empty() {
148                        None
149                    } else {
150                        Some(data.pending_rows.drain(..).collect::<Vec<_>>())
151                    }
152                } else {
153                    None
154                }
155            });
156
157            if let (Some(db_client), Some(rows)) = (db, flush_rows) {
158                tokio::spawn(async move {
159                    if let Err(err) = write_program_events(db_client, rows).await {
160                        error!("failed to flush program rows: {}", err);
161                    }
162                });
163            }
164
165            Ok(())
166        }
167        .boxed()
168    }
169
170    #[inline(always)]
171    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
172        // Remove invalid `get_or_init` call in `on_load`
173        DATA.with(|_| {});
174        // SLOT_TIMESTAMPS is a Lazy global, nothing to initialize
175        async move {
176            log::info!("Program Tracking Plugin loaded.");
177            if let Some(db) = db {
178                log::info!("Creating program_invocations table if it does not exist...");
179                db.query(
180                    r#"
181                    CREATE TABLE IF NOT EXISTS program_invocations (
182                        slot        UInt32,
183                        timestamp   DateTime('UTC'),
184                        program_id  FixedString(32),
185                        count       UInt32,
186                        error_count UInt32,
187                        min_cus     UInt32,
188                        max_cus     UInt32,
189                        total_cus   UInt32
190                    )
191                    ENGINE = ReplacingMergeTree(slot)
192                    ORDER BY (slot, program_id)
193                    "#,
194                )
195                .execute()
196                .await?;
197                log::info!("done.");
198            } else {
199                log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
200            }
201            Ok(())
202        }
203        .boxed()
204    }
205
206    #[inline(always)]
207    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
208        async move {
209            if let Some(db_client) = db {
210                let rows = DATA.with(|data| {
211                    let mut data = data.borrow_mut();
212                    let mut rows = std::mem::take(&mut data.pending_rows);
213                    for (slot, stats) in data.slot_stats.drain() {
214                        rows.extend(events_from_slot(slot, None, &stats));
215                    }
216                    rows
217                });
218                if !rows.is_empty()
219                    && let Err(err) = write_program_events(db_client, rows).await
220                {
221                    error!("failed to flush program rows on exit: {}", err);
222                }
223            }
224            Ok(())
225        }
226        .boxed()
227    }
228}
229
230async fn write_program_events(
231    db: Arc<Client>,
232    rows: Vec<ProgramEvent>,
233) -> Result<(), clickhouse::error::Error> {
234    if rows.is_empty() {
235        return Ok(());
236    }
237    let mut insert = db.insert("program_invocations")?;
238    for row in rows {
239        insert.write(&row).await?;
240    }
241    insert.end().await?;
242    Ok(())
243}
244
245fn events_from_slot(
246    slot: u64,
247    block_time: Option<i64>,
248    slot_data: &HashMap<Address, ProgramStats>,
249) -> Vec<ProgramEvent> {
250    let raw_ts = block_time.unwrap_or(0);
251    let timestamp: u32 = if raw_ts < 0 {
252        0
253    } else if raw_ts > u32::MAX as i64 {
254        u32::MAX
255    } else {
256        raw_ts as u32
257    };
258
259    slot_data
260        .iter()
261        .map(|(program_id, stats)| ProgramEvent {
262            slot: slot.min(u32::MAX as u64) as u32,
263            program_id: *program_id,
264            count: stats.count,
265            error_count: stats.error_count,
266            min_cus: stats.min_cus,
267            max_cus: stats.max_cus,
268            total_cus: stats.total_cus,
269            timestamp,
270        })
271        .collect()
272}