use std::error::Error;
use base64::Engine;
use borsh::{BorshDeserialize, BorshSerialize};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use solana_client::{
nonblocking::pubsub_client::PubsubClient,
rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter},
rpc_response::{Response, RpcLogsResponse},
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use super::types::Cluster;
use crate::{constants, error};
/// Borsh-encoded payload of a "create" event.
///
/// `parse_event` decodes this struct from the bytes that follow the 8-byte
/// discriminator `[27, 114, 169, 77, 222, 235, 99, 118]`. The Borsh derives
/// (de)serialize fields in declaration order, so the order and types below
/// are part of the wire format and must not be rearranged.
///
/// NOTE(review): the per-field descriptions are inferred from the field names
/// only; confirm them against the emitting program's IDL.
#[derive(BorshSerialize, BorshDeserialize, Debug, Serialize, Deserialize)]
pub struct CreateEvent {
/// Presumably the token's display name.
pub name: String,
/// Presumably the token's ticker symbol.
pub symbol: String,
/// Presumably the metadata URI.
pub uri: String,
/// Presumably the mint account of the created token.
pub mint: Pubkey,
/// Presumably the bonding-curve account associated with the mint.
pub bonding_curve: Pubkey,
/// Presumably the account that submitted the create instruction.
pub user: Pubkey,
/// Presumably the account recorded as the token creator.
pub creator: Pubkey,
/// Presumably a Unix timestamp (units not visible here; TODO confirm).
pub timestamp: i64,
// Reserve/supply figures; units and decimals are not visible in this file.
pub virtual_token_reserves: u64,
pub virtual_sol_reserves: u64,
pub real_token_reserves: u64,
pub token_total_supply: u64,
}
/// Borsh-encoded payload of a "trade" event.
///
/// `parse_event` decodes this struct from the bytes that follow the 8-byte
/// discriminator `[189, 219, 127, 211, 78, 230, 97, 238]`. The Borsh derives
/// (de)serialize fields in declaration order, so the order and types below
/// are part of the wire format and must not be rearranged.
///
/// NOTE(review): the per-field descriptions are inferred from the field names
/// only; confirm them against the emitting program's IDL.
#[derive(BorshSerialize, BorshDeserialize, Debug, Serialize, Deserialize)]
pub struct TradeEvent {
/// Presumably the mint of the traded token.
pub mint: Pubkey,
/// Presumably the SOL side of the trade (likely lamports; TODO confirm).
pub sol_amount: u64,
/// Presumably the token side of the trade (raw units; TODO confirm).
pub token_amount: u64,
/// Presumably `true` for a buy and `false` for a sell.
pub is_buy: bool,
/// Presumably the trading account.
pub user: Pubkey,
/// Presumably a Unix timestamp (units not visible here; TODO confirm).
pub timestamp: i64,
// Reserve figures, presumably the state after the trade; verify against the IDL.
pub virtual_sol_reserves: u64,
pub virtual_token_reserves: u64,
pub real_sol_reserves: u64,
pub real_token_reserves: u64,
// Protocol fee details (recipient, rate in basis points, amount), per the names.
pub fee_recipient: Pubkey,
pub fee_basis_points: u64,
pub fee: u64,
// Creator fee details (creator, rate in basis points, amount), per the names.
pub creator: Pubkey,
pub creator_fee_basis_points: u64,
pub creator_fee: u64,
// Volume-tracking fields; their exact semantics are not visible in this file.
pub track_volume: bool,
pub total_unclaimed_tokens: u64,
pub total_claimed_tokens: u64,
pub current_sol_volume: u64,
pub last_update_timestamp: i64,
}
/// Borsh-encoded payload of a "complete" event.
///
/// `parse_event` decodes this struct from the bytes that follow the 8-byte
/// discriminator `[95, 114, 97, 156, 212, 46, 152, 8]`. The Borsh derives
/// (de)serialize fields in declaration order, so the order and types below
/// are part of the wire format and must not be rearranged.
///
/// NOTE(review): the per-field descriptions are inferred from the field names
/// only; confirm them against the emitting program's IDL.
#[derive(BorshSerialize, BorshDeserialize, Debug, Serialize, Deserialize)]
pub struct CompleteEvent {
/// Presumably the account whose transaction completed the curve.
pub user: Pubkey,
/// Presumably the mint of the token whose curve completed.
pub mint: Pubkey,
/// Presumably the bonding-curve account that completed.
pub bonding_curve: Pubkey,
/// Presumably a Unix timestamp (units not visible here; TODO confirm).
pub timestamp: i64,
}
/// Borsh-encoded payload of a "set params" event.
///
/// `parse_event` decodes this struct from the bytes that follow the 8-byte
/// discriminator `[223, 195, 159, 246, 62, 48, 143, 131]`. The Borsh derives
/// (de)serialize fields in declaration order, so the order and types below
/// are part of the wire format and must not be rearranged.
///
/// NOTE(review): the per-field descriptions are inferred from the field names
/// only; confirm them against the emitting program's IDL.
#[derive(BorshSerialize, BorshDeserialize, Debug, Serialize, Deserialize)]
pub struct SetParamsEvent {
// Presumably the initial bonding-curve parameters applied to new tokens.
pub initial_virtual_token_reserves: u64,
pub initial_virtual_sol_reserves: u64,
pub initial_real_token_reserves: u64,
pub final_real_sol_reserves: u64,
pub token_total_supply: u64,
/// Presumably the protocol fee rate in basis points.
pub fee_basis_points: u64,
pub withdraw_authority: Pubkey,
pub enable_migrate: bool,
pub pool_migration_fee: u64,
/// Presumably the creator fee rate in basis points.
pub creator_fee_basis_points: u64,
/// Fixed-size list of exactly eight public keys.
pub fee_recipients: [Pubkey; 8],
/// Presumably a Unix timestamp (units not visible here; TODO confirm).
pub timestamp: i64,
pub set_creator_authority: Pubkey,
pub admin_set_creator_authority: Pubkey,
}
/// Outcome of decoding one `Program data:` log payload with `parse_event`.
///
/// The variant is selected by the payload's leading 8-byte discriminator.
#[derive(Debug, Serialize, Deserialize)]
pub enum PumpFunEvent {
/// Payload decoded as a [`CreateEvent`].
Create(CreateEvent),
/// Payload decoded as a [`TradeEvent`].
Trade(TradeEvent),
/// Payload decoded as a [`CompleteEvent`].
Complete(CompleteEvent),
/// Payload decoded as a [`SetParamsEvent`].
SetParams(SetParamsEvent),
// `Unhandled`: the discriminator is on `parse_event`'s explicit list of
// recognised events that are not decoded into a struct.
// `Unknown`: the discriminator matched nothing `parse_event` knows about.
// Both carry the signature string passed to `parse_event` and the full
// base64-decoded bytes, including the 8-byte discriminator.
Unhandled(String, Vec<u8>), Unknown(String, Vec<u8>), }
/// Handle for a running log subscription, as returned by `subscribe`.
///
/// Dropping the handle runs `unsubscribe` and then aborts `task`
/// (see the `Drop` implementation).
pub struct Subscription {
/// Handle of the spawned Tokio task that drives the subscription.
pub task: JoinHandle<()>,
/// Closure run to request unsubscription; it is invoked on drop and must
/// therefore be safe to call more than once.
pub unsubscribe: Box<dyn Fn() + Send>,
}
impl Subscription {
pub fn new(task: JoinHandle<()>, unsubscribe: Box<dyn Fn() + Send>) -> Self {
Subscription { task, unsubscribe }
}
}
impl Drop for Subscription {
    /// Tears the subscription down: first runs the `unsubscribe` closure,
    /// then aborts the background task.
    fn drop(&mut self) {
        let Self { task, unsubscribe } = self;
        // Order matters: signal unsubscription before cancelling the task.
        (*unsubscribe)();
        task.abort();
    }
}
pub fn parse_event(
signature: &str,
data: &str,
) -> Result<PumpFunEvent, Box<dyn Error + Send + Sync>> {
let decoded = base64::engine::general_purpose::STANDARD.decode(data)?;
if decoded.len() < 8 {
return Err(format!("Data too short to contain discriminator: {}", data).into());
}
let discriminator = &decoded[..8];
match discriminator {
[27, 114, 169, 77, 222, 235, 99, 118] => Ok(PumpFunEvent::Create(
CreateEvent::try_from_slice(&decoded[8..])
.map_err(|e| format!("Failed to decode CreateEvent: {}", e))?,
)),
[189, 219, 127, 211, 78, 230, 97, 238] => Ok(PumpFunEvent::Trade(
TradeEvent::try_from_slice(&decoded[8..])
.map_err(|e| format!("Failed to decode TradeEvent: {}", e))?,
)),
[95, 114, 97, 156, 212, 46, 152, 8] => Ok(PumpFunEvent::Complete(
CompleteEvent::try_from_slice(&decoded[8..])
.map_err(|e| format!("Failed to decode CompleteEvent: {}", e))?,
)),
[223, 195, 159, 246, 62, 48, 143, 131] => Ok(PumpFunEvent::SetParams(
SetParamsEvent::try_from_slice(&decoded[8..])
.map_err(|e| format!("Failed to decode SetParamsEvent: {}", e))?,
)),
[64, 69, 192, 104, 29, 30, 25, 107]
| [245, 59, 70, 34, 75, 185, 109, 92]
| [147, 250, 108, 120, 247, 29, 67, 222]
| [79, 172, 246, 49, 205, 91, 206, 232]
| [146, 159, 189, 172, 146, 88, 56, 244]
| [122, 2, 127, 1, 14, 191, 12, 175]
| [189, 233, 93, 185, 92, 148, 234, 148]
| [97, 97, 215, 144, 93, 146, 22, 124]
| [134, 36, 13, 72, 232, 101, 130, 216]
| [237, 52, 123, 37, 245, 251, 72, 210]
| [142, 203, 6, 32, 127, 105, 191, 162]
| [197, 122, 167, 124, 116, 81, 91, 255]
| [182, 195, 137, 42, 35, 206, 207, 247] => {
Ok(PumpFunEvent::Unhandled(signature.to_string(), decoded))
}
_ => Ok(PumpFunEvent::Unknown(signature.to_string(), decoded)),
}
}
pub async fn subscribe<F>(
cluster: Cluster,
mentioned: Option<String>,
commitment: Option<CommitmentConfig>,
callback: F,
) -> Result<Subscription, error::ClientError>
where
F: Fn(
String,
Option<PumpFunEvent>,
Option<Box<dyn Error + Send + Sync>>,
Response<RpcLogsResponse>,
) + Send
+ Sync
+ 'static,
{
let ws_url = &cluster.rpc.ws;
let pubsub_client = PubsubClient::new(ws_url)
.await
.map_err(error::ClientError::PubsubClientError)?;
let (tx, _) = mpsc::channel(1);
let (cb_tx, mut cb_rx) = mpsc::channel(1000);
tokio::spawn(async move {
while let Some((sig, event, err, log)) = cb_rx.recv().await {
callback(sig, event, err, log);
}
});
let task = tokio::spawn(async move {
let (mut stream, _unsubscribe) = pubsub_client
.logs_subscribe(
RpcTransactionLogsFilter::Mentions(vec![
mentioned.unwrap_or(constants::accounts::PUMPFUN.to_string())
]),
RpcTransactionLogsConfig {
commitment: Some(commitment.unwrap_or(cluster.commitment)),
},
)
.await
.unwrap();
while let Some(log) = stream.next().await {
let signature = &log.value.signature;
for log_line in &log.value.logs {
if let Some(data) = log_line.strip_prefix("Program data: ") {
match parse_event(signature, data) {
Ok(event) => {
let _ = cb_tx
.send((signature.to_string(), Some(event), None, log.clone()))
.await;
}
Err(err) => {
let _ = cb_tx
.send((signature.to_string(), None, Some(err), log.clone()))
.await;
}
}
}
}
}
});
Ok(Subscription::new(
task,
Box::new(move || {
let _ = tx.try_send(());
}),
))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::types::PriorityFee;
    use std::sync::{Arc, Mutex};
    use tokio::time::{sleep, Duration};

    /// Live-network smoke test: subscribes on mainnet for 30 seconds and
    /// expects at least one event. Skipped when the `SKIP_EXPENSIVE_TESTS`
    /// environment variable is set or the `skip_expensive_tests` cfg is on.
    #[cfg(not(skip_expensive_tests))]
    #[tokio::test]
    async fn test_subscribe() {
        if std::env::var("SKIP_EXPENSIVE_TESTS").is_ok() {
            return;
        }

        let cluster = Cluster::mainnet(CommitmentConfig::processed(), PriorityFee::default());
        let events: Arc<Mutex<Vec<PumpFunEvent>>> = Arc::new(Mutex::new(Vec::new()));

        let callback = {
            let events = Arc::clone(&events);
            move |signature: String,
                  event: Option<PumpFunEvent>,
                  err: Option<Box<dyn Error + Send + Sync>>,
                  _: Response<RpcLogsResponse>| {
                match (event, err) {
                    // Push synchronously so every delivered event is recorded
                    // before the callback returns.
                    (Some(event), _) => events
                        .lock()
                        .expect("events mutex poisoned")
                        .push(event),
                    (None, Some(_)) => {
                        eprintln!("Error in subscription: signature={}", signature);
                    }
                    (None, None) => {}
                }
            }
        };

        let subscription = subscribe(cluster, None, None, callback)
            .await
            .expect("Failed to start subscription");

        // Give the subscription time to receive live traffic.
        let wait_duration = Duration::from_secs(30);
        sleep(wait_duration).await;
        drop(subscription);

        let events = events.lock().expect("events mutex poisoned");
        assert!(
            !events.is_empty(),
            "No events received within {} seconds",
            wait_duration.as_secs()
        );
        println!("Received {} events", events.len());
    }
}