jetstreamer_plugin/plugins/
program_tracking.rs

1use std::{collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14type SlotProgramKey = (Address, bool);
15type SlotProgramEvents = HashMap<SlotProgramKey, ProgramEvent>;
16
17static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotProgramEvents>> = Lazy::new(DashMap::new);
18
19#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
20struct ProgramEvent {
21    pub slot: u32,
22    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; we clamp Solana i64.
23    pub timestamp: u32,
24    pub program_id: Address,
25    pub is_vote: bool,
26    pub count: u32,
27    pub error_count: u32,
28    pub min_cus: u32,
29    pub max_cus: u32,
30    pub total_cus: u32,
31}
32
/// Tracks per-program invocation counts (including vote transactions) and writes them to ClickHouse.
#[derive(Clone, Debug)]
pub struct ProgramTrackingPlugin;
36
37impl ProgramTrackingPlugin {
38    /// Creates a new instance that records both vote and non-vote transactions.
39    pub const fn new() -> Self {
40        Self
41    }
42
43    fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
44        let timestamp = clamp_block_time(block_time);
45        if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
46            return events_by_program
47                .into_values()
48                .map(|mut event| {
49                    event.timestamp = timestamp;
50                    event
51                })
52                .collect();
53        }
54        Vec::new()
55    }
56
57    fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
58        let timestamp = clamp_block_time(block_time);
59        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
60        let mut rows = Vec::new();
61        for slot in slots {
62            if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
63                rows.extend(events_by_program.into_values().map(|mut event| {
64                    event.timestamp = timestamp;
65                    event
66                }));
67            }
68        }
69        rows
70    }
71}
72
73impl Default for ProgramTrackingPlugin {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
79impl Plugin for ProgramTrackingPlugin {
80    #[inline(always)]
81    fn name(&self) -> &'static str {
82        "Program Tracking"
83    }
84
85    #[inline(always)]
86    fn on_transaction<'a>(
87        &'a self,
88        _thread_id: usize,
89        _db: Option<Arc<Client>>,
90        transaction: &'a TransactionData,
91    ) -> PluginFuture<'a> {
92        async move {
93            let message = &transaction.transaction.message;
94            let (account_keys, instructions) = match message {
95                VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
96                VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
97            };
98            if instructions.is_empty() {
99                return Ok(());
100            }
101            let program_ids = instructions
102                .iter()
103                .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
104                .cloned()
105                .collect::<Vec<_>>();
106            if program_ids.is_empty() {
107                return Ok(());
108            }
109            let total_cu = transaction
110                .transaction_status_meta
111                .compute_units_consumed
112                .unwrap_or(0) as u32;
113            let program_count = program_ids.len() as u32;
114            let errored = transaction.transaction_status_meta.status.is_err();
115            let slot = transaction.slot;
116            let is_vote = transaction.is_vote;
117
118            let mut slot_entry = PENDING_BY_SLOT.entry(slot).or_default();
119            for program_id in program_ids.iter() {
120                let this_program_cu = if program_count == 0 {
121                    0
122                } else {
123                    total_cu / program_count
124                };
125                let event =
126                    slot_entry
127                        .entry((*program_id, is_vote))
128                        .or_insert_with(|| ProgramEvent {
129                            slot: slot.min(u32::MAX as u64) as u32,
130                            timestamp: 0,
131                            program_id: *program_id,
132                            is_vote,
133                            count: 0,
134                            error_count: 0,
135                            min_cus: u32::MAX,
136                            max_cus: 0,
137                            total_cus: 0,
138                        });
139                event.min_cus = event.min_cus.min(this_program_cu);
140                event.max_cus = event.max_cus.max(this_program_cu);
141                event.total_cus = event.total_cus.saturating_add(this_program_cu);
142                event.count = event.count.saturating_add(1);
143                if errored {
144                    event.error_count = event.error_count.saturating_add(1);
145                }
146            }
147
148            Ok(())
149        }
150        .boxed()
151    }
152
153    #[inline(always)]
154    fn on_block(
155        &self,
156        _thread_id: usize,
157        db: Option<Arc<Client>>,
158        block: &BlockData,
159    ) -> PluginFuture<'_> {
160        let slot = block.slot();
161        let block_time = block.block_time();
162        let was_skipped = block.was_skipped();
163        async move {
164            if was_skipped {
165                return Ok(());
166            }
167
168            let rows = Self::take_slot_events(slot, block_time);
169
170            if let Some(db_client) = db.as_ref()
171                && !rows.is_empty()
172            {
173                write_program_events(Arc::clone(db_client), rows)
174                    .await
175                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
176            }
177
178            Ok(())
179        }
180        .boxed()
181    }
182
183    #[inline(always)]
184    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
185        async move {
186            log::info!("Program Tracking Plugin loaded.");
187            if let Some(db) = db {
188                log::info!("Creating program_invocations table if it does not exist...");
189                db.query(
190                    r#"
191                    CREATE TABLE IF NOT EXISTS program_invocations (
192                        slot        UInt32,
193                        timestamp   DateTime('UTC'),
194                        program_id  FixedString(32),
195                        is_vote     UInt8,
196                        count       UInt32,
197                        error_count UInt32,
198                        min_cus     UInt32,
199                        max_cus     UInt32,
200                        total_cus   UInt32
201                    )
202                    ENGINE = ReplacingMergeTree(timestamp)
203                    ORDER BY (slot, program_id, is_vote)
204                    "#,
205                )
206                .execute()
207                .await?;
208                log::info!("done.");
209            } else {
210                log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
211            }
212            Ok(())
213        }
214        .boxed()
215    }
216
217    #[inline(always)]
218    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
219        async move {
220            if let Some(db_client) = db {
221                let rows = Self::drain_all_pending(None);
222                if !rows.is_empty() {
223                    write_program_events(Arc::clone(&db_client), rows)
224                        .await
225                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
226                            Box::new(err)
227                        })?;
228                }
229                backfill_program_timestamps(db_client)
230                    .await
231                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
232            }
233            Ok(())
234        }
235        .boxed()
236    }
237}
238
239async fn write_program_events(
240    db: Arc<Client>,
241    rows: Vec<ProgramEvent>,
242) -> Result<(), clickhouse::error::Error> {
243    if rows.is_empty() {
244        return Ok(());
245    }
246    let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
247    for row in rows {
248        insert.write(&row).await?;
249    }
250    insert.end().await?;
251    Ok(())
252}
253
/// Converts an optional `i64` block time into the `u32` seconds value stored in the table.
///
/// A missing or negative block time becomes `0`; a value above `u32::MAX` saturates to
/// `u32::MAX`; anything else is returned unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    match block_time {
        None => 0,
        Some(secs) if secs < 0 => 0,
        // `try_from` only fails above `u32::MAX` here, since negatives were handled above.
        Some(secs) => u32::try_from(secs).unwrap_or(u32::MAX),
    }
}
264
265async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
266    db.query(
267        r#"
268        INSERT INTO program_invocations
269        SELECT pi.slot,
270               ss.block_time,
271               pi.program_id,
272               pi.is_vote,
273               pi.count,
274               pi.error_count,
275               pi.min_cus,
276               pi.max_cus,
277               pi.total_cus
278        FROM program_invocations AS pi
279        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
280        WHERE pi.timestamp = toDateTime(0)
281          AND ss.block_time > toDateTime(0)
282        "#,
283    )
284    .execute()
285    .await?;
286
287    Ok(())
288}