Skip to main content

jetstreamer_plugin/plugins/
instruction_tracking.rs

1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_message::VersionedMessage;
9use solana_sdk_ids::vote::id as vote_program_id;
10
11use crate::{Plugin, PluginFuture};
12use jetstreamer_firehose::firehose::{BlockData, TransactionData};
13
14static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotInstructionEvent, ahash::RandomState>> =
15    Lazy::new(|| DashMap::with_hasher(ahash::RandomState::new()));
16
17#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
18struct SlotInstructionEvent {
19    slot: u32,
20    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; clamp Solana's i64 timestamp.
21    timestamp: u32,
22    vote_instruction_count: u64,
23    non_vote_instruction_count: u64,
24    vote_transaction_count: u32,
25    non_vote_transaction_count: u32,
26}
27
28#[derive(Debug, Clone)]
29/// Tracks total instructions executed per slot (votes and non-votes separated) and batches writes to ClickHouse.
30pub struct InstructionTrackingPlugin;
31
32impl InstructionTrackingPlugin {
33    /// Creates a new instance that records both vote and non-vote transactions.
34    pub const fn new() -> Self {
35        Self
36    }
37
38    fn take_slot_event(slot: u64, block_time: Option<i64>) -> Option<SlotInstructionEvent> {
39        let timestamp = clamp_block_time(block_time);
40        PENDING_BY_SLOT.remove(&slot).map(|(_, mut event)| {
41            event.timestamp = timestamp;
42            event
43        })
44    }
45
46    fn drain_all_pending(block_time: Option<i64>) -> Vec<SlotInstructionEvent> {
47        let timestamp = clamp_block_time(block_time);
48        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
49        let mut rows = Vec::new();
50        for slot in slots {
51            if let Some((_, mut event)) = PENDING_BY_SLOT.remove(&slot) {
52                event.timestamp = timestamp;
53                rows.push(event);
54            }
55        }
56        rows
57    }
58}
59
60impl Default for InstructionTrackingPlugin {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66impl Plugin for InstructionTrackingPlugin {
67    #[inline(always)]
68    fn name(&self) -> &'static str {
69        "Instruction Tracking"
70    }
71
72    #[inline(always)]
73    fn on_transaction<'a>(
74        &'a self,
75        _thread_id: usize,
76        _db: Option<Arc<Client>>,
77        transaction: &'a TransactionData,
78    ) -> PluginFuture<'a> {
79        async move {
80            let (vote_instruction_count, non_vote_instruction_count) =
81                instruction_vote_counts(transaction);
82
83            let slot = transaction.slot;
84            let mut entry = PENDING_BY_SLOT
85                .entry(slot)
86                .or_insert_with(|| SlotInstructionEvent {
87                    slot: slot.min(u32::MAX as u64) as u32,
88                    timestamp: 0,
89                    vote_instruction_count: 0,
90                    non_vote_instruction_count: 0,
91                    vote_transaction_count: 0,
92                    non_vote_transaction_count: 0,
93                });
94            entry.vote_instruction_count = entry
95                .vote_instruction_count
96                .saturating_add(vote_instruction_count);
97            entry.non_vote_instruction_count = entry
98                .non_vote_instruction_count
99                .saturating_add(non_vote_instruction_count);
100            if vote_instruction_count > 0 {
101                entry.vote_transaction_count = entry.vote_transaction_count.saturating_add(1);
102            } else {
103                entry.non_vote_transaction_count =
104                    entry.non_vote_transaction_count.saturating_add(1);
105            }
106
107            Ok(())
108        }
109        .boxed()
110    }
111
112    #[inline(always)]
113    fn on_block(
114        &self,
115        _thread_id: usize,
116        db: Option<Arc<Client>>,
117        block: &BlockData,
118    ) -> PluginFuture<'_> {
119        let slot = block.slot();
120        let block_time = block.block_time();
121        let was_skipped = block.was_skipped();
122
123        async move {
124            if was_skipped {
125                return Ok(());
126            }
127
128            let rows = Self::take_slot_event(slot, block_time)
129                .into_iter()
130                .collect::<Vec<_>>();
131
132            if let Some(db_client) = db
133                && !rows.is_empty()
134            {
135                tokio::spawn(async move {
136                    if let Err(err) = write_instruction_events(db_client, rows).await {
137                        log::error!("failed to write instruction events: {}", err);
138                    }
139                });
140            }
141
142            Ok(())
143        }
144        .boxed()
145    }
146
147    #[inline(always)]
148    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
149        async move {
150            log::info!("Instruction Tracking Plugin loaded.");
151            if let Some(db) = db {
152                log::info!("Ensuring slot_instructions table exists...");
153                db.query(
154                    r#"
155                    CREATE TABLE IF NOT EXISTS slot_instructions (
156                        slot                         UInt32,
157                        timestamp                    DateTime('UTC'),
158                        vote_instruction_count       UInt64,
159                        non_vote_instruction_count   UInt64,
160                        vote_transaction_count       UInt32,
161                        non_vote_transaction_count   UInt32
162                    )
163                    ENGINE = ReplacingMergeTree(timestamp)
164                    ORDER BY slot
165                    "#,
166                )
167                .execute()
168                .await?;
169                log::info!("done.");
170            } else {
171                log::warn!(
172                    "Instruction Tracking Plugin running without ClickHouse; data will not be persisted."
173                );
174            }
175            Ok(())
176        }
177        .boxed()
178    }
179
180    #[inline(always)]
181    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
182        async move {
183            if let Some(db_client) = db {
184                let rows = Self::drain_all_pending(None);
185                if !rows.is_empty() {
186                    write_instruction_events(Arc::clone(&db_client), rows)
187                        .await
188                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
189                            Box::new(err)
190                        })?;
191                }
192                backfill_instruction_timestamps(db_client)
193                    .await
194                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
195            }
196            Ok(())
197        }
198        .boxed()
199    }
200}
201
202async fn write_instruction_events(
203    db: Arc<Client>,
204    rows: Vec<SlotInstructionEvent>,
205) -> Result<(), clickhouse::error::Error> {
206    if rows.is_empty() {
207        return Ok(());
208    }
209    let mut insert = db
210        .insert::<SlotInstructionEvent>("slot_instructions")
211        .await?;
212    for row in rows {
213        insert.write(&row).await?;
214    }
215    insert.end().await?;
216    Ok(())
217}
218
219fn clamp_block_time(block_time: Option<i64>) -> u32 {
220    let Some(raw_ts) = block_time else {
221        return 0;
222    };
223    if raw_ts < 0 {
224        0
225    } else if raw_ts > u32::MAX as i64 {
226        u32::MAX
227    } else {
228        raw_ts as u32
229    }
230}
231
232fn instruction_vote_counts(transaction: &TransactionData) -> (u64, u64) {
233    let static_keys = transaction.transaction.message.static_account_keys();
234    let vote_program = vote_program_id();
235    let mut vote_count: u64 = 0;
236    let mut non_vote_count: u64 = 0;
237
238    let classify = |program_index: usize, vote_count: &mut u64, non_vote_count: &mut u64| {
239        if let Some(pid) = static_keys.get(program_index) {
240            if pid == &vote_program {
241                *vote_count = vote_count.saturating_add(1);
242            } else {
243                *non_vote_count = non_vote_count.saturating_add(1);
244            }
245        } else {
246            *non_vote_count = non_vote_count.saturating_add(1);
247        }
248    };
249
250    match &transaction.transaction.message {
251        VersionedMessage::Legacy(msg) => {
252            for ix in &msg.instructions {
253                classify(
254                    ix.program_id_index as usize,
255                    &mut vote_count,
256                    &mut non_vote_count,
257                );
258            }
259        }
260        VersionedMessage::V0(msg) => {
261            for ix in &msg.instructions {
262                classify(
263                    ix.program_id_index as usize,
264                    &mut vote_count,
265                    &mut non_vote_count,
266                );
267            }
268        }
269    }
270
271    if let Some(inner_sets) = transaction
272        .transaction_status_meta
273        .inner_instructions
274        .as_ref()
275    {
276        for set in inner_sets {
277            for ix in &set.instructions {
278                classify(
279                    ix.instruction.program_id_index as usize,
280                    &mut vote_count,
281                    &mut non_vote_count,
282                );
283            }
284        }
285    }
286
287    (vote_count, non_vote_count)
288}
289
290async fn backfill_instruction_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
291    db.query(
292        r#"
293        INSERT INTO slot_instructions
294        SELECT si.slot,
295               ss.block_time,
296               si.vote_instruction_count,
297               si.non_vote_instruction_count,
298               si.vote_transaction_count,
299               si.non_vote_transaction_count
300        FROM slot_instructions AS si
301        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
302        WHERE si.timestamp = toDateTime(0)
303          AND ss.block_time > toDateTime(0)
304        "#,
305    )
306    .execute()
307    .await?;
308
309    Ok(())
310}