Skip to main content

jetstreamer_plugin/plugins/
program_tracking.rs

1use ahash::RandomState;
2use std::{collections::HashMap, sync::Arc};
3
4use clickhouse::{Client, Row};
5use dashmap::DashMap;
6use futures_util::FutureExt;
7use once_cell::sync::Lazy;
8use serde::{Deserialize, Serialize};
9use solana_address::Address;
10use solana_message::VersionedMessage;
11
12use crate::{Plugin, PluginFuture};
13use jetstreamer_firehose::firehose::{BlockData, TransactionData};
14
15type SlotProgramKey = (Address, bool);
16type SlotProgramEvents = HashMap<SlotProgramKey, ProgramEvent, RandomState>;
17
18static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotProgramEvents, RandomState>> =
19    Lazy::new(|| DashMap::with_hasher(RandomState::new()));
20
21#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
22struct ProgramEvent {
23    pub slot: u32,
24    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; we clamp Solana i64.
25    pub timestamp: u32,
26    pub program_id: Address,
27    pub is_vote: bool,
28    pub count: u32,
29    pub error_count: u32,
30    pub min_cus: u32,
31    pub max_cus: u32,
32    pub total_cus: u32,
33}
34
35#[derive(Debug, Clone)]
36/// Tracks per-program invocation counts (including vote transactions) and writes them to ClickHouse.
37pub struct ProgramTrackingPlugin;
38
39impl ProgramTrackingPlugin {
40    /// Creates a new instance that records both vote and non-vote transactions.
41    pub const fn new() -> Self {
42        Self
43    }
44
45    fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
46        let timestamp = clamp_block_time(block_time);
47        if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
48            return events_by_program
49                .into_values()
50                .map(|mut event| {
51                    event.timestamp = timestamp;
52                    event
53                })
54                .collect();
55        }
56        Vec::new()
57    }
58
59    fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
60        let timestamp = clamp_block_time(block_time);
61        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
62        let mut rows = Vec::new();
63        for slot in slots {
64            if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
65                rows.extend(events_by_program.into_values().map(|mut event| {
66                    event.timestamp = timestamp;
67                    event
68                }));
69            }
70        }
71        rows
72    }
73}
74
75impl Default for ProgramTrackingPlugin {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81impl Plugin for ProgramTrackingPlugin {
82    #[inline(always)]
83    fn name(&self) -> &'static str {
84        "Program Tracking"
85    }
86
87    #[inline(always)]
88    fn on_transaction<'a>(
89        &'a self,
90        _thread_id: usize,
91        _db: Option<Arc<Client>>,
92        transaction: &'a TransactionData,
93    ) -> PluginFuture<'a> {
94        async move {
95            let message = &transaction.transaction.message;
96            let (account_keys, instructions) = match message {
97                VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
98                VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
99            };
100            if instructions.is_empty() {
101                return Ok(());
102            }
103            let program_ids = instructions
104                .iter()
105                .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
106                .cloned()
107                .collect::<Vec<_>>();
108            if program_ids.is_empty() {
109                return Ok(());
110            }
111            let total_cu = transaction
112                .transaction_status_meta
113                .compute_units_consumed
114                .unwrap_or(0) as u32;
115            let program_count = program_ids.len() as u32;
116            let errored = transaction.transaction_status_meta.status.is_err();
117            let slot = transaction.slot;
118            let is_vote = transaction.is_vote;
119
120            let mut slot_entry = PENDING_BY_SLOT
121                .entry(slot)
122                .or_insert_with(|| HashMap::with_hasher(RandomState::new()));
123            for program_id in program_ids.iter() {
124                let this_program_cu = if program_count == 0 {
125                    0
126                } else {
127                    total_cu / program_count
128                };
129                let event =
130                    slot_entry
131                        .entry((*program_id, is_vote))
132                        .or_insert_with(|| ProgramEvent {
133                            slot: slot.min(u32::MAX as u64) as u32,
134                            timestamp: 0,
135                            program_id: *program_id,
136                            is_vote,
137                            count: 0,
138                            error_count: 0,
139                            min_cus: u32::MAX,
140                            max_cus: 0,
141                            total_cus: 0,
142                        });
143                event.min_cus = event.min_cus.min(this_program_cu);
144                event.max_cus = event.max_cus.max(this_program_cu);
145                event.total_cus = event.total_cus.saturating_add(this_program_cu);
146                event.count = event.count.saturating_add(1);
147                if errored {
148                    event.error_count = event.error_count.saturating_add(1);
149                }
150            }
151
152            Ok(())
153        }
154        .boxed()
155    }
156
157    #[inline(always)]
158    fn on_block(
159        &self,
160        _thread_id: usize,
161        db: Option<Arc<Client>>,
162        block: &BlockData,
163    ) -> PluginFuture<'_> {
164        let slot = block.slot();
165        let block_time = block.block_time();
166        let was_skipped = block.was_skipped();
167        async move {
168            if was_skipped {
169                return Ok(());
170            }
171
172            let rows = Self::take_slot_events(slot, block_time);
173
174            if let Some(db_client) = db
175                && !rows.is_empty()
176            {
177                tokio::spawn(async move {
178                    if let Err(err) = write_program_events(db_client, rows).await {
179                        log::error!("failed to write program events: {}", err);
180                    }
181                });
182            }
183
184            Ok(())
185        }
186        .boxed()
187    }
188
189    #[inline(always)]
190    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
191        async move {
192            log::info!("Program Tracking Plugin loaded.");
193            if let Some(db) = db {
194                log::info!("Creating program_invocations table if it does not exist...");
195                db.query(
196                    r#"
197                    CREATE TABLE IF NOT EXISTS program_invocations (
198                        slot        UInt32,
199                        timestamp   DateTime('UTC'),
200                        program_id  FixedString(32),
201                        is_vote     UInt8,
202                        count       UInt32,
203                        error_count UInt32,
204                        min_cus     UInt32,
205                        max_cus     UInt32,
206                        total_cus   UInt32
207                    )
208                    ENGINE = ReplacingMergeTree(timestamp)
209                    ORDER BY (slot, program_id, is_vote)
210                    "#,
211                )
212                .execute()
213                .await?;
214                log::info!("done.");
215            } else {
216                log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
217            }
218            Ok(())
219        }
220        .boxed()
221    }
222
223    #[inline(always)]
224    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
225        async move {
226            if let Some(db_client) = db {
227                let rows = Self::drain_all_pending(None);
228                if !rows.is_empty() {
229                    write_program_events(Arc::clone(&db_client), rows)
230                        .await
231                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
232                            Box::new(err)
233                        })?;
234                }
235                backfill_program_timestamps(db_client)
236                    .await
237                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
238            }
239            Ok(())
240        }
241        .boxed()
242    }
243}
244
245async fn write_program_events(
246    db: Arc<Client>,
247    rows: Vec<ProgramEvent>,
248) -> Result<(), clickhouse::error::Error> {
249    if rows.is_empty() {
250        return Ok(());
251    }
252    let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
253    for row in rows {
254        insert.write(&row).await?;
255    }
256    insert.end().await?;
257    Ok(())
258}
259
260fn clamp_block_time(block_time: Option<i64>) -> u32 {
261    let raw_ts = block_time.unwrap_or(0);
262    if raw_ts < 0 {
263        0
264    } else if raw_ts > u32::MAX as i64 {
265        u32::MAX
266    } else {
267        raw_ts as u32
268    }
269}
270
271async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
272    db.query(
273        r#"
274        INSERT INTO program_invocations
275        SELECT pi.slot,
276               ss.block_time,
277               pi.program_id,
278               pi.is_vote,
279               pi.count,
280               pi.error_count,
281               pi.min_cus,
282               pi.max_cus,
283               pi.total_cus
284        FROM program_invocations AS pi
285        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
286        WHERE pi.timestamp = toDateTime(0)
287          AND ss.block_time > toDateTime(0)
288        "#,
289    )
290    .execute()
291    .await?;
292
293    Ok(())
294}