Skip to main content

jetstreamer_plugin/plugins/
instruction_tracking.rs

1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_message::VersionedMessage;
9use solana_sdk_ids::vote::id as vote_program_id;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotInstructionEvent>> = Lazy::new(DashMap::new);
15
16#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
17struct SlotInstructionEvent {
18    slot: u32,
19    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; clamp Solana's i64 timestamp.
20    timestamp: u32,
21    vote_instruction_count: u64,
22    non_vote_instruction_count: u64,
23    vote_transaction_count: u32,
24    non_vote_transaction_count: u32,
25}
26
27#[derive(Debug, Clone)]
28/// Tracks total instructions executed per slot (votes and non-votes separated) and batches writes to ClickHouse.
29pub struct InstructionTrackingPlugin;
30
31impl InstructionTrackingPlugin {
32    /// Creates a new instance that records both vote and non-vote transactions.
33    pub const fn new() -> Self {
34        Self
35    }
36
37    fn take_slot_event(slot: u64, block_time: Option<i64>) -> Option<SlotInstructionEvent> {
38        let timestamp = clamp_block_time(block_time);
39        PENDING_BY_SLOT.remove(&slot).map(|(_, mut event)| {
40            event.timestamp = timestamp;
41            event
42        })
43    }
44
45    fn drain_all_pending(block_time: Option<i64>) -> Vec<SlotInstructionEvent> {
46        let timestamp = clamp_block_time(block_time);
47        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
48        let mut rows = Vec::new();
49        for slot in slots {
50            if let Some((_, mut event)) = PENDING_BY_SLOT.remove(&slot) {
51                event.timestamp = timestamp;
52                rows.push(event);
53            }
54        }
55        rows
56    }
57}
58
59impl Default for InstructionTrackingPlugin {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl Plugin for InstructionTrackingPlugin {
66    #[inline(always)]
67    fn name(&self) -> &'static str {
68        "Instruction Tracking"
69    }
70
71    #[inline(always)]
72    fn on_transaction<'a>(
73        &'a self,
74        _thread_id: usize,
75        _db: Option<Arc<Client>>,
76        transaction: &'a TransactionData,
77    ) -> PluginFuture<'a> {
78        async move {
79            let (vote_instruction_count, non_vote_instruction_count) =
80                instruction_vote_counts(transaction);
81
82            let slot = transaction.slot;
83            let mut entry = PENDING_BY_SLOT
84                .entry(slot)
85                .or_insert_with(|| SlotInstructionEvent {
86                    slot: slot.min(u32::MAX as u64) as u32,
87                    timestamp: 0,
88                    vote_instruction_count: 0,
89                    non_vote_instruction_count: 0,
90                    vote_transaction_count: 0,
91                    non_vote_transaction_count: 0,
92                });
93            entry.vote_instruction_count = entry
94                .vote_instruction_count
95                .saturating_add(vote_instruction_count);
96            entry.non_vote_instruction_count = entry
97                .non_vote_instruction_count
98                .saturating_add(non_vote_instruction_count);
99            if vote_instruction_count > 0 {
100                entry.vote_transaction_count = entry.vote_transaction_count.saturating_add(1);
101            } else {
102                entry.non_vote_transaction_count =
103                    entry.non_vote_transaction_count.saturating_add(1);
104            }
105
106            Ok(())
107        }
108        .boxed()
109    }
110
111    #[inline(always)]
112    fn on_block(
113        &self,
114        _thread_id: usize,
115        db: Option<Arc<Client>>,
116        block: &BlockData,
117    ) -> PluginFuture<'_> {
118        let slot = block.slot();
119        let block_time = block.block_time();
120        let was_skipped = block.was_skipped();
121
122        async move {
123            if was_skipped {
124                return Ok(());
125            }
126
127            let rows = Self::take_slot_event(slot, block_time)
128                .into_iter()
129                .collect::<Vec<_>>();
130
131            if let Some(db_client) = db
132                && !rows.is_empty()
133            {
134                tokio::spawn(async move {
135                    if let Err(err) = write_instruction_events(db_client, rows).await {
136                        log::error!("failed to write instruction events: {}", err);
137                    }
138                });
139            }
140
141            Ok(())
142        }
143        .boxed()
144    }
145
146    #[inline(always)]
147    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
148        async move {
149            log::info!("Instruction Tracking Plugin loaded.");
150            if let Some(db) = db {
151                log::info!("Ensuring slot_instructions table exists...");
152                db.query(
153                    r#"
154                    CREATE TABLE IF NOT EXISTS slot_instructions (
155                        slot                         UInt32,
156                        timestamp                    DateTime('UTC'),
157                        vote_instruction_count       UInt64,
158                        non_vote_instruction_count   UInt64,
159                        vote_transaction_count       UInt32,
160                        non_vote_transaction_count   UInt32
161                    )
162                    ENGINE = ReplacingMergeTree(timestamp)
163                    ORDER BY slot
164                    "#,
165                )
166                .execute()
167                .await?;
168                log::info!("done.");
169            } else {
170                log::warn!(
171                    "Instruction Tracking Plugin running without ClickHouse; data will not be persisted."
172                );
173            }
174            Ok(())
175        }
176        .boxed()
177    }
178
179    #[inline(always)]
180    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
181        async move {
182            if let Some(db_client) = db {
183                let rows = Self::drain_all_pending(None);
184                if !rows.is_empty() {
185                    write_instruction_events(Arc::clone(&db_client), rows)
186                        .await
187                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
188                            Box::new(err)
189                        })?;
190                }
191                backfill_instruction_timestamps(db_client)
192                    .await
193                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
194            }
195            Ok(())
196        }
197        .boxed()
198    }
199}
200
201async fn write_instruction_events(
202    db: Arc<Client>,
203    rows: Vec<SlotInstructionEvent>,
204) -> Result<(), clickhouse::error::Error> {
205    if rows.is_empty() {
206        return Ok(());
207    }
208    let mut insert = db
209        .insert::<SlotInstructionEvent>("slot_instructions")
210        .await?;
211    for row in rows {
212        insert.write(&row).await?;
213    }
214    insert.end().await?;
215    Ok(())
216}
217
218fn clamp_block_time(block_time: Option<i64>) -> u32 {
219    let Some(raw_ts) = block_time else {
220        return 0;
221    };
222    if raw_ts < 0 {
223        0
224    } else if raw_ts > u32::MAX as i64 {
225        u32::MAX
226    } else {
227        raw_ts as u32
228    }
229}
230
231fn instruction_vote_counts(transaction: &TransactionData) -> (u64, u64) {
232    let static_keys = transaction.transaction.message.static_account_keys();
233    let vote_program = vote_program_id();
234    let mut vote_count: u64 = 0;
235    let mut non_vote_count: u64 = 0;
236
237    let classify = |program_index: usize, vote_count: &mut u64, non_vote_count: &mut u64| {
238        if let Some(pid) = static_keys.get(program_index) {
239            if pid == &vote_program {
240                *vote_count = vote_count.saturating_add(1);
241            } else {
242                *non_vote_count = non_vote_count.saturating_add(1);
243            }
244        } else {
245            *non_vote_count = non_vote_count.saturating_add(1);
246        }
247    };
248
249    match &transaction.transaction.message {
250        VersionedMessage::Legacy(msg) => {
251            for ix in &msg.instructions {
252                classify(
253                    ix.program_id_index as usize,
254                    &mut vote_count,
255                    &mut non_vote_count,
256                );
257            }
258        }
259        VersionedMessage::V0(msg) => {
260            for ix in &msg.instructions {
261                classify(
262                    ix.program_id_index as usize,
263                    &mut vote_count,
264                    &mut non_vote_count,
265                );
266            }
267        }
268    }
269
270    if let Some(inner_sets) = transaction
271        .transaction_status_meta
272        .inner_instructions
273        .as_ref()
274    {
275        for set in inner_sets {
276            for ix in &set.instructions {
277                classify(
278                    ix.instruction.program_id_index as usize,
279                    &mut vote_count,
280                    &mut non_vote_count,
281                );
282            }
283        }
284    }
285
286    (vote_count, non_vote_count)
287}
288
289async fn backfill_instruction_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
290    db.query(
291        r#"
292        INSERT INTO slot_instructions
293        SELECT si.slot,
294               ss.block_time,
295               si.vote_instruction_count,
296               si.non_vote_instruction_count,
297               si.vote_transaction_count,
298               si.non_vote_transaction_count
299        FROM slot_instructions AS si
300        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
301        WHERE si.timestamp = toDateTime(0)
302          AND ss.block_time > toDateTime(0)
303        "#,
304    )
305    .execute()
306    .await?;
307
308    Ok(())
309}