use ahash::RandomState;
use std::{collections::HashMap, sync::Arc};
use clickhouse::{Client, Row};
use dashmap::DashMap;
use futures_util::FutureExt;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use solana_address::Address;
use solana_message::VersionedMessage;
use crate::{Plugin, PluginFuture};
use jetstreamer_firehose::firehose::{BlockData, TransactionData};
type SlotProgramKey = (Address, bool);
type SlotProgramEvents = HashMap<SlotProgramKey, ProgramEvent, RandomState>;
static PENDING_BY_SLOT: Lazy<DashMap<u64, SlotProgramEvents, RandomState>> =
Lazy::new(|| DashMap::with_hasher(RandomState::new()));
#[derive(Row, Deserialize, Serialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
struct ProgramEvent {
pub slot: u32,
pub timestamp: u32,
pub program_id: Address,
pub is_vote: bool,
pub count: u32,
pub error_count: u32,
pub min_cus: u32,
pub max_cus: u32,
pub total_cus: u32,
}
#[derive(Debug, Clone)]
pub struct ProgramTrackingPlugin;
impl ProgramTrackingPlugin {
pub const fn new() -> Self {
Self
}
fn take_slot_events(slot: u64, block_time: Option<i64>) -> Vec<ProgramEvent> {
let timestamp = clamp_block_time(block_time);
if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
return events_by_program
.into_values()
.map(|mut event| {
event.timestamp = timestamp;
event
})
.collect();
}
Vec::new()
}
fn drain_all_pending(block_time: Option<i64>) -> Vec<ProgramEvent> {
let timestamp = clamp_block_time(block_time);
let slots: Vec<u64> = PENDING_BY_SLOT.iter().map(|entry| *entry.key()).collect();
let mut rows = Vec::new();
for slot in slots {
if let Some((_, events_by_program)) = PENDING_BY_SLOT.remove(&slot) {
rows.extend(events_by_program.into_values().map(|mut event| {
event.timestamp = timestamp;
event
}));
}
}
rows
}
}
impl Default for ProgramTrackingPlugin {
fn default() -> Self {
Self::new()
}
}
impl Plugin for ProgramTrackingPlugin {
#[inline(always)]
fn name(&self) -> &'static str {
"Program Tracking"
}
#[inline(always)]
fn on_transaction<'a>(
&'a self,
_thread_id: usize,
_db: Option<Arc<Client>>,
transaction: &'a TransactionData,
) -> PluginFuture<'a> {
async move {
let message = &transaction.transaction.message;
let (account_keys, instructions) = match message {
VersionedMessage::Legacy(msg) => (&msg.account_keys, &msg.instructions),
VersionedMessage::V0(msg) => (&msg.account_keys, &msg.instructions),
};
if instructions.is_empty() {
return Ok(());
}
let program_ids = instructions
.iter()
.filter_map(|ix| account_keys.get(ix.program_id_index as usize))
.cloned()
.collect::<Vec<_>>();
if program_ids.is_empty() {
return Ok(());
}
let total_cu = transaction
.transaction_status_meta
.compute_units_consumed
.unwrap_or(0) as u32;
let program_count = program_ids.len() as u32;
let errored = transaction.transaction_status_meta.status.is_err();
let slot = transaction.slot;
let is_vote = transaction.is_vote;
let mut slot_entry = PENDING_BY_SLOT
.entry(slot)
.or_insert_with(|| HashMap::with_hasher(RandomState::new()));
for program_id in program_ids.iter() {
let this_program_cu = if program_count == 0 {
0
} else {
total_cu / program_count
};
let event =
slot_entry
.entry((*program_id, is_vote))
.or_insert_with(|| ProgramEvent {
slot: slot.min(u32::MAX as u64) as u32,
timestamp: 0,
program_id: *program_id,
is_vote,
count: 0,
error_count: 0,
min_cus: u32::MAX,
max_cus: 0,
total_cus: 0,
});
event.min_cus = event.min_cus.min(this_program_cu);
event.max_cus = event.max_cus.max(this_program_cu);
event.total_cus = event.total_cus.saturating_add(this_program_cu);
event.count = event.count.saturating_add(1);
if errored {
event.error_count = event.error_count.saturating_add(1);
}
}
Ok(())
}
.boxed()
}
#[inline(always)]
fn on_block(
&self,
_thread_id: usize,
db: Option<Arc<Client>>,
block: &BlockData,
) -> PluginFuture<'_> {
let slot = block.slot();
let block_time = block.block_time();
let was_skipped = block.was_skipped();
async move {
if was_skipped {
return Ok(());
}
let rows = Self::take_slot_events(slot, block_time);
if let Some(db_client) = db
&& !rows.is_empty()
{
tokio::spawn(async move {
if let Err(err) = write_program_events(db_client, rows).await {
log::error!("failed to write program events: {}", err);
}
});
}
Ok(())
}
.boxed()
}
#[inline(always)]
fn on_load(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
async move {
log::info!("Program Tracking Plugin loaded.");
if let Some(db) = db {
log::info!("Creating program_invocations table if it does not exist...");
db.query(
r#"
CREATE TABLE IF NOT EXISTS program_invocations (
slot UInt32,
timestamp DateTime('UTC'),
program_id FixedString(32),
is_vote UInt8,
count UInt32,
error_count UInt32,
min_cus UInt32,
max_cus UInt32,
total_cus UInt32
)
ENGINE = ReplacingMergeTree(timestamp)
ORDER BY (slot, program_id, is_vote)
"#,
)
.execute()
.await?;
log::info!("done.");
} else {
log::warn!("Program Tracking Plugin running without ClickHouse; data will not be persisted.");
}
Ok(())
}
.boxed()
}
#[inline(always)]
fn on_exit(&self, db: Option<Arc<Client>>) -> PluginFuture<'_> {
async move {
if let Some(db_client) = db {
let rows = Self::drain_all_pending(None);
if !rows.is_empty() {
write_program_events(Arc::clone(&db_client), rows)
.await
.map_err(|err| -> Box<dyn std::error::Error + Send + Sync> {
Box::new(err)
})?;
}
backfill_program_timestamps(db_client)
.await
.map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })?;
}
Ok(())
}
.boxed()
}
}
async fn write_program_events(
db: Arc<Client>,
rows: Vec<ProgramEvent>,
) -> Result<(), clickhouse::error::Error> {
if rows.is_empty() {
return Ok(());
}
let mut insert = db.insert::<ProgramEvent>("program_invocations").await?;
for row in rows {
insert.write(&row).await?;
}
insert.end().await?;
Ok(())
}
fn clamp_block_time(block_time: Option<i64>) -> u32 {
let raw_ts = block_time.unwrap_or(0);
if raw_ts < 0 {
0
} else if raw_ts > u32::MAX as i64 {
u32::MAX
} else {
raw_ts as u32
}
}
async fn backfill_program_timestamps(db: Arc<Client>) -> Result<(), clickhouse::error::Error> {
db.query(
r#"
INSERT INTO program_invocations
SELECT pi.slot,
ss.block_time,
pi.program_id,
pi.is_vote,
pi.count,
pi.error_count,
pi.min_cus,
pi.max_cus,
pi.total_cus
FROM program_invocations AS pi
ANY INNER JOIN jetstreamer_slot_status AS ss USING (slot)
WHERE pi.timestamp = toDateTime(0)
AND ss.block_time > toDateTime(0)
"#,
)
.execute()
.await?;
Ok(())
}