jetstreamer_plugin/plugins/
program_tracking.rs

1use std::{collections::HashMap, sync::Arc};
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_address::Address;
9use solana_message::VersionedMessage;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<DashMap<u64, HashMap<Address, ProgramEvent>>> =
15    Lazy::new(DashMap::new);
16
17#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
18struct ProgramEvent {
19    pub slot: u32,
20    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; we clamp Solana i64.
21    pub timestamp: u32,
22    pub program_id: Address,
23    pub count: u32,
24    pub error_count: u32,
25    pub min_cus: u32,
26    pub max_cus: u32,
27    pub total_cus: u32,
28}
29
30#[derive(Debug, Default, Clone)]
31/// Tracks per-program invocation counts and writes them to ClickHouse.
32pub struct ProgramTrackingPlugin;
33
34impl ProgramTrackingPlugin {
35    fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
36        let timestamp = clamp_block_time(block_time);
37        if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
38            return events_by_program
39                .into_values()
40                .map(|mut event| {
41                    event.timestamp = timestamp;
42                    event
43                })
44                .collect();
45        }
46        Vec::new()
47    }
48
49    fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
50        let timestamp = clamp_block_time(block_time);
51        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
52        let mut rows = Vec::new();
53        for slot in slots {
54            if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
55                rows.extend(events_by_program.into_values().map(|mut event| {
56                    event.timestamp = timestamp;
57                    event
58                }));
59            }
60        }
61        rows
62    }
63}
64
65impl Plugin for ProgramTrackingPlugin {
66    #[inline(always)]
67    fn name(&self) -> &'static str {
68        "Program Tracking"
69    }
70
71    #[inline(always)]
72    fn on_transaction<'a>(
73        &'a self,
74        _thread_id: usize,
75        _db: Option<Arc<Client>>,
76        transaction: &'a TransactionData,
77    ) -> PluginFuture<'a> {
78        async move {
79            let message = &transaction.transaction.message;
80            let (account_keys, instructions) = match message {
81                VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
82                VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
83            };
84            if instructions.is_empty() {
85                return Ok(());
86            }
87            let program_ids = instructions
88                .iter()
89                .filter_map(|ix| account_keys.get(ix.program_id_index as usize))
90                .cloned()
91                .collect::<Vec<_>>();
92            if program_ids.is_empty() {
93                return Ok(());
94            }
95            let total_cu = transaction
96                .transaction_status_meta
97                .compute_units_consumed
98                .unwrap_or(0) as u32;
99            let program_count = program_ids.len() as u32;
100            let errored = transaction.transaction_status_meta.status.is_err();
101            let slot = transaction.slot;
102
103            let mut slot_entry = PENDING_BY_SLOT.entry(slot).or_default();
104            for program_id in program_ids.iter() {
105                let this_program_cu = if program_count == 0 {
106                    0
107                } else {
108                    total_cu / program_count
109                };
110                let event = slot_entry
111                    .entry(*program_id)
112                    .or_insert_with(|| ProgramEvent {
113                        slot: slot.min(u32::MAX as u64) as u32,
114                        timestamp: 0,
115                        program_id: *program_id,
116                        count: 0,
117                        error_count: 0,
118                        min_cus: u32::MAX,
119                        max_cus: 0,
120                        total_cus: 0,
121                    });
122                event.min_cus = event.min_cus.min(this_program_cu);
123                event.max_cus = event.max_cus.max(this_program_cu);
124                event.total_cus = event.total_cus.saturating_add(this_program_cu);
125                event.count = event.count.saturating_add(1);
126                if errored {
127                    event.error_count = event.error_count.saturating_add(1);
128                }
129            }
130
131            Ok(())
132        }
133        .boxed()
134    }
135
136    #[inline(always)]
137    fn on_block(
138        &self,
139        _thread_id: usize,
140        db: Option<Arc<Client>>,
141        block: &BlockData,
142    ) -> PluginFuture<'_> {
143        let slot = block.slot();
144        let block_time = block.block_time();
145        let was_skipped = block.was_skipped();
146        async move {
147            if was_skipped {
148                return Ok(());
149            }
150
151            let rows = Self::take_slot_events(slot, block_time);
152
153            if let Some(db_client) = db.as_ref()
154                && !rows.is_empty()
155            {
156                write_program_events(Arc::clone(db_client), rows)
157                    .await
158                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
159            }
160
161            Ok(())
162        }
163        .boxed()
164    }
165
166    #[inline(always)]
167    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
168        async move {
169            log::info!("Program Tracking Plugin loaded.");
170            if let Some(db) = db {
171                log::info!("Creating program_invocations table if it does not exist...");
172                db.query(
173                    r#"
174                    CREATE TABLE IF NOT EXISTS program_invocations (
175                        slot        UInt32,
176                        timestamp   DateTime('UTC'),
177                        program_id  FixedString(32),
178                        count       UInt32,
179                        error_count UInt32,
180                        min_cus     UInt32,
181                        max_cus     UInt32,
182                        total_cus   UInt32
183                    )
184                    ENGINE = ReplacingMergeTree(timestamp)
185                    ORDER BY (slot, program_id)
186                    "#,
187                )
188                .execute()
189                .await?;
190                log::info!("done.");
191            } else {
192                log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
193            }
194            Ok(())
195        }
196        .boxed()
197    }
198
199    #[inline(always)]
200    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
201        async move {
202            if let Some(db_client) = db {
203                let rows = Self::drain_all_pending(None);
204                if !rows.is_empty() {
205                    write_program_events(Arc::clone(&db_client), rows)
206                        .await
207                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
208                            Box::new(err)
209                        })?;
210                }
211                backfill_program_timestamps(db_client)
212                    .await
213                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
214            }
215            Ok(())
216        }
217        .boxed()
218    }
219}
220
221async fn write_program_events(
222    db: Arc<Client>,
223    rows: Vec<ProgramEvent>,
224) -> Result<(), clickhouse::error::Error> {
225    if rows.is_empty() {
226        return Ok(());
227    }
228    let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
229    for row in rows {
230        insert.write(&row).await?;
231    }
232    insert.end().await?;
233    Ok(())
234}
235
236fn clamp_block_time(block_time: Option<i64>) -> u32 {
237    let raw_ts = block_time.unwrap_or(0);
238    if raw_ts < 0 {
239        0
240    } else if raw_ts > u32::MAX as i64 {
241        u32::MAX
242    } else {
243        raw_ts as u32
244    }
245}
246
247async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
248    db.query(
249        r#"
250        INSERT INTO program_invocations
251        SELECT pi.slot,
252               ss.block_time,
253               pi.program_id,
254               pi.count,
255               pi.error_count,
256               pi.min_cus,
257               pi.max_cus,
258               pi.total_cus
259        FROM program_invocations AS pi
260        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
261        WHERE pi.timestamp = toDateTime(0)
262          AND ss.block_time > toDateTime(0)
263        "#,
264    )
265    .execute()
266    .await?;
267
268    Ok(())
269}