// jetstreamer_plugin/plugins/instruction_tracking.rs

1use std::sync::Arc;
2
3use clickhouse::{Client, Row};
4use dashmap::DashMap;
5use futures_util::FutureExt;
6use once_cell::sync::Lazy;
7use serde::{Deserialize, Serialize};
8use solana_message::VersionedMessage;
9
10use crate::{Plugin, PluginFuture};
11use jetstreamer_firehose::firehose::{BlockData, TransactionData};
12
13static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotInstructionEvent>> = Lazy::new(DashMap::new);
14
15#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug)]
16struct SlotInstructionEvent {
17    slot: u32,
18    // Stored as ClickHouse DateTime('UTC') -> UInt32 seconds; clamp Solana's i64 timestamp.
19    timestamp: u32,
20    instruction_count: u64,
21    transaction_count: u32,
22}
23
/// Plugin that counts the instructions executed in each slot and persists the per-slot
/// totals to ClickHouse.
///
/// The type is a stateless unit struct; the running totals live in a module-level map.
#[derive(Clone, Debug, Default)]
pub struct InstructionTrackingPlugin;
27
28impl InstructionTrackingPlugin {
29    fn take_slot_event(slot: u64, block_time: Option<i64>) -> Option<SlotInstructionEvent> {
30        let timestamp = clamp_block_time(block_time);
31        PENDING_BY_SLOT.remove(&slot).map(|(_, mut event)| {
32            event.timestamp = timestamp;
33            event
34        })
35    }
36
37    fn drain_all_pending(block_time: Option<i64>) -> Vec<SlotInstructionEvent> {
38        let timestamp = clamp_block_time(block_time);
39        let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
40        let mut rows = Vec::new();
41        for slot in slots {
42            if let Some((_, mut event)) = PENDING_BY_SLOT.remove(&slot) {
43                event.timestamp = timestamp;
44                rows.push(event);
45            }
46        }
47        rows
48    }
49}
50
51impl Plugin for InstructionTrackingPlugin {
52    #[inline(always)]
53    fn name(&self) -> &'static str {
54        "Instruction Tracking"
55    }
56
57    #[inline(always)]
58    fn on_transaction<'a>(
59        &'a self,
60        _thread_id: usize,
61        _db: Option<Arc<Client>>,
62        transaction: &'a TransactionData,
63    ) -> PluginFuture<'a> {
64        async move {
65            let instruction_count = total_instruction_count(transaction);
66
67            let slot = transaction.slot;
68            let mut entry = PENDING_BY_SLOT
69                .entry(slot)
70                .or_insert_with(|| SlotInstructionEvent {
71                    slot: slot.min(u32::MAX as u64) as u32,
72                    timestamp: 0,
73                    instruction_count: 0,
74                    transaction_count: 0,
75                });
76            entry.instruction_count = entry.instruction_count.saturating_add(instruction_count);
77            entry.transaction_count = entry.transaction_count.saturating_add(1);
78
79            Ok(())
80        }
81        .boxed()
82    }
83
84    #[inline(always)]
85    fn on_block(
86        &self,
87        _thread_id: usize,
88        db: Option<Arc<Client>>,
89        block: &BlockData,
90    ) -> PluginFuture<'_> {
91        let slot = block.slot();
92        let block_time = block.block_time();
93        let was_skipped = block.was_skipped();
94
95        async move {
96            if was_skipped {
97                return Ok(());
98            }
99
100            let rows = Self::take_slot_event(slot, block_time)
101                .into_iter()
102                .collect::<Vec<_>>();
103
104            if let Some(db_client) = db.as_ref()
105                && !rows.is_empty()
106            {
107                write_instruction_events(Arc::clone(db_client), rows)
108                    .await
109                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
110            }
111
112            Ok(())
113        }
114        .boxed()
115    }
116
117    #[inline(always)]
118    fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
119        async move {
120            log::info!("Instruction Tracking Plugin loaded.");
121            if let Some(db) = db {
122                log::info!("Ensuring slot_instructions table exists...");
123                db.query(
124                    r#"
125                    CREATE TABLE IF NOT EXISTS slot_instructions (
126                        slot               UInt32,
127                        timestamp          DateTime('UTC'),
128                        instruction_count  UInt64,
129                        transaction_count  UInt32
130                    )
131                    ENGINE = ReplacingMergeTree(timestamp)
132                    ORDER BY slot
133                    "#,
134                )
135                .execute()
136                .await?;
137                log::info!("done.");
138            } else {
139                log::warn!(
140                    "Instruction Tracking Plugin running without ClickHouse; data will not be persisted."
141                );
142            }
143            Ok(())
144        }
145        .boxed()
146    }
147
148    #[inline(always)]
149    fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
150        async move {
151            if let Some(db_client) = db {
152                let rows = Self::drain_all_pending(None);
153                if !rows.is_empty() {
154                    write_instruction_events(Arc::clone(&db_client), rows)
155                        .await
156                        .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
157                            Box::new(err)
158                        })?;
159                }
160                backfill_instruction_timestamps(db_client)
161                    .await
162                    .map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
163            }
164            Ok(())
165        }
166        .boxed()
167    }
168}
169
170async fn write_instruction_events(
171    db: Arc<Client>,
172    rows: Vec<SlotInstructionEvent>,
173) -> Result<(), clickhouse::error::Error> {
174    if rows.is_empty() {
175        return Ok(());
176    }
177    let mut insert = db
178        .insert::<SlotInstructionEvent>("slot_instructions")
179        .await?;
180    for row in rows {
181        insert.write(&row).await?;
182    }
183    insert.end().await?;
184    Ok(())
185}
186
/// Squeezes an optional `i64` block time into the `u32` range.
///
/// `None` and negative values become `0`; values above `u32::MAX` become `u32::MAX`;
/// everything else is returned unchanged.
fn clamp_block_time(block_time: Option<i64>) -> u32 {
    block_time.map_or(0, |secs| {
        // After flooring at zero the only way the conversion can fail is overflow.
        u32::try_from(secs.max(0)).unwrap_or(u32::MAX)
    })
}
199
200fn total_instruction_count(transaction: &TransactionData) -> u64 {
201    let message_instructions = match &transaction.transaction.message {
202        VersionedMessage::Legacy(msg) => msg.instructions.len() as u64,
203        VersionedMessage::V0(msg) => msg.instructions.len() as u64,
204    };
205    let inner_instruction_count = transaction
206        .transaction_status_meta
207        .inner_instructions
208        .as_ref()
209        .map(|sets| {
210            sets.iter()
211                .map(|set| set.instructions.len() as u64)
212                .sum::<u64>()
213        })
214        .unwrap_or(0);
215    message_instructions.saturating_add(inner_instruction_count)
216}
217
218async fn backfill_instruction_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
219    db.query(
220        r#"
221        INSERT INTO slot_instructions
222        SELECT si.slot,
223               ss.block_time,
224               si.instruction_count,
225               si.transaction_count
226        FROM slot_instructions AS si
227        ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
228        WHERE si.timestamp = toDateTime(0)
229          AND ss.block_time > toDateTime(0)
230        "#,
231    )
232    .execute()
233    .await?;
234
235    Ok(())
236}