Skip to main content

jetstreamer_plugin/plugins/
program_tracking.rs

1use std::{collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14type SlotProgramKey = (Address, bool);
15type SlotProgramEvents = HashMap<SlotProgramKey, ProgramEvent>;
16
17static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotProgramEvents>> = Lazy::new(DashMap::new);
18
19#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
20struct ProgramEvent {
21    pub slot: u32,
22    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; we clamp Solana i64.
23    pub timestamp: u32,
24    pub program_id: Address,
25    pub is_vote: bool,
26    pub count: u32,
27    pub error_count: u32,
28    pub min_cus: u32,
29    pub max_cus: u32,
30    pub total_cus: u32,
31}
32
33#[derive(Debug, Clone)]
34/// Tracks per-program invocation counts (including vote transactions) and writes them to ClickHouse.
35pub struct ProgramTrackingPlugin;
36
37impl ProgramTrackingPlugin {
38    /// Creates a new instance that records both vote and non-vote transactions.
39    pub const fn new() -> Self {
40        Self
41    }
42
43    fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
44        let timestamp = clamp_block_time(block_time);
45        if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
46            return events_by_program
47                .into_values()
48                .map(|mut event| {
49                    event.timestamp = timestamp;
50                    event
51                })
52                .collect();
53        }
54        Vec::new()
55    }
56
57    fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
58        let timestamp = clamp_block_time(block_time);
59        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
60        let mut rows = Vec::new();
61        for slot in slots {
62            if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
63                rows.extend(events_by_program.into_values().map(|mut event| {
64                    event.timestamp = timestamp;
65                    event
66                }));
67            }
68        }
69        rows
70    }
71}
72
73impl Default for ProgramTrackingPlugin {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl Plugin for ProgramTrackingPlugin {
80    #[inline(always)]
81    fn name(&self) -> &'static str {
82        "Program Tracking"
83    }
84
85    #[inline(always)]
86    fn on_transaction<'a>(
87        &'a self,
88        _thread_id: usize,
89        _db: Option<Arc<Client>>,
90        transaction: &'a TransactionData,
91    ) -> PluginFuture<'a> {
92        async move {
93            let message = &transaction.transaction.message;
94            let (account_keys, instructions) = match message {
95                VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
96                VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
97            };
98            if instructions.is_empty() {
99                return Ok(());
100            }
101            let program_ids = instructions
102                .iter()
103                .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
104                .cloned()
105                .collect::<Vec<_>>();
106            if program_ids.is_empty() {
107                return Ok(());
108            }
109            let total_cu = transaction
110                .transaction_status_meta
111                .compute_units_consumed
112                .unwrap_or(0) as u32;
113            let program_count = program_ids.len() as u32;
114            let errored = transaction.transaction_status_meta.status.is_err();
115            let slot = transaction.slot;
116            let is_vote = transaction.is_vote;
117
118            let mut slot_entry = PENDING_BY_SLOT.entry(slot).or_default();
119            for program_id in program_ids.iter() {
120                let this_program_cu = if program_count == 0 {
121                    0
122                } else {
123                    total_cu / program_count
124                };
125                let event =
126                    slot_entry
127                        .entry((*program_id, is_vote))
128                        .or_insert_with(|| ProgramEvent {
129                            slot: slot.min(u32::MAX as u64) as u32,
130                            timestamp: 0,
131                            program_id: *program_id,
132                            is_vote,
133                            count: 0,
134                            error_count: 0,
135                            min_cus: u32::MAX,
136                            max_cus: 0,
137                            total_cus: 0,
138                        });
139                event.min_cus = event.min_cus.min(this_program_cu);
140                event.max_cus = event.max_cus.max(this_program_cu);
141                event.total_cus = event.total_cus.saturating_add(this_program_cu);
142                event.count = event.count.saturating_add(1);
143                if errored {
144                    event.error_count = event.error_count.saturating_add(1);
145                }
146            }
147
148            Ok(())
149        }
150        .boxed()
151    }
152
153    #[inline(always)]
154    fn on_block(
155        &self,
156        _thread_id: usize,
157        db: Option<Arc<Client>>,
158        block: &BlockData,
159    ) -> PluginFuture<'_> {
160        let slot = block.slot();
161        let block_time = block.block_time();
162        let was_skipped = block.was_skipped();
163        async move {
164            if was_skipped {
165                return Ok(());
166            }
167
168            let rows = Self::take_slot_events(slot, block_time);
169
170            if let Some(db_client) = db
171                && !rows.is_empty()
172            {
173                tokio::spawn(async move {
174                    if let Err(err) = write_program_events(db_client, rows).await {
175                        log::error!("failed to write program events: {}", err);
176                    }
177                });
178            }
179
180            Ok(())
181        }
182        .boxed()
183    }
184
185    #[inline(always)]
186    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
187        async move {
188            log::info!("Program Tracking Plugin loaded.");
189            if let Some(db) = db {
190                log::info!("Creating program_invocations table if it does not exist...");
191                db.query(
192                    r#"
193                    CREATE TABLE IF NOT EXISTS program_invocations (
194                        slot        UInt32,
195                        timestamp   DateTime('UTC'),
196                        program_id  FixedString(32),
197                        is_vote     UInt8,
198                        count       UInt32,
199                        error_count UInt32,
200                        min_cus     UInt32,
201                        max_cus     UInt32,
202                        total_cus   UInt32
203                    )
204                    ENGINE = ReplacingMergeTree(timestamp)
205                    ORDER BY (slot, program_id, is_vote)
206                    "#,
207                )
208                .execute()
209                .await?;
210                log::info!("done.");
211            } else {
212                log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
213            }
214            Ok(())
215        }
216        .boxed()
217    }
218
219    #[inline(always)]
220    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
221        async move {
222            if let Some(db_client) = db {
223                let rows = Self::drain_all_pending(None);
224                if !rows.is_empty() {
225                    write_program_events(Arc::clone(&db_client), rows)
226                        .await
227                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
228                            Box::new(err)
229                        })?;
230                }
231                backfill_program_timestamps(db_client)
232                    .await
233                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
234            }
235            Ok(())
236        }
237        .boxed()
238    }
239}
240
241async fn write_program_events(
242    db: Arc<Client>,
243    rows: Vec<ProgramEvent>,
244) -> Result<(), clickhouse::error::Error> {
245    if rows.is_empty() {
246        return Ok(());
247    }
248    let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
249    for row in rows {
250        insert.write(&row).await?;
251    }
252    insert.end().await?;
253    Ok(())
254}
255
256fn clamp_block_time(block_time: Option<i64>) -> u32 {
257    let raw_ts = block_time.unwrap_or(0);
258    if raw_ts < 0 {
259        0
260    } else if raw_ts > u32::MAX as i64 {
261        u32::MAX
262    } else {
263        raw_ts as u32
264    }
265}
266
267async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
268    db.query(
269        r#"
270        INSERT INTO program_invocations
271        SELECT pi.slot,
272               ss.block_time,
273               pi.program_id,
274               pi.is_vote,
275               pi.count,
276               pi.error_count,
277               pi.min_cus,
278               pi.max_cus,
279               pi.total_cus
280        FROM program_invocations AS pi
281        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
282        WHERE pi.timestamp = toDateTime(0)
283          AND ss.block_time > toDateTime(0)
284        "#,
285    )
286    .execute()
287    .await?;
288
289    Ok(())
290}